Repository: celix
Updated Branches:
  refs/heads/nanomsg 7c141424d -> 707b8e54a


Nanomsg


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/15f268d1
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/15f268d1
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/15f268d1

Branch: refs/heads/nanomsg
Commit: 15f268d131370e55dc2dd4c5b6c247baa522fc58
Parents: 7c14142
Author: Erjan Altena <[email protected]>
Authored: Wed Nov 28 21:07:00 2018 +0100
Committer: Erjan Altena <[email protected]>
Committed: Wed Nov 28 21:37:36 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 |  12 +-
 .../src/pubsub_nanomsg_common.cc                |  15 +-
 .../src/pubsub_nanomsg_common.h                 |   2 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        |   5 +-
 .../src/pubsub_nanomsg_topic_receiver.h         |   2 +-
 .../src/pubsub_nanomsg_topic_sender.cc          | 211 +++++++------------
 .../src/pubsub_nanomsg_topic_sender.h           |  54 +++--
 7 files changed, 138 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 42ed632..9fdf2a3 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const 
char *scope, const c
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, 
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
                                                 nullptr);
-            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, 
sender->getUrl());
+            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, 
sender->getUrl().c_str());
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(ctx, 
"CELIX_CONTAINER_NAME", nullptr);
             if (cn != nullptr) {
@@ -568,12 +568,12 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char 
*commandLine __attribut
             long serSvcId = sender->getSerializerSvcId();
             auto kvs = serializers.map.find(serSvcId);
             const char* serType = ( kvs == serializers.map.end() ? "!Error" :  
kvs->second.serType);
-            const char *scope = sender->getScope();
-            const char *topic = sender->getTopic();
-            const char *url = sender->getUrl();
-            fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
+            const auto scope = sender->getScope();
+            const auto topic = sender->getTopic();
+            const auto url = sender->getUrl();
+            fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), 
topic.c_str());
             fprintf(out, "   |- serializer type = %s\n", serType);
-            fprintf(out, "   |- url             = %s\n", url);
+            fprintf(out, "   |- url             = %s\n", url.c_str());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
index 3ecd19c..333cb2d 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc
@@ -41,16 +41,15 @@ bool psa_nanomsg_checkVersion(version_pt msgVersion, const 
pubsub_nanmosg_msg_he
     return check;
 }
 
-void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const 
std::string &topic, char *filter) {
-    for (int i = 0; i < 5; ++i) { // 5 ??
-        filter[i] = '\0';
-    }
+std::string psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const 
std::string &topic) {
+    std::string result("");
     if (scope.size() >= 2)  { //3 ??
-        filter[0] = scope[0];
-        filter[1] = scope[1];
+        result += scope[0];
+        result += scope[1];
     }
     if (topic.size() >= 2)  { //3 ??
-        filter[2] = topic[0];
-        filter[3] = topic[1];
+        result += topic[0];
+        result += topic[1];
     }
+    return result;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
index 276169f..2c03d4c 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h
@@ -49,7 +49,7 @@ typedef struct pubsub_nanomsg_msg_header 
pubsub_nanmosg_msg_header_t;
 
 
 int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, 
unsigned int *msgTypeId);
-void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const 
std::string &topic, char *filter);
+std::string psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const 
std::string &topic);
 
 bool psa_nanomsg_checkVersion(version_pt msgVersion, const 
pubsub_nanmosg_msg_header_t *hdr);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 9f77a4c..30c2af7 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -72,7 +72,7 @@ 
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
     ctx = _ctx;
     logHelper = _logHelper;
     serializer = _serializer;
-    psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, 
m_scopeAndTopicFilter);
+    m_scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, 
m_topic);
 
     m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
     if (m_nanoMsgSocket < 0) {
@@ -86,8 +86,7 @@ 
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
             std::bad_alloc{};
         }
 
-        char subscriberFilter[5]; // 5 ??
-        psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscriberFilter);
+        auto subscriberFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, 
m_topic);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
m_topic.c_str());
         char buf[size + 1];

