Repository: celix Updated Branches: refs/heads/feature/CELIX-408_runtime 7efe4331a -> 2d9c77d53
CELIX-408: Refactoring in the pubsub serializer usage. The topology manager does not have to be started before the pubsub admins anymore. Known issues: - The serializer has to be started before the admins, to prevent issues when using an admin without a serializer. Note: an admin should only register itself if a serializer is available. - Still considered unstable Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2d9c77d5 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2d9c77d5 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2d9c77d5 Branch: refs/heads/feature/CELIX-408_runtime Commit: 2d9c77d530184d5d3d119cd3f92202e332996c78 Parents: 7efe433 Author: Pepijn Noltes <pepijnnol...@gmail.com> Authored: Wed Apr 12 13:27:09 2017 +0200 Committer: Pepijn Noltes <pepijnnol...@gmail.com> Committed: Wed Apr 12 13:27:09 2017 +0200 ---------------------------------------------------------------------- pubsub/deploy/CMakeLists.txt | 50 ++++---- .../include/pubsub_publish_service_private.h | 4 +- .../private/src/pubsub_admin_impl.c | 5 +- .../private/src/topic_publication.c | 104 +++++++++------- .../private/src/topic_subscription.c | 57 ++++++--- .../include/pubsub_publish_service_private.h | 4 +- .../private/include/topic_subscription.h | 2 +- .../private/src/pubsub_admin_impl.c | 7 +- .../private/src/topic_publication.c | 121 +++++++++++-------- .../private/src/topic_subscription.c | 102 ++++++++++------ .../private/src/pubsub_serializer_impl.c | 9 +- .../private/src/pubsub_topology_manager.c | 11 +- pubsub/test/CMakeLists.txt | 14 +-- pubsub/test/msg_descriptors/msg.descriptor | 3 +- pubsub/test/test/msg.h | 4 +- pubsub/test/test/sut_activator.c | 5 +- pubsub/test/test/tst_activator.cpp | 13 +- 17 files changed, 322 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/deploy/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/pubsub/deploy/CMakeLists.txt b/pubsub/deploy/CMakeLists.txt index 52fecc9..85b49d2 100644 --- a/pubsub/deploy/CMakeLists.txt +++ b/pubsub/deploy/CMakeLists.txt @@ -24,12 +24,12 @@ add_deploy("pubsub_publisher_udp_mc" BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminUdpMc org.apache.celix.pubsub_publisher.PoiPublisher org.apache.celix.pubsub_publisher.PoiPublisher2 - org.apache.celix.pubsub_serializer.PubSubSerializerJson ) add_deploy("pubsub_subscriber_udp_mc" @@ -37,11 +37,11 @@ add_deploy("pubsub_subscriber_udp_mc" BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminUdpMc org.apache.celix.pubsub_subscriber.PoiSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson ) add_deploy("pubsub_subscriber2_udp_mc" @@ -49,11 +49,11 @@ add_deploy("pubsub_subscriber2_udp_mc" BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminUdpMc org.apache.celix.pubsub_subscriber.PoiSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson ) if (ETCD_CMD AND XTERM_CMD) @@ -81,13 +81,13 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson 
+ org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_admin.PubSubAdminUdpMc org.apache.celix.pubsub_publisher.PoiPublisher org.apache.celix.pubsub_publisher.PoiPublisher2 - org.apache.celix.pubsub_serializer.PubSubSerializerJson PROPERTIES poi1.psa=zmq poi2.psa=udp @@ -98,12 +98,12 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_admin.PubSubAdminUdpMc org.apache.celix.pubsub_subscriber.PoiSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson PROPERTIES poi1.psa=zmq poi2.psa=udp @@ -115,12 +115,12 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_publisher.PoiPublisher org.apache.celix.pubsub_subscriber.PoiSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson ) add_deploy("pubsub_publisher_zmq" @@ -128,12 +128,12 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_publisher.PoiPublisher org.apache.celix.pubsub_publisher.PoiPublisher2 - org.apache.celix.pubsub_serializer.PubSubSerializerJson 
PROPERTIES pubsub.scope=my_small_scope ) @@ -143,11 +143,11 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui + org.apache.celix.pubsub_serializer.PubSubSerializerJson org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_subscriber.PoiSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson ) add_deploy("pubsub_subscriber2_zmq" @@ -155,11 +155,12 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui + org.apache.celix.pubsub_serializer.PubSubSerializerJson org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_subscriber.PoiSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson + ) # ZMQ Multipart @@ -168,23 +169,23 @@ if (BUILD_PUBSUB_PSA_ZMQ) BUNDLES shell shell_tui + org.apache.celix.pubsub_serializer.PubSubSerializerJson org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery org.apache.celix.pubsub_topology_manager.PubSubTopologyManager org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_subscriber.MpSubscriber - org.apache.celix.pubsub_serializer.PubSubSerializerJson ) add_deploy("pubsub_mp_publisher_zmq" GROUP "pubsub" BUNDLES - shell - shell_tui - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery - org.apache.celix.pubsub_topology_manager.PubSubTopologyManager - org.apache.celix.pubsub_admin.PubSubAdminZmq - org.apache.celix.pubsub_publisher.MpPublisher - org.apache.celix.pubsub_serializer.PubSubSerializerJson + shell + shell_tui + org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + org.apache.celix.pubsub_topology_manager.PubSubTopologyManager + org.apache.celix.pubsub_admin.PubSubAdminZmq + org.apache.celix.pubsub_publisher.MpPublisher ) if (ETCD_CMD AND XTERM_CMD) @@ -207,6 +208,7 @@ 
if (BUILD_PUBSUB_PSA_ZMQ) GROUP pubsub DEPLOYMENTS pubsub_publisher + pubsub_subscriber_zmq pubsub_subscriber2_zmq COMMANDS etcd http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h index 57d942a..b43fb08 100644 --- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h +++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h @@ -42,13 +42,13 @@ typedef struct pubsub_udp_msg { } pubsub_udp_msg_t; typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out); celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); 
http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c index ebfe3e6..3693970 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c @@ -431,7 +431,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_ if (factory == NULL) { topic_publication_pt pub = NULL; - status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->serializerSvc, admin->mcIpAddress,&pub); + status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->mcIpAddress,&pub); + pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc); if(status == CELIX_SUCCESS){ status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory); if(status==CELIX_SUCCESS && factory !=NULL){ @@ -655,7 +656,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize while(hashMapIterator_hasNext(lp_iter)){ service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter); topic_publication_pt topic_pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc); + pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc); } hashMapIterator_destroy(lp_iter); celixThreadMutex_unlock(&admin->localPublicationsLock); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c index be0a433..227761b 100644 --- 
a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c @@ -63,7 +63,7 @@ struct topic_publication { typedef struct publish_bundle_bound_service { topic_publication_pt parent; - pubsub_publisher_pt service; + pubsub_publisher_t pubSvc; bundle_pt bundle; char *scope; char *topic; @@ -97,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){ char* ep = malloc(EP_ADDRESS_LEN); memset(ep,0,EP_ADDRESS_LEN); @@ -116,7 +116,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pub->destAddr.sin_family = AF_INET; pub->destAddr.sin_addr.s_addr = inet_addr(bindIP); pub->destAddr.sin_port = htons(port); - pub->serializerSvc = serializer; + pub->serializerSvc = NULL; pubsub_topicPublicationAddPublisherEP(pub,pubEP); @@ -222,30 +222,36 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ +celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&(pub->tp_lock)); //clear old serializer if (pub->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle_pt, publish_bundle_bound_service_t* while (hashMapIterator_hasNext(&iter)) { 
publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); + celixThreadMutex_lock(&bound->mp_lock); pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + celixThreadMutex_unlock(&bound->mp_lock); bound->map = NULL; } } //setup new serializer pub->serializerSvc = serializerSvc; - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - bundle_pt bundle = hashMapEntry_getKey(entry); - publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry); - pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map); - } + if (pub->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + bundle_pt bundle = hashMapEntry_getKey(entry); + publish_bundle_bound_service_t *bound = hashMapEntry_getValue(entry); + celixThreadMutex_lock(&bound->mp_lock); + pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map); + celixThreadMutex_unlock(&bound->mp_lock); + } + } celixThreadMutex_unlock(&(pub->tp_lock)); @@ -260,7 +266,9 @@ celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); while (hashMapIterator_hasNext(&iter)) { publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); + celixThreadMutex_lock(&bound->mp_lock); pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + celixThreadMutex_unlock(&bound->mp_lock); bound->map = NULL; } } @@ -294,7 +302,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt } if (bound != NULL) { - *service = bound->service; + *service = &bound->pubSvc; } celixThreadMutex_unlock(&(publish->tp_lock)); 
@@ -374,13 +382,21 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con if (bound->map == NULL) { printf("TP: Serializer is not set!\n"); + status = 1; } else if (msgSer == NULL ){ printf("TP: No msg serializer available for msg type id %d\n", msgTypeId); + hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); + printf("Note supported messages:\n"); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); + printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId); + } + status = 1; } int major=0, minor=0; - if (msgSer != NULL) { + if (status == 0 && msgSer != NULL) { pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); msg_hdr->type = msgTypeId; @@ -407,9 +423,6 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con free(msg); free(serializedOutput); - } else { - printf("TP: Message %u not supported.\n",msgTypeId); - status=-1; } celixThreadMutex_unlock(&(bound->mp_lock)); @@ -418,9 +431,30 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con return status; } -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = utils_stringHash(msgType); - return 0; +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){ + publish_bundle_bound_service_t* bound = handle; + unsigned int msgTypeId = 0; + + celixThreadMutex_lock(&bound->mp_lock); + if (bound->map != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); + if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) { + msgTypeId = msgSer->msgId; + break; + } + } + } + celixThreadMutex_unlock(&bound->mp_lock); + + if (msgTypeId 
!= 0) { + *out = msgTypeId; + return 0; + } else { + printf("TP: Cannot find msg type id for msg type %s\n", msgType); + return 1; + } } @@ -432,15 +466,10 @@ static unsigned int rand_range(unsigned int min, unsigned int max){ } static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) { - - publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); + //PRECOND lock on publish->tp_lock + publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); if (bound != NULL) { - bound->service = calloc(1, sizeof(*bound->service)); - } - - if (bound != NULL && bound->service != NULL) { - bound->parent = tp; bound->bundle = bundle; bound->getCount = 1; @@ -452,21 +481,17 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to bound->scope=strdup(pubEP->scope); bound->topic=strdup(pubEP->topic); bound->largeUdpHandle = largeUdp_create(1); - bound->service->handle = bound; - bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; - bound->service->send = pubsub_topicPublicationSend; - bound->service->sendMultipart = NULL; //Multipart not supported (jet) for UDP + bound->pubSvc.handle = bound; + bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->pubSvc.send = pubsub_topicPublicationSend; + bound->pubSvc.sendMultipart = NULL; //Multipart not supported (jet) for UDP - //TODO check if lock on tp is needed? (e.g. is lock already done by caller?) 
- if (tp->serializerSvc != NULL) { + if (tp->serializerSvc != NULL) { tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map); } } else { - if (bound != NULL) { - free(bound->service); - } free(bound); return NULL; } @@ -475,14 +500,9 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to } static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) { - + //PRECOND lock on publish->tp_lock celixThreadMutex_lock(&boundSvc->mp_lock); - if (boundSvc->service != NULL) { - free(boundSvc->service); - } - - //TODO check if lock on parent is needed, e.g. does the caller already lock? if (boundSvc->map != NULL) { if (boundSvc->parent->serializerSvc == NULL) { printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n"); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c index da23b21..a424112 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c @@ -29,7 +29,6 @@ #include <unistd.h> #include <signal.h> -#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> @@ -57,7 +56,7 @@ #define UDP_BUFFER_SIZE 65535 #define MAX_UDP_SESSIONS 16 -struct topic_subscription{ +struct topic_subscription { char* ifIpAddress; service_tracker_pt tracker; @@ -70,7 +69,8 @@ struct topic_subscription{ //NOTE. using a service ptr can be dangerous, because pointer can be reused. //ensuring that pointer are removed before new (refurbish) pionter comes along is crucial! 
- hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t* + hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t* + hash_map_pt bundleMap; //key = service ptr, value = bundle_pt hash_map_pt socketMap; // key = URL, value = listen-socket unsigned int nrSubscribers; @@ -118,7 +118,8 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl celixThreadMutex_create(&ts->ts_lock,NULL); arrayList_create(&ts->sub_ep_list); - ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL); ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS); @@ -164,7 +165,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ serviceTracker_destroy(ts->tracker); arrayList_clear(ts->sub_ep_list); arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->msgSerializersMap,false,false); + hashMap_destroy(ts->msgSerializerMapMap, false, false); + hashMap_destroy(ts->bundleMap, false, false); hashMap_destroy(ts->socketMap,false,false); largeUdp_destroy(ts->largeUdpHandle); @@ -394,13 +396,34 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { return ts->nrSubscribers; } -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ +celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); + //clear old + if (ts->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + pubsub_subscriber_t* 
subsvc = hashMapEntry_getKey(entry); + pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); + hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); + } + } ts->serializerSvc = serializerSvc; - + //init new + if (ts->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); + while (hashMapIterator_hasNext(&iter)) { + pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter); + bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc); + pubsub_msg_serializer_map_t* map = NULL; + ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); + hashMap_put(ts->msgSerializerMapMap, subsvc, map); + } + } celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -411,10 +434,13 @@ celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts celixThreadMutex_lock(&ts->ts_lock); if (ts->serializerSvc == serializerSvc) { //only act if svc removed is services used - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap); + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter); + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); + pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); + hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); } ts->serializerSvc = NULL; } @@ -428,7 +454,7 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->msgSerializersMap, svc)) { + if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) { bundle_pt bundle = NULL; 
serviceReference_getBundle(reference, &bundle); @@ -436,7 +462,8 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc pubsub_msg_serializer_map_t* map = NULL; ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); if (map != NULL) { - hashMap_put(ts->msgSerializersMap, svc, map); + hashMap_put(ts->msgSerializerMapMap, svc, map); + hashMap_put(ts->bundleMap, svc, bundle); } } } @@ -452,10 +479,12 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->msgSerializersMap, svc)) { - pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc); + if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) { + pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc); if (ts->serializerSvc != NULL){ ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); + hashMap_remove(ts->bundleMap, svc); + hashMap_remove(ts->msgSerializerMapMap, svc); } } celixThreadMutex_unlock(&ts->ts_lock); @@ -467,7 +496,7 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){ - hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap); + hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap); celixThreadMutex_lock(&sub->ts_lock); while (hashMapIterator_hasNext(&iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h index 158dfe7..dbd2ff1 100644 --- 
a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h +++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h @@ -34,13 +34,13 @@ typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h index 1fbbaaf..c1e78c3 100644 --- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h +++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h @@ -51,7 +51,7 @@ celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c index 5c9a5d5..6095d8a 100644 --- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c @@ -444,7 +444,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint if (factory == NULL) { topic_publication_pt pub = NULL; - status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->serializerSvc, admin->ipAddress, admin->basePort, admin->maxPort, &pub); + status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub); + pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc); if (status == CELIX_SUCCESS) { status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); if (status == CELIX_SUCCESS && factory != NULL) { @@ -676,7 +677,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize 
while(hashMapIterator_hasNext(lp_iter)){ service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter); topic_publication_pt topic_pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc); + pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc); } hashMapIterator_destroy(lp_iter); celixThreadMutex_unlock(&admin->localPublicationsLock); @@ -686,7 +687,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions); while(hashMapIterator_hasNext(subs_iter)){ topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter); - pubsub_topicSubscriptionAddSerializer(topic_sub, admin->serializerSvc); + pubsub_topicSubscriptionSetSerializer(topic_sub, admin->serializerSvc); } hashMapIterator_destroy(subs_iter); celixThreadMutex_unlock(&admin->subscriptionsLock); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c index 2e95874..28bf56e 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c @@ -74,7 +74,7 @@ struct topic_publication { typedef struct publish_bundle_bound_service { topic_publication_pt parent; - pubsub_publisher_pt service; + pubsub_publisher_t pubSvc; bundle_pt bundle; char *topic; pubsub_msg_serializer_map_t* map; @@ -104,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* 
serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY @@ -209,7 +209,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p pub->endpoint = ep; pub->zmq_socket = socket; - pub->serializerSvc = serializer; + pub->serializerSvc = NULL; #ifdef BUILD_WITH_ZMQ_SECURITY if (pubEP->is_secure){ @@ -298,9 +298,7 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ celix_status_t status = CELIX_SUCCESS; //celixThreadMutex_lock(&(pub->tp_lock)); - status = serviceRegistration_unregister(pub->svcFactoryReg); - //celixThreadMutex_unlock(&(pub->tp_lock)); return status; @@ -331,7 +329,7 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ +celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&(pub->tp_lock)); @@ -341,19 +339,25 @@ celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pu hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); while (hashMapIterator_hasNext(&iter)) { publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); - pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + celixThreadMutex_lock(&bound->mp_lock); + pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + celixThreadMutex_unlock(&bound->mp_lock); bound->map = NULL; } } pub->serializerSvc = serializerSvc; - 
hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - bundle_pt bundle = hashMapEntry_getKey(entry); - publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry); - pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map); - } + if (pub->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + bundle_pt bundle = hashMapEntry_getKey(entry); + publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry); + celixThreadMutex_lock(&bound->mp_lock); + pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map); + celixThreadMutex_unlock(&bound->mp_lock); + } + } celixThreadMutex_unlock(&(pub->tp_lock)); @@ -367,9 +371,11 @@ celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, if (pub->serializerSvc == svc) { hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); while (hashMapIterator_hasNext(&iter)) { - publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter); - pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map); - boundSvc->map = NULL; + publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); + celixThreadMutex_lock(&bound->mp_lock); + pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); + celixThreadMutex_unlock(&bound->mp_lock); + bound->map = NULL; } pub->serializerSvc = NULL; } @@ -402,7 +408,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt } if(bound!=NULL){ - *service = bound->service; + *service = &bound->pubSvc; } celixThreadMutex_unlock(&(publish->tp_lock)); @@ -489,29 +495,41 @@ static bool send_pubsub_mp_msg(zsock_t* 
zmq_socket, array_list_pt mp_msg_parts){ } static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) { - return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG); - } static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){ int status = 0; publish_bundle_bound_service_t* bound = handle; - celixThreadMutex_lock(&(bound->mp_lock)); + if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg printf("TP: Multipart send already in progress. Cannot process a new one.\n"); celixThreadMutex_unlock(&(bound->mp_lock)); return -3; - } + } pubsub_msg_serializer_t* msgSer = NULL; if (bound->map != NULL) { msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId); } - int major=0, minor=0; - if (msgSer != NULL) { + if (bound->map == NULL) { + printf("TP: Serializer is not set!\n"); + status = 1; + } else if (msgSer == NULL ){ + printf("TP: No msg serializer available for msg type id %d\n", msgTypeId); + hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); + printf("Note supported messages:\n"); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); + printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId); + } + status = 1; + } + + int major=0, minor=0; + if (status == 0 && msgSer != NULL) { pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); msg_hdr->type = msgTypeId; @@ -579,19 +597,38 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy } } else { - printf("TP: Message %u not supported.",msgTypeId); + printf("TP: Message %u not supported.\n",msgTypeId); status=-1; } celixThreadMutex_unlock(&(bound->mp_lock)); - return status; - } 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = utils_stringHash(msgType); - return 0; +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){ + publish_bundle_bound_service_t* bound = handle; + unsigned int msgTypeId = 0; + + celixThreadMutex_lock(&bound->mp_lock); + if (bound->map != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); + if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) { + msgTypeId = msgSer->msgId; + break; + } + } + } + celixThreadMutex_unlock(&bound->mp_lock); + + if (msgTypeId != 0) { + *out = msgTypeId; + return 0; + } else { + printf("TP: Cannot find msg type id for msg type %s\n", msgType); + return 1; + } } static unsigned int rand_range(unsigned int min, unsigned int max){ @@ -602,14 +639,11 @@ static unsigned int rand_range(unsigned int min, unsigned int max){ } static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ + //PRECOND lock on tp->lock publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); if (bound != NULL) { - bound->service = calloc(1, sizeof(*bound->service)); - } - - if (bound != NULL && bound->service != NULL) { bound->parent = tp; bound->bundle = bundle; @@ -617,7 +651,6 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to bound->mp_send_in_progress = false; celixThreadMutex_create(&bound->mp_lock,NULL); - //TODO check if lock is needed. e.g. was the caller already locked? 
if (tp->serializerSvc != NULL) { tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map); } @@ -626,17 +659,13 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); bound->topic=strdup(pubEP->topic); - bound->service->handle = bound; - bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; - bound->service->send = pubsub_topicPublicationSend; - bound->service->sendMultipart = pubsub_topicPublicationSendMultipart; - + bound->pubSvc.handle = bound; + bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->pubSvc.send = pubsub_topicPublicationSend; + bound->pubSvc.sendMultipart = pubsub_topicPublicationSendMultipart; } else { - if (bound != NULL) { - free(bound->service); - } free(bound); return NULL; } @@ -645,13 +674,9 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to } static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){ - + //PRECOND lock on publish->tp_lock celixThreadMutex_lock(&boundSvc->mp_lock); - if (boundSvc->service != NULL) { - free(boundSvc->service); - } - if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) { boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map); boundSvc->map = NULL; http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c index 7ef2c5d..537fbe5 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c @@ -69,7 +69,8 @@ struct topic_subscription { celix_thread_mutex_t 
ts_lock; bundle_context_pt context; - hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t* + hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t* + hash_map_pt bundleMap; //key = service ptr, value = bundle_pt array_list_pt pendingConnections; array_list_pt pendingDisconnections; @@ -220,7 +221,8 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, celixThreadMutex_create(&ts->socket_lock, NULL); celixThreadMutex_create(&ts->ts_lock,NULL); arrayList_create(&ts->sub_ep_list); - ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL); + ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL); arrayList_create(&ts->pendingConnections); arrayList_create(&ts->pendingDisconnections); celixThreadMutex_create(&ts->pendingConnections_lock, NULL); @@ -266,7 +268,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ serviceTracker_destroy(ts->tracker); arrayList_clear(ts->sub_ep_list); arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->msgSerializers,false,false); + hashMap_destroy(ts->msgSerializerMapMap,false,false); + hashMap_destroy(ts->bundleMap,false,false); celixThreadMutex_lock(&ts->pendingConnections_lock); arrayList_destroy(ts->pendingConnections); @@ -426,13 +429,34 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { return ts->nrSubscribers; } -celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ +celix_status_t pubsub_topicSubscriptionSetserializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - - ts->serializerSvc = serializerSvc; - + //clear old + if (ts->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); + while 
(hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); + pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); + hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); + + } + } + ts->serializerSvc = serializerSvc; + //init new + if (ts->serializerSvc != NULL) { + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); + while (hashMapIterator_hasNext(&iter)) { + pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter); + bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc); + pubsub_msg_serializer_map_t* map = NULL; + ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); + hashMap_put(ts->msgSerializerMapMap, subsvc, map); + } + } celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -442,53 +466,59 @@ celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - if (ts->serializerSvc == svc) { - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers); - while(hashMapIterator_hasNext(&iter)){ - pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter); - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - } - } - ts->serializerSvc = NULL; + if (ts->serializerSvc == svc) { //only act if svc removed is services used + hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); + pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); + hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); + } + ts->serializerSvc = NULL; + 
} celixThreadMutex_unlock(&ts->ts_lock); return status; } -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc) { celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->msgSerializers, service)) { - bundle_pt bundle = NULL; - serviceReference_getBundle(reference, &bundle); - - if (ts->serializerSvc != NULL) { - pubsub_msg_serializer_map_t* map = NULL; - ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); - if (map != NULL) { - hashMap_put(ts->msgSerializers, service, map); - } else { - printf("TS: Cannot create msg serializer map\n"); - } - } - } + if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) { + bundle_pt bundle = NULL; + serviceReference_getBundle(reference, &bundle); + + if (ts->serializerSvc != NULL) { + pubsub_msg_serializer_map_t* map = NULL; + ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); + if (map != NULL) { + hashMap_put(ts->msgSerializerMapMap, svc, map); + hashMap_put(ts->bundleMap, svc, bundle); + } + } + } celixThreadMutex_unlock(&ts->ts_lock); printf("TS: New subscriber registered.\n"); return status; } -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc) { celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->msgSerializers, service)) { - pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service); - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - } + if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) { + 
pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc); + if (ts->serializerSvc != NULL){ + ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); + hashMap_remove(ts->bundleMap, svc); + hashMap_remove(ts->msgSerializerMapMap, svc); + } + } celixThreadMutex_unlock(&ts->ts_lock); printf("TS: Subscriber unregistered.\n"); @@ -500,7 +530,7 @@ static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) { pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); - hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers); + hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap); while (hashMapIterator_hasNext(&iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c index 2dd8258..6145a38 100644 --- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c +++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c @@ -250,14 +250,17 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId); if (clash) { printf("Cannot add msg %s. 
clash in msg id %d!!\n", msgName, msgId); + free(impl); + dynMessage_destroy(msgType); } else if ( msgName != NULL && msgVersion != NULL && msgId != 0) { hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer); } else { - printf("Error adding creating msg serializer\n"); + printf("Error creating msg serializer\n"); + free(impl); + dynMessage_destroy(msgType); } - } - else{ + } else{ printf("DMU: cannot parse message from descriptor %s\n.",path); } fclose(stream); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c index 36ea422..6047cf8 100644 --- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c +++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c @@ -226,10 +226,20 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_ celixThreadMutex_unlock(&manager->publicationsLock); + + celixThreadMutex_lock(&manager->serializerListLock); + unsigned int size = arrayList_size(manager->serializerList); + if (size > 0) { + pubsub_serializer_service_t* ser = arrayList_get(manager->serializerList, (size-1)); //last, same as result of add/remove serializer + new_psa->setSerializer(new_psa->admin, ser); + } + celixThreadMutex_unlock(&manager->serializerListLock); + celixThreadMutex_lock(&manager->psaListLock); arrayList_add(manager->psaList, new_psa); celixThreadMutex_unlock(&manager->psaListLock); + return status; } @@ -335,7 +345,6 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added pubsub serializer"); int i; - for(i=0; i<arrayList_size(manager->psaList); i++){ pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt) arrayList_get(manager->psaList,i); psa->setSerializer(psa->admin, new_serializer); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/test/CMakeLists.txt b/pubsub/test/CMakeLists.txt index 7cd0003..3b1655b 100644 --- a/pubsub/test/CMakeLists.txt +++ b/pubsub/test/CMakeLists.txt @@ -38,11 +38,11 @@ bundle_files(pubsub_sut add_deploy(pubsub_udpmc_sut NAME deploy_sut BUNDLES + org.apache.celix.pubsub_serializer.PubSubSerializerJson org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + #org.apache.celix.pubsub_admin.PubSubAdminUdpMc + org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_topology_manager.PubSubTopologyManager - org.apache.celix.pubsub_admin.PubSubAdminUdpMc - #org.apache.celix.pubsub_admin.PubSubAdminZmq - org.apache.celix.pubsub_serializer.PubSubSerializerJson pubsub_sut DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc ) @@ -60,11 +60,11 @@ bundle_files(pubsub_tst add_deploy(pubsub_udpmc_tst NAME deploy_tst BUNDLES - org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery - org.apache.celix.pubsub_topology_manager.PubSubTopologyManager - org.apache.celix.pubsub_admin.PubSubAdminUdpMc - #org.apache.celix.pubsub_admin.PubSubAdminZmq org.apache.celix.pubsub_serializer.PubSubSerializerJson + org.apache.celix.pubsub_topology_manager.PubSubTopologyManager + org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery + #org.apache.celix.pubsub_admin.PubSubAdminUdpMc + org.apache.celix.pubsub_admin.PubSubAdminZmq pubsub_tst DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc LAUNCHER celix_test_runner http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/msg_descriptors/msg.descriptor ---------------------------------------------------------------------- diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor index 03b15ba..0eb28cb 
100644 --- a/pubsub/test/msg_descriptors/msg.descriptor +++ b/pubsub/test/msg_descriptors/msg.descriptor @@ -3,6 +3,7 @@ type=message name=msg version=1.0.0 :annotations +classname=org.example.Msg :types :message -{n seqnR} +{i seqnR} http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/msg.h ---------------------------------------------------------------------- diff --git a/pubsub/test/test/msg.h b/pubsub/test/test/msg.h index babfd1f..49169c5 100644 --- a/pubsub/test/test/msg.h +++ b/pubsub/test/test/msg.h @@ -20,8 +20,10 @@ #ifndef MSG_H #define MSG_H +#include <stdint.h> + typedef struct msg { - int seqNr; + uint32_t seqNr; } msg_t; #endif //MSG_H http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/sut_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/test/test/sut_activator.c b/pubsub/test/test/sut_activator.c index 3e3b33b..6f02e79 100644 --- a/pubsub/test/test/sut_activator.c +++ b/pubsub/test/test/sut_activator.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <stdlib.h> +#include <constants.h> #include "bundle_activator.h" #include "service_tracker.h" @@ -57,7 +58,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) act->reg = NULL; bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &act->subSvc, props, &act->reg); - const char* filter = "(&(objectClass=pubsub.publisher)(pubsub.topic=pong))"; + char filter[512]; + snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "pong"); + service_tracker_customizer_pt customizer = NULL; serviceTrackerCustomizer_create(act, NULL, sut_pubAdded, NULL, sut_pubRemoved, &customizer); serviceTracker_createWithFilter(context, filter, customizer, &act->tracker); http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/tst_activator.cpp 
---------------------------------------------------------------------- diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp index 266f73e..2cf5309 100644 --- a/pubsub/test/test/tst_activator.cpp +++ b/pubsub/test/test/tst_activator.cpp @@ -32,7 +32,7 @@ #include <CppUTest/TestHarness.h> #include <CppUTestExt/MockSupport.h> - +#include <constants.h> static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release); @@ -70,8 +70,10 @@ celix_status_t bundleActivator_start(__attribute__((unused)) void * userData, bu g_act.subSvc.receive = tst_receive; bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &g_act.subSvc, props, &g_act.reg); - const char* filter = "(&(objectClass=pubsub.publisher)(pubsub.topic=ping))"; - service_tracker_customizer_pt customizer = NULL; + char filter[512]; + snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "ping"); + + service_tracker_customizer_pt customizer = NULL; serviceTrackerCustomizer_create(&g_act, NULL, tst_pubAdded, NULL, tst_pubRemoved, &customizer); serviceTracker_createWithFilter(context, filter, customizer, &g_act.tracker); serviceTracker_open(g_act.tracker); @@ -157,7 +159,7 @@ TEST_GROUP(PUBSUB_INT_GROUP) if (count > 0) { break; } else { - printf("No return message received, waiting for a while\n"); + printf("No return message received, waiting for a while. %d/%d\n", i+1, TRIES); } } CHECK(count > 0); @@ -176,7 +178,8 @@ TEST(PUBSUB_INT_GROUP, sendRecvTest) { constexpr int COUNT = 50; msg_t msg; for (int i = 0; i < COUNT; ++i) { - msg.seqNr = i; + msg.seqNr = i+1; + printf("Sending test msg %d of %d\n", i+1, COUNT); pthread_mutex_lock(&g_act.mutex); g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &msg); pthread_mutex_unlock(&g_act.mutex);