NanoMsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7c141424 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7c141424 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7c141424 Branch: refs/heads/nanomsg Commit: 7c141424d925afa98a83bc693649dc3500354965 Parents: cdefb0d Author: Erjan Altena <[email protected]> Authored: Tue Nov 27 21:02:18 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Tue Nov 27 21:02:18 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 42 ++-- .../src/pubsub_nanomsg_admin.h | 2 +- .../src/pubsub_nanomsg_topic_sender.cc | 240 ++++++++----------- .../src/pubsub_nanomsg_topic_sender.h | 66 ++++- 4 files changed, 178 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index cf516ee..42ed632 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -99,20 +99,20 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { std::lock_guard<std::mutex> lock(topicSenders.mutex); for (auto kv : topicSenders.map) { auto *sender = kv.second; - pubsub_nanoMsgTopicSender_destroy(sender); + delete (sender); } } { std::lock_guard<std::mutex> lock(topicReceivers.mutex); - for (auto kv: topicReceivers.map) { + for (auto &kv: topicReceivers.map) { delete kv.second; } } { std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex); - for (auto entry : discoveredEndpoints.map) { + for (auto &entry : discoveredEndpoints.map) { auto *ep = entry.second; celix_properties_destroy(ep); } @@ -252,10 +252,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper std::lock_guard<std::mutex> senderLock(topicSenders.mutex); for (auto kv: topicSenders.map) { auto *sender = kv.second; - if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) { + if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) { char *key = kv.first; topicSenders.map.erase(kv.first); - pubsub_nanoMsgTopicSender_destroy(sender); + delete (sender); free(key); } } @@ -263,7 +263,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper { std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); - for (auto kv : topicReceivers.map){ + for (auto &kv : topicReceivers.map){ auto *receiver = kv.second; if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) { auto key = kv.first; @@ -322,7 +322,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c celix_properties_t *newEndpoint = nullptr; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - pubsub_nanomsg_topic_sender_t *sender = nullptr; + pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr; std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); sender = topicSenders.map.find(key)->second; @@ -333,7 +333,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c serEntry = &kv->second; } if (serEntry != nullptr) { - sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress, + sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress, basePort, maxPort); } if (sender != nullptr) { @@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c const char *serType = serEntry->serType; newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, nullptr); - celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, pubsub_nanoMsgTopicSender_url(sender)); + celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl()); //if available also set container name const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); if (cn != nullptr) { @@ -378,10 +378,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons auto kv = topicSenders.map.find(key); if (kv != topicSenders.map.end()) { char *mapKey = kv->first; - pubsub_nanomsg_topic_sender_t *sender = kv->second; + pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second; free(mapKey); //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender? - pubsub_nanoMsgTopicSender_destroy(sender); + delete (sender); } else { L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic); } @@ -495,7 +495,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { std::lock_guard<std::mutex> threadLock(topicReceivers.mutex); - for (auto entry: topicReceivers.map) { + for (auto &entry: topicReceivers.map) { pubsub::nanomsg::topic_receiver *receiver = entry.second; connectEndpointToReceiver(receiver, endpoint); } @@ -541,7 +541,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - for (auto entry : topicReceivers.map) { + for (auto &entry : topicReceivers.map) { pubsub::nanomsg::topic_receiver *receiver = entry.second; disconnectEndpointFromReceiver(receiver, endpoint); } @@ -564,13 +564,13 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); for (auto kvts: topicSenders.map) { - pubsub_nanomsg_topic_sender_t *sender = kvts.second; - long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender); + pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second; + long serSvcId = sender->getSerializerSvcId(); auto kvs = serializers.map.find(serSvcId); const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType); - const char *scope = pubsub_nanoMsgTopicSender_scope(sender); - const char *topic = pubsub_nanoMsgTopicSender_topic(sender); - const char *url = pubsub_nanoMsgTopicSender_url(sender); + const char *scope = sender->getScope(); + const char *topic = sender->getTopic(); + const char *url = sender->getUrl(); fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); fprintf(out, " |- serializer type = %s\n", serType); fprintf(out, " |- url = %s\n", url); @@ -582,7 +582,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut fprintf(out, "\nTopic Receivers:\n"); std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - for (auto entry : topicReceivers.map) { + for (auto &entry : topicReceivers.map) { pubsub::nanomsg::topic_receiver *receiver = entry.second; long serSvcId = receiver->serializerSvcId(); auto kv = serializers.map.find(serSvcId); @@ -596,10 +596,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str()); fprintf(out, " |- serializer type = %s\n", serType); - for (auto url : connected) { + for (auto &url : connected) { fprintf(out, " |- connected url = %s\n", url.c_str()); } - for (auto url : unconnected) { + for (auto &url : unconnected) { fprintf(out, " |- unconnected url = %s\n", url.c_str()); } } http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 689ae20..7c2e9a0 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -119,7 +119,7 @@ private: pubsub_serializer_service_t *svc; } psa_nanomsg_serializer_entry_t; ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{}; - ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{}; + ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{}; ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; }; http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc index d5ed28f..1c75e71 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc @@ -28,11 +28,9 @@ #include <nanomsg/bus.h> -#include <pubsub_serializer.h> #include <pubsub_constants.h> #include <pubsub/publisher.h> #include <pubsub_common.h> -#include <log_helper.h> #include "pubsub_nanomsg_topic_sender.h" #include "pubsub_psa_nanomsg_constants.h" #include "pubsub_nanomsg_common.h" @@ -41,69 +39,47 @@ #define NANOMSG_BIND_MAX_RETRY 10 #define L_DEBUG(...) \ - logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) + logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) #define L_INFO(...) \ - logHelper_log(sender->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) + logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) #define L_WARN(...) \ - logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) + logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) #define L_ERROR(...) \ - logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) - -struct pubsub_nanomsg_topic_sender { - celix_bundle_context_t *ctx; - log_helper_t *logHelper; - long serializerSvcId; - pubsub_serializer_service_t *serializer; - - char *scope; - char *topic; - char scopeAndTopicFilter[5]; - char *url; - - struct { - celix_thread_mutex_t mutex; - int socket; - } nanomsg; - - struct { - long svcId; - celix_service_factory_t factory; - } publisher; - - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = bndId, value = psa_nanomsg_bounded_service_entry_t - } boundedServices; -}; + logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) + typedef struct psa_nanomsg_bounded_service_entry { - pubsub_nanomsg_topic_sender_t *parent; + pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent; pubsub_publisher_t service; long bndId; hash_map_t *msgTypes; int getCount; } psa_nanomsg_bounded_service_entry_t; -static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties); -static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties); +//static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, +// const celix_properties_t *svcProperties); +//static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, +// const celix_properties_t *svcProperties); static unsigned int rand_range(unsigned int min, unsigned int max); -static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender); +static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender); static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg); -pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper, - const char *scope, const char *topic, - long serializerSvcId, pubsub_serializer_service_t *ser, - const char *bindIP, unsigned int basePort, - unsigned int maxPort) { - pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(calloc(1, sizeof(*sender))); - sender->ctx = ctx; - sender->logHelper = logHelper; - sender->serializerSvcId = serializerSvcId; - sender->serializer = ser; - psa_nanomsg_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter); +pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, + log_helper_t *_logHelper, + const char *_scope, + const char *_topic, + long _serializerSvcId, + pubsub_serializer_service_t *_ser, + const char *_bindIp, + unsigned int _basePort, + unsigned int _maxPort) : + ctx{_ctx}, + logHelper{_logHelper}, + serializerSvcId {_serializerSvcId}, + serializer{_ser}{ + + psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter); //setting up nanomsg socket for nanomsg TopicSender { @@ -116,10 +92,10 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con int rv = -1, retry=0; while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) { /* Randomized part due to same bundle publishing on different topics */ - unsigned int port = rand_range(basePort,maxPort); - size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", bindIP, port) + 1; - char *url = static_cast<char*>(calloc(len, sizeof(char*))); - snprintf(url, len, "tcp://%s:%u", bindIP, port); + unsigned int port = rand_range(_basePort,_maxPort); + size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, port) + 1; + char *_url = static_cast<char*>(calloc(len, sizeof(char*))); + snprintf(_url, len, "tcp://%s:%u", _bindIp, port); len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1; char *bindUrl = static_cast<char*>(calloc(len, sizeof(char))); @@ -127,165 +103,155 @@ pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_con rv = nn_bind (socket, bindUrl); if (rv == -1) { perror("Error for nn_bind"); - free(url); + free(_url); } else { - sender->url = url; - sender->nanomsg.socket = socket; + this->url = _url; + nanomsg.socket = socket; } retry++; free(bindUrl); } } - if (sender->url != NULL) { - sender->scope = strndup(scope, 1024 * 1024); - sender->topic = strndup(topic, 1024 * 1024); + if (url != NULL) { + scope = strndup(_scope, 1024 * 1024); + topic = strndup(_topic, 1024 * 1024); - celixThreadMutex_create(&sender->boundedServices.mutex, NULL); - celixThreadMutex_create(&sender->nanomsg.mutex, NULL); - sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL); + celixThreadMutex_create(&boundedServices.mutex, NULL); + celixThreadMutex_create(&nanomsg.mutex, NULL); + boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL); } //register publisher services using a service factory - if (sender->url != NULL) { - sender->publisher.factory.handle = sender; - sender->publisher.factory.getService = psa_nanomsg_getPublisherService; - sender->publisher.factory.ungetService = psa_nanomsg_ungetPublisherService; + if (url != NULL) { + publisher.factory.handle = this; + publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { + return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService( + requestingBundle, + svcProperties); + }; + publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { + return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService( + requestingBundle, + svcProperties); + }; celix_properties_t *props = celix_properties_create(); - celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic); - celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); + celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic); + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope); celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; - opts.factory = &sender->publisher.factory; + opts.factory = &publisher.factory; opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION; opts.properties = props; - sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts); - } - - if (sender->url == NULL) { - free(sender); - sender = NULL; + publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts); } - return sender; } -void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender) { - if (sender != NULL) { - celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId); +pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() { + celix_bundleContext_unregisterService(ctx, publisher.svcId); - nn_close(sender->nanomsg.socket); + nn_close(nanomsg.socket); - celixThreadMutex_lock(&sender->boundedServices.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); - while (hashMapIterator_hasNext(&iter)) { - psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter)); - if (entry != NULL) { - sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); - free(entry); - } + celixThreadMutex_lock(&boundedServices.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map); + while (hashMapIterator_hasNext(&iter)) { + psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter)); + if (entry != NULL) { + serializer->destroySerializerMap(serializer->handle, entry->msgTypes); + free(entry); } - hashMap_destroy(sender->boundedServices.map, false, false); - celixThreadMutex_unlock(&sender->boundedServices.mutex); - - celixThreadMutex_destroy(&sender->boundedServices.mutex); - celixThreadMutex_destroy(&sender->nanomsg.mutex); - - free(sender->scope); - free(sender->topic); - free(sender->url); - free(sender); } -} + hashMap_destroy(boundedServices.map, false, false); + celixThreadMutex_unlock(&boundedServices.mutex); -long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender) { - return sender->serializerSvcId; -} + celixThreadMutex_destroy(&boundedServices.mutex); + celixThreadMutex_destroy(&nanomsg.mutex); -const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender) { - return sender->scope; + free(scope); + free(topic); + free(url); } -const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender) { - return sender->topic; +long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const { + return serializerSvcId; } -const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender) { - return sender->url; +const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const { + return scope; } -void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) { - //TODO subscriber count -> topic info +const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const { + return topic; } -void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *, const celix_properties_t *) { - //TODO +const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const { + return url; } -static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, + +void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { - pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle); long bndId = celix_bundle_getId(requestingBundle); - celixThreadMutex_lock(&sender->boundedServices.mutex); - psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId)); + celixThreadMutex_lock(&boundedServices.mutex); + psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId)); if (entry != NULL) { entry->getCount += 1; } else { entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry))); entry->getCount = 1; - entry->parent = sender; + entry->parent = this; entry->bndId = bndId; - int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes); + int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes); if (rc == 0) { entry->service.handle = entry; entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType; entry->service.send = psa_nanomsg_topicPublicationSend; entry->service.sendMultipart = NULL; //not supported TODO remove - hashMap_put(sender->boundedServices.map, (void*)bndId, entry); + hashMap_put(boundedServices.map, (void*)bndId, entry); } else { - L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", sender->scope, sender->topic); + L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic); } } - celixThreadMutex_unlock(&sender->boundedServices.mutex); + celixThreadMutex_unlock(&boundedServices.mutex); return &entry->service; } -static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, +void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { - pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(handle); long bndId = celix_bundle_getId(requestingBundle); - celixThreadMutex_lock(&sender->boundedServices.mutex); - psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(sender->boundedServices.map, (void*)bndId)); + celixThreadMutex_lock(&boundedServices.mutex); + psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId)); if (entry != NULL) { entry->getCount -= 1; } if (entry != NULL && entry->getCount == 0) { //free entry - hashMap_remove(sender->boundedServices.map, (void*)bndId); - int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); + hashMap_remove(boundedServices.map, (void*)bndId); + int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes); if (rc != 0) { L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); } free(entry); } - celixThreadMutex_unlock(&sender->boundedServices.mutex); + celixThreadMutex_unlock(&boundedServices.mutex); } static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) { int status = CELIX_SUCCESS; psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle); - pubsub_nanomsg_topic_sender_t *sender = bound->parent; + pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent; pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId)); @@ -329,27 +295,27 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 ); free(serializedOutput); if (rc < 0) { - L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno)); + //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno)); } else { - L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc); - L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor); + //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc); + //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor); } } else { - L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic); + //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic); } } else { status = CELIX_SERVICE_EXCEPTION; - L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic); + //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic); } return status; } -static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) { +static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender */*sender*/) { static bool firstSend = true; if(firstSend){ - L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); + //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); sleep(FIRST_SEND_DELAY_IN_SECONDS); firstSend = false; } http://git-wip-us.apache.org/repos/asf/celix/blob/7c141424/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h index ec85c37..90ab6ce 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h @@ -20,23 +20,63 @@ #define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H #include "celix_bundle_context.h" +#include <log_helper.h> +#include <pubsub_serializer.h> -typedef struct pubsub_nanomsg_topic_sender pubsub_nanomsg_topic_sender_t; +namespace pubsub { + namespace nanomsg { + class pubsub_nanomsg_topic_sender { + public: + pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope, + const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser, + const char *_bindIp, unsigned int _basePort, unsigned int _maxPort); -pubsub_nanomsg_topic_sender_t* pubsub_nanoMsgTopicSender_create(celix_bundle_context_t *ctx, log_helper_t *logHelper, - const char *scope, const char *topic, - long serializerSvcId, pubsub_serializer_service_t *ser, - const char *bindIP, unsigned int basePort, - unsigned int maxPort); -void pubsub_nanoMsgTopicSender_destroy(pubsub_nanomsg_topic_sender_t *sender); + ~pubsub_nanomsg_topic_sender(); -const char* pubsub_nanoMsgTopicSender_scope(pubsub_nanomsg_topic_sender_t *sender); -const char* pubsub_nanoMsgTopicSender_topic(pubsub_nanomsg_topic_sender_t *sender); -const char* pubsub_nanoMsgTopicSender_url(pubsub_nanomsg_topic_sender_t *sender); + pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete; -long pubsub_nanoMsgTopicSender_serializerSvcId(pubsub_nanomsg_topic_sender_t *sender); + const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete; -void pubsub_nanoMsgTopicSender_connectTo(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint); -void pubsub_nanoMsgTopicSender_disconnectFrom(pubsub_nanomsg_topic_sender_t *sender, const celix_properties_t *endpoint); + long getSerializerSvcId() const ; + const char *getScope() const ; + const char *getTopic() const ; + const char *getUrl() const; + + void* getPublisherService(const celix_bundle_t *requestingBundle, + const celix_properties_t *svcProperties __attribute__((unused))); + void ungetPublisherService(const celix_bundle_t *requestingBundle, + const celix_properties_t *svcProperties __attribute__((unused))); + int topicPublicationSend(unsigned int msgTypeId, const void *inMsg); + void delay_first_send_for_late_joiners() ; + + + //private: + celix_bundle_context_t *ctx; + log_helper_t *logHelper; + long serializerSvcId; + pubsub_serializer_service_t *serializer; + + char *scope{}; + char *topic{}; + char scopeAndTopicFilter[5]; + char *url{}; + + struct { + celix_thread_mutex_t mutex; + int socket; + } nanomsg{}; + + struct { + long svcId; + celix_service_factory_t factory; + } publisher{}; + + struct { + celix_thread_mutex_t mutex{}; + hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t + } boundedServices{}; + }; + } +} #endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