http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 2519e4a..02c462e 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -100,7 +100,7 @@ namespace pubsub {
             pubsub_serializer_service_t *serializer{nullptr};
             const std::string m_scope{};
             const std::string m_topic{};
-            char m_scopeAndTopicFilter[5];
+            std::string m_scopeAndTopicFilter{};
 
             int m_nanoMsgSocket{0};
 

http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index 1c75e71..9a253b2 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -18,6 +18,7 @@
  */
 
 #include <memory.h>
+#include <iostream>
 
 #include <stdlib.h>
 #include <utils.h>
@@ -29,7 +30,6 @@
 
 
 #include <pubsub_constants.h>
-#include <pubsub/publisher.h>
 #include <pubsub_common.h>
 #include "pubsub_nanomsg_topic_sender.h"
 #include "pubsub_psa_nanomsg_constants.h"
@@ -48,23 +48,9 @@
     logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
 
-typedef struct psa_nanomsg_bounded_service_entry {
-    pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent;
-    pubsub_publisher_t service;
-    long bndId;
-    hash_map_t *msgTypes;
-    int getCount;
-} psa_nanomsg_bounded_service_entry_t;
-
-//static void* psa_nanomsg_getPublisherService(void *handle, const 
celix_bundle_t *requestingBundle,
-//                                             const celix_properties_t 
*svcProperties);
-//static void psa_nanomsg_ungetPublisherService(void *handle, const 
celix_bundle_t *requestingBundle,
-//                                              const celix_properties_t 
*svcProperties);
 static unsigned int rand_range(unsigned int min, unsigned int max);
 static void 
delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender 
*sender);
 
-static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *msg);
-
 
pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t
 *_ctx,
                                                          log_helper_t 
*_logHelper,
                                                          const char *_scope,
@@ -77,53 +63,44 @@ 
pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_
         ctx{_ctx},
         logHelper{_logHelper},
         serializerSvcId {_serializerSvcId},
-        serializer{_ser}{
+        serializer{_ser},
+        scope{_scope},
+        topic{_topic}{
 
-    psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter);
+    scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(_scope, _topic);
 
     //setting up nanomsg socket for nanomsg TopicSender
-    {
-
-        int socket = nn_socket(AF_SP, NN_BUS);
-        if (socket == -1) {
-            perror("Error for nanomsg_socket");
-        }
+    int socket = nn_socket(AF_SP, NN_BUS);
+    if (socket == -1) {
+        perror("Error for nanomsg_socket");
+    }
 
-        int rv = -1, retry=0;
-        while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
-            /* Randomized part due to same bundle publishing on different 
topics */
-            unsigned int port = rand_range(_basePort,_maxPort);
-            size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, 
port) + 1;
-            char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
-            snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
-
-            len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1;
-            char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
-            snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
-            rv = nn_bind (socket, bindUrl);
-            if (rv == -1) {
-                perror("Error for nn_bind");
-                free(_url);
-            } else {
-                this->url = _url;
-                nanomsg.socket = socket;
-            }
-            retry++;
-            free(bindUrl);
+    int rv = -1, retry=0;
+    while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
+        /* Randomized part due to same bundle publishing on different topics */
+        unsigned int port = rand_range(_basePort,_maxPort);
+        size_t len = (size_t)snprintf(nullptr, 0, "tcp://%s:%u", _bindIp, 
port) + 1;
+        char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
+        snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
+
+        len = (size_t)snprintf(nullptr, 0, "tcp://0.0.0.0:%u", port) + 1;
+        char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
+        snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
+        rv = nn_bind (socket, bindUrl);
+        if (rv == -1) {
+            perror("Error for nn_bind");
+            free(_url);
+        } else {
+            this->url = _url;
+            nanomsg.socket = socket;
         }
+        retry++;
+        free(bindUrl);
     }
 
-    if (url != NULL) {
-        scope = strndup(_scope, 1024 * 1024);
-        topic = strndup(_topic, 1024 * 1024);
-
-        celixThreadMutex_create(&boundedServices.mutex, NULL);
-        celixThreadMutex_create(&nanomsg.mutex, NULL);
-        boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
-    }
+    if (!url.empty()) {
 
-    //register publisher services using a service factory
-    if (url != NULL) {
+        //register publisher services using a service factory
         publisher.factory.handle = this;
         publisher.factory.getService = [](void *handle, const celix_bundle_t 
*requestingBundle, const celix_properties_t *svcProperties) {
             return 
static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
@@ -137,8 +114,8 @@ 
pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_
         };
 
         celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic);
-        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope);
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str());
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str());
 
         celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
         opts.factory = &publisher.factory;
@@ -156,39 +133,27 @@ 
pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
 
     nn_close(nanomsg.socket);
 
-    celixThreadMutex_lock(&boundedServices.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_nanomsg_bounded_service_entry_t *entry = 
static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter));
-        if (entry != NULL) {
-            serializer->destroySerializerMap(serializer->handle, 
entry->msgTypes);
-            free(entry);
-        }
+    std::lock_guard<std::mutex> lock(boundedServices.mutex);
+    for  (auto &it: boundedServices.map) {
+            serializer->destroySerializerMap(serializer->handle, 
it.second.msgTypes);
     }
-    hashMap_destroy(boundedServices.map, false, false);
-    celixThreadMutex_unlock(&boundedServices.mutex);
-
-    celixThreadMutex_destroy(&boundedServices.mutex);
-    celixThreadMutex_destroy(&nanomsg.mutex);
+    boundedServices.map.clear();
 
-    free(scope);
-    free(topic);
-    free(url);
 }
 
 long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
     return serializerSvcId;
 }
 
-const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
+const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() 
const {
     return scope;
 }
 
-const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
+const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() 
const {
     return topic;
 }
 
-const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const  {
+const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() 
const  {
     return url;
 }
 
@@ -196,64 +161,57 @@ const char* 
pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const  {
 void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const 
celix_bundle_t *requestingBundle,
                                              const celix_properties_t 
*svcProperties __attribute__((unused))) {
     long bndId = celix_bundle_getId(requestingBundle);
-
-    celixThreadMutex_lock(&boundedServices.mutex);
-    psa_nanomsg_bounded_service_entry_t *entry = 
static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map,
 (void*)bndId));
-    if (entry != NULL) {
-        entry->getCount += 1;
+    void *service{nullptr};
+    std::lock_guard<std::mutex> lock(boundedServices.mutex);
+    auto existingEntry = boundedServices.map.find(bndId);
+    if (existingEntry != boundedServices.map.end()) {
+        existingEntry->second.getCount += 1;
+        service = &existingEntry->second.service;
     } else {
-        entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, 
sizeof(*entry)));
-        entry->getCount = 1;
-        entry->parent = this;
-        entry->bndId = bndId;
+        auto entry = boundedServices.map.emplace(std::piecewise_construct,
+                                    std::forward_as_tuple(bndId),
+                                    std::forward_as_tuple(this, bndId, 
logHelper));
+        int rc = serializer->createSerializerMap(serializer->handle, 
(celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes);
 
-        int rc = serializer->createSerializerMap(serializer->handle, 
(celix_bundle_t*)requestingBundle, &entry->msgTypes);
         if (rc == 0) {
-            entry->service.handle = entry;
-            entry->service.localMsgTypeIdForMsgType = 
psa_nanoMsg_localMsgTypeIdForMsgType;
-            entry->service.send = psa_nanomsg_topicPublicationSend;
-            entry->service.sendMultipart = NULL; //not supported TODO remove
-            hashMap_put(boundedServices.map, (void*)bndId, entry);
+            entry.first->second.service.handle = &entry.first->second;
+            entry.first->second.service.localMsgTypeIdForMsgType = 
psa_nanoMsg_localMsgTypeIdForMsgType;
+            entry.first->second.service.send = [](void *handle, unsigned int 
msgTypeId, const void *msg) {
+                return 
static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId,
 msg);
+            };
+            entry.first->second.service.sendMultipart = NULL; //not supported 
TODO remove
+            service = &entry.first->second.service;
         } else {
+            boundedServices.map.erase(bndId);
             L_ERROR("Error creating serializer map for NanoMsg TopicSender 
%s/%s", scope, topic);
         }
-
-
-
     }
-    celixThreadMutex_unlock(&boundedServices.mutex);
 
-    return &entry->service;
+    return service;
 }
 
 void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const 
celix_bundle_t *requestingBundle,
-                                              const celix_properties_t 
*svcProperties __attribute__((unused))) {
+                                                                         const 
celix_properties_t */*svcProperties*/) {
     long bndId = celix_bundle_getId(requestingBundle);
 
-    celixThreadMutex_lock(&boundedServices.mutex);
-    psa_nanomsg_bounded_service_entry_t *entry = 
static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map,
 (void*)bndId));
-    if (entry != NULL) {
-        entry->getCount -= 1;
-    }
-    if (entry != NULL && entry->getCount == 0) {
-        //free entry
-        hashMap_remove(boundedServices.map, (void*)bndId);
-        int rc = serializer->destroySerializerMap(serializer->handle, 
entry->msgTypes);
-        if (rc != 0) {
-            L_ERROR("Error destroying publisher service, serializer not 
available / cannot get msg serializer map\n");
+    std::lock_guard<std::mutex> lock(boundedServices.mutex);
+    auto entry = boundedServices.map.find(bndId);
+    if (entry != boundedServices.map.end()) {
+        entry->second.getCount -= 1;
+        if (entry->second.getCount == 0) {
+            int rc = serializer->destroySerializerMap(serializer->handle, 
entry->second.msgTypes);
+            if (rc != 0) {
+                L_ERROR("Error destroying publisher service, serializer not 
available / cannot get msg serializer map\n");
+            }
+            boundedServices.map.erase(bndId);
         }
-
-        free(entry);
     }
-    celixThreadMutex_unlock(&boundedServices.mutex);
 }
 
-static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *inMsg) {
-    int status = CELIX_SUCCESS;
-    psa_nanomsg_bounded_service_entry_t *bound = 
static_cast<psa_nanomsg_bounded_service_entry_t*>(handle);
-    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent;
-
-    pubsub_msg_serializer_t* msgSer = 
static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, 
(void*)(uintptr_t)msgTypeId));
+int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int 
msgTypeId, const void *inMsg) {
+    int status;
+    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = parent;
+    pubsub_msg_serializer_t* msgSer = 
static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, 
(void*)(uintptr_t)msgTypeId));
 
     if (msgSer != NULL) {
         delay_first_send_for_late_joiners(sender);
@@ -285,37 +243,32 @@ static int psa_nanomsg_topicPublicationSend(void *handle, 
unsigned int msgTypeId
             msg.msg_iov[1].iov_len = serializedOutputLen;
             msg.msg_control = nullptr;
             msg.msg_controllen = 0;
-            //zmsg_t *msg = zmsg_new();
-            //TODO revert to use zmq_msg_init_data (or something like that) 
for zero copy for the payload
-            //TODO remove socket mutex .. not needed (initialized during 
creation)
-            //zmsg_addstr(msg, sender->scopeAndTopicFilter);
-            //zmsg_addmem(msg, &msg_hdr, sizeof(msg_hdr));
-            //zmsg_addmem(msg, serializedOutput, );
             errno = 0;
             int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
             free(serializedOutput);
             if (rc < 0) {
-                //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", 
rc, strerror(errno));
+                L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, 
strerror(errno));
             } else {
-                //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
-                //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, 
minor %d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+                L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
+                L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor 
%d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
             }
         } else {
-            //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for 
scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+            L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for 
scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
         }
     } else {
         status = CELIX_SERVICE_EXCEPTION;
-        //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg 
type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+        L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id 
%i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
     }
     return status;
 }
 
-static void 
delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender 
*/*sender*/) {
+static void 
delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender 
*sender) {
 
     static bool firstSend = true;
 
     if(firstSend){
-        //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late 
joiners...\n");
+        auto logHelper = sender->logHelper;
+        L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
         sleep(FIRST_SEND_DELAY_IN_SECONDS);
         firstSend = false;
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index 90ab6ce..1883a41 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -18,13 +18,37 @@
  */
 #ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
 #define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
-
+#include <mutex>
+#include <map>
 #include "celix_bundle_context.h"
 #include <log_helper.h>
 #include <pubsub_serializer.h>
+#include <pubsub/publisher.h>
 
 namespace pubsub {
     namespace nanomsg {
+        class pubsub_nanomsg_topic_sender;
+
+        class bounded_service_entry {
+        public:
+            bounded_service_entry(
+                    pubsub::nanomsg::pubsub_nanomsg_topic_sender *_parent,
+                    long _bndId,
+                    log_helper_t* _logHelper) : parent{_parent}, 
bndId{_bndId}, logHelper{_logHelper} {
+
+            }
+
+            int topicPublicationSend(unsigned int msgTypeId, const void 
*inMsg);
+
+            pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent{};
+            pubsub_publisher_t service{};
+            long bndId{};
+            hash_map_t *msgTypes{};
+            int getCount{1};
+            log_helper_t *logHelper{};
+        } ;
+
+
         class pubsub_nanomsg_topic_sender {
         public:
             pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, 
log_helper_t *_logHelper, const char *_scope,
@@ -38,31 +62,30 @@ namespace pubsub {
             const pubsub_nanomsg_topic_sender &operator=(const 
pubsub_nanomsg_topic_sender &) = delete;
 
             long getSerializerSvcId() const ;
-            const char *getScope() const ;
-            const char *getTopic() const ;
-            const char *getUrl() const;
+            const std::string &getScope() const ;
+            const std::string &getTopic() const ;
+            const std::string &getUrl() const;
 
             void* getPublisherService(const celix_bundle_t *requestingBundle,
-                    const celix_properties_t *svcProperties 
__attribute__((unused)));
+                                      const celix_properties_t *svcProperties 
__attribute__((unused)));
             void ungetPublisherService(const celix_bundle_t *requestingBundle,
-                        const celix_properties_t *svcProperties 
__attribute__((unused)));
+                                       const celix_properties_t *svcProperties 
__attribute__((unused)));
             int topicPublicationSend(unsigned int msgTypeId, const void 
*inMsg);
             void delay_first_send_for_late_joiners() ;
 
-
-                //private:
+            //private:
             celix_bundle_context_t *ctx;
             log_helper_t *logHelper;
             long serializerSvcId;
             pubsub_serializer_service_t *serializer;
 
-            char *scope{};
-            char *topic{};
-            char scopeAndTopicFilter[5];
-            char *url{};
+            std::string scope{};
+            std::string topic{};
+            std::string scopeAndTopicFilter{};
+            std::string url{};
 
             struct {
-                celix_thread_mutex_t mutex;
+                std::mutex mutex;
                 int socket;
             } nanomsg{};
 
@@ -72,8 +95,9 @@ namespace pubsub {
             } publisher{};
 
             struct {
-                celix_thread_mutex_t mutex{};
-                hash_map_t *map{};  //key = bndId, value = 
psa_nanomsg_bounded_service_entry_t
+                std::mutex mutex{};
+                std::map<long, bounded_service_entry> map{};
+                //hash_map_t *map{};  //key = bndId, value = 
psa_nanomsg_bounded_service_entry_t
             } boundedServices{};
         };
     }

Reply via email to