NanoMsg

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b6a03372
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b6a03372
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b6a03372

Branch: refs/heads/nanomsg
Commit: b6a0337233c48c48bf48a86274682ba8329f91f3
Parents: 15f268d
Author: Erjan Altena <[email protected]>
Authored: Thu Nov 29 20:39:40 2018 +0100
Committer: Erjan Altena <[email protected]>
Committed: Thu Nov 29 20:39:40 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 |   7 +-
 .../src/pubsub_nanomsg_admin.h                  |   2 +-
 .../src/pubsub_nanomsg_topic_receiver.cc        |  34 +++---
 .../src/pubsub_nanomsg_topic_receiver.h         |   4 +-
 .../src/pubsub_nanomsg_topic_sender.cc          | 119 ++++++++++++-------
 .../src/pubsub_nanomsg_topic_sender.h           |  14 ++-
 6 files changed, 106 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 9fdf2a3..cc17ebb 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -250,13 +250,11 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper
         auto &entry = kvsm->second;
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
-                for (auto kv: topicSenders.map) {
+            for (auto &kv: topicSenders.map) {
                 auto *sender = kv.second;
                 if (sender != nullptr && entry.svcId == 
sender->getSerializerSvcId()) {
-                    char *key = kv.first;
                     topicSenders.map.erase(kv.first);
                     delete (sender);
-                    free(key);
                 }
             }
         }
@@ -377,9 +375,7 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
     auto kv = topicSenders.map.find(key);
     if (kv != topicSenders.map.end()) {
-        char *mapKey = kv->first;
         pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second;
-        free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a 
nanomsg topic sender?
         delete (sender);
     } else {
@@ -557,7 +553,6 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en
 celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine 
__attribute__((unused)), FILE *out,
                                                   FILE *errStream 
__attribute__((unused))) {
     celix_status_t  status = CELIX_SUCCESS;
-
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
     {

http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 7c2e9a0..8ec35e5 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -119,7 +119,7 @@ private:
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
-    ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> 
topicSenders{};
+    ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> 
topicSenders{};
     ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> 
topicReceivers{};
     ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };

http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 30c2af7..83d1caf 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -72,7 +72,6 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
     ctx = _ctx;
     logHelper = _logHelper;
     serializer = _serializer;
-    m_scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, 
m_topic);
 
     m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
     if (m_nanoMsgSocket < 0) {
@@ -88,20 +87,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
 
         auto subscriberFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, 
m_topic);
 
-        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
m_topic.c_str());
-        char buf[size + 1];
-        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
m_topic.c_str());
-        celix_service_tracking_options_t opts{};
-        opts.filter.ignoreServiceLanguage = true;
-        opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
-        opts.filter.filter = buf;
-        opts.callbackHandle = this;
-        opts.addWithOwner = [](void *handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *svcOwner) {
-            static_cast<topic_receiver*>(handle)->addSubscriber(svc, props, 
svcOwner);
-        };
-        opts.removeWithOwner = [](void *handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *svcOwner) {
-            static_cast<topic_receiver*>(handle)->removeSubscriber(svc, props, 
svcOwner);
-        };
+        auto opts = createOptions();
 
         subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
         recvThread.running = true;
@@ -110,6 +96,24 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
     }
 }
 
+celix_service_tracking_options_t 
pubsub::nanomsg::topic_receiver::createOptions() {
+    std::stringstream filter_str;
+
+    filter_str << "(" << PUBSUB_SUBSCRIBER_TOPIC << "=" <<  m_topic << ")";
+    celix_service_tracking_options_t opts{};
+    opts.filter.ignoreServiceLanguage = true;
+    opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+    opts.filter.filter = strdup(filter_str.str().c_str()); // TODO : memory 
leak ??
+    opts.callbackHandle = this;
+    opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t 
*props, const celix_bundle_t *svcOwner) {
+            
static_cast<pubsub::nanomsg::topic_receiver*>(handle)->addSubscriber(svc, 
props, svcOwner);
+        };
+    opts.removeWithOwner = [](void *handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *svcOwner) {
+            
static_cast<pubsub::nanomsg::topic_receiver*>(handle)->removeSubscriber(svc, 
props, svcOwner);
+        };
+    return opts;
+}
+
 pubsub::nanomsg::topic_receiver::~topic_receiver() {
 
         {

http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index 02c462e..6b0950a 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -30,7 +30,7 @@
 
 struct psa_nanomsg_subscriber_entry {
     psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) :
-    svc{_svc}, usageCount{_usageCount} {
+        svc{_svc}, usageCount{_usageCount} {
     }
     pubsub_subscriber_t *svc{};
     int usageCount;
@@ -92,6 +92,7 @@ namespace pubsub {
             void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* 
entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t 
payloadSize);
             void addSubscriber(void *svc, const celix_properties_t *props, 
const celix_bundle_t *bnd);
             void removeSubscriber(void */*svc*/, const celix_properties_t 
*/*props*/, const celix_bundle_t *bnd);
+            celix_service_tracking_options_t createOptions();
 
         private:
             celix_bundle_context_t *ctx{nullptr};
@@ -100,7 +101,6 @@ namespace pubsub {
             pubsub_serializer_service_t *serializer{nullptr};
             const std::string m_scope{};
             const std::string m_topic{};
-            std::string m_scopeAndTopicFilter{};
 
             int m_nanoMsgSocket{0};
 

http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index 9a253b2..7521d78 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@ -19,7 +19,7 @@
 
 #include <memory.h>
 #include <iostream>
-
+#include <sstream>
 #include <stdlib.h>
 #include <utils.h>
 #include <arpa/inet.h>
@@ -38,18 +38,46 @@
 #define FIRST_SEND_DELAY_IN_SECONDS                 2
 #define NANOMSG_BIND_MAX_RETRY                      10
 
-#define L_DEBUG(...) \
-    logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
-#define L_INFO(...) \
-    logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
-#define L_WARN(...) \
-    logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
-#define L_ERROR(...) \
-    logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+template <typename T>
+std::stringstream LOG_STREAM(T first) {
+    std::stringstream ss;
+    ss << first;
+    return ss;
+}
+
+template <typename T, typename... Args>
+std::stringstream LOG_STREAM(T first, Args... args) {
+    std::stringstream ss;
+    ss << first << LOG_STREAM(args...).str();
+    return ss;
+}
+
+template <typename... Args>
+void L_DEBUG(log_helper_t *logHelper, Args... args) {
+    std::stringstream ss = LOG_STREAM(args...);
+    logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, ss.str().c_str());
+}
+
+template <typename... Args>
+void L_INFO(log_helper_t *logHelper, Args... args) {
+    auto ss = LOG_STREAM(args...);
+    logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, ss.str().c_str());
+}
+
+template <typename... Args>
+void L_WARN(log_helper_t *logHelper, Args... args) {
+    auto ss = LOG_STREAM(args...);
+    logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, ss.str().c_str());
+}
 
+template <typename... Args>
+void L_ERROR(log_helper_t *logHelper, Args... args) {
+    auto ss = LOG_STREAM(args...);
+    logHelper_log((log_helper_pt)logHelper, OSGI_LOGSERVICE_ERROR, 
ss.str().c_str());
+}
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
-static void 
delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender 
*sender);
+static void delay_first_send_for_late_joiners(log_helper_t* logHelper);
 
 
pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t
 *_ctx,
                                                          log_helper_t 
*_logHelper,
@@ -70,8 +98,8 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_
     scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(_scope, _topic);
 
     //setting up nanomsg socket for nanomsg TopicSender
-    int socket = nn_socket(AF_SP, NN_BUS);
-    if (socket == -1) {
+    int nnSock = nn_socket(AF_SP, NN_BUS);
+    if (nnSock == -1) {
         perror("Error for nanomsg_socket");
     }
 
@@ -79,23 +107,20 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_
     while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
         /* Randomized part due to same bundle publishing on different topics */
         unsigned int port = rand_range(_basePort,_maxPort);
-        size_t len = (size_t)snprintf(nullptr, 0, "tcp://%s:%u", _bindIp, 
port) + 1;
-        char *_url = static_cast<char*>(calloc(len, sizeof(char*)));
-        snprintf(_url, len, "tcp://%s:%u", _bindIp, port);
-
-        len = (size_t)snprintf(nullptr, 0, "tcp://0.0.0.0:%u", port) + 1;
-        char *bindUrl = static_cast<char*>(calloc(len, sizeof(char)));
-        snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port);
-        rv = nn_bind (socket, bindUrl);
+        std::stringstream _url;
+        _url << "tcp://" << _bindIp << ":" << port;
+
+        std::stringstream bindUrl;
+        bindUrl << "tcp://0.0.0.0:" << port;
+
+        rv = nn_bind (nnSock, bindUrl.str().c_str());
         if (rv == -1) {
             perror("Error for nn_bind");
-            free(_url);
         } else {
-            this->url = _url;
-            nanomsg.socket = socket;
+            this->url = _url.str();
+            nanomsg.socket = nnSock;
         }
         retry++;
-        free(bindUrl);
     }
 
     if (!url.empty()) {
@@ -132,7 +157,6 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
     celix_bundleContext_unregisterService(ctx, publisher.svcId);
 
     nn_close(nanomsg.socket);
-
     std::lock_guard<std::mutex> lock(boundedServices.mutex);
     for  (auto &it: boundedServices.map) {
             serializer->destroySerializerMap(serializer->handle, 
it.second.msgTypes);
@@ -170,7 +194,7 @@ void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const ce
     } else {
         auto entry = boundedServices.map.emplace(std::piecewise_construct,
                                     std::forward_as_tuple(bndId),
-                                    std::forward_as_tuple(this, bndId, 
logHelper));
+                                    std::forward_as_tuple(scope, topic, bndId, 
nanomsg.socket, logHelper));
         int rc = serializer->createSerializerMap(serializer->handle, 
(celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes);
 
         if (rc == 0) {
@@ -179,11 +203,14 @@ void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const ce
             entry.first->second.service.send = [](void *handle, unsigned int 
msgTypeId, const void *msg) {
                 return 
static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId,
 msg);
             };
-            entry.first->second.service.sendMultipart = NULL; //not supported 
TODO remove
+            entry.first->second.service.sendMultipart = nullptr; //not 
supported TODO remove
             service = &entry.first->second.service;
         } else {
             boundedServices.map.erase(bndId);
-            L_ERROR("Error creating serializer map for NanoMsg TopicSender 
%s/%s", scope, topic);
+            auto x =  LOG_STREAM(12, "hallo");
+            logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, x.str().c_str());
+            log_helper_pt lh = logHelper;
+            L_ERROR(lh, "Error creating serializer map for NanoMsg TopicSender 
", scope, topic);
         }
     }
 
@@ -201,7 +228,7 @@ void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const c
         if (entry->second.getCount == 0) {
             int rc = serializer->destroySerializerMap(serializer->handle, 
entry->second.msgTypes);
             if (rc != 0) {
-                L_ERROR("Error destroying publisher service, serializer not 
available / cannot get msg serializer map\n");
+                L_ERROR(logHelper, "Error destroying publisher service, 
serializer not available / cannot get msg serializer map\n");
             }
             boundedServices.map.erase(bndId);
         }
@@ -210,31 +237,30 @@ void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const c
 
 int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int 
msgTypeId, const void *inMsg) {
     int status;
-    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = parent;
-    pubsub_msg_serializer_t* msgSer = 
static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, 
(void*)(uintptr_t)msgTypeId));
+    auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, 
(void*)(uintptr_t)msgTypeId));
 
-    if (msgSer != NULL) {
-        delay_first_send_for_late_joiners(sender);
+    if (msgSer != nullptr) {
+        delay_first_send_for_late_joiners(logHelper);
 
         int major = 0, minor = 0;
 
         pubsub_nanmosg_msg_header_t msg_hdr;// = calloc(1, sizeof(*msg_hdr));
         msg_hdr.type = msgTypeId;
 
-        if (msgSer->msgVersion != NULL) {
+        if (msgSer->msgVersion != nullptr) {
             version_getMajor(msgSer->msgVersion, &major);
             version_getMinor(msgSer->msgVersion, &minor);
             msg_hdr.major = (unsigned char) major;
             msg_hdr.minor = (unsigned char) minor;
         }
 
-        void *serializedOutput = NULL;
+        void *serializedOutput = nullptr;
         size_t serializedOutputLen = 0;
         status = msgSer->serialize(msgSer, inMsg, &serializedOutput, 
&serializedOutputLen);
         if (status == CELIX_SUCCESS) {
             nn_iovec data[2];
 
-            nn_msghdr msg;
+            nn_msghdr msg{};
             msg.msg_iov = data;
             msg.msg_iovlen = 2;
             msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr);
@@ -244,31 +270,34 @@ int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int ms
             msg.msg_control = nullptr;
             msg.msg_controllen = 0;
             errno = 0;
-            int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 );
+            int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 );
             free(serializedOutput);
             if (rc < 0) {
-                L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, 
strerror(errno));
+                L_WARN(logHelper, "[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc, 
", error: ",  strerror(errno));
             } else {
-                L_INFO("[PSA_ZMQ_TS] Send message with size %d\n",  rc);
-                L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor 
%d\n",  msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor);
+                L_INFO(logHelper, "[PSA_ZMQ_TS] Send message with size ",  rc, 
"\n");
+                L_INFO(logHelper, "[PSA_ZMQ_TS] Send message ID ", 
msg_hdr.type,
+                        " major: ", (int)msg_hdr.major,
+                        " minor: ",  (int)msg_hdr.minor,"\n");
             }
         } else {
-            L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for 
scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic);
+            L_WARN(logHelper, "[PSA_ZMQ_TS] Error serialize message of type ", 
msgSer->msgName,
+                    " for scope/topic ", scope.c_str(), "/", 
topic.c_str(),"\n");
         }
     } else {
         status = CELIX_SERVICE_EXCEPTION;
-        L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id 
%i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
+        L_WARN(logHelper, "[PSA_ZMQ_TS] Error cannot serialize message with 
msg type id ", msgTypeId,
+                " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
     }
     return status;
 }
 
-static void 
delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender 
*sender) {
+static void delay_first_send_for_late_joiners(log_helper_t* logHelper) {
 
     static bool firstSend = true;
 
     if(firstSend){
-        auto logHelper = sender->logHelper;
-        L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
+        L_INFO(logHelper, "PSA_UDP_MC_TP: Delaying first send for late 
joiners...\n");
         sleep(FIRST_SEND_DELAY_IN_SECONDS);
         firstSend = false;
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
index 1883a41..6a3254e 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h
@@ -27,24 +27,28 @@
 
 namespace pubsub {
     namespace nanomsg {
-        class pubsub_nanomsg_topic_sender;
 
         class bounded_service_entry {
         public:
             bounded_service_entry(
-                    pubsub::nanomsg::pubsub_nanomsg_topic_sender *_parent,
+                    std::string &_scope,
+                    std::string &_topic,
                     long _bndId,
-                    log_helper_t* _logHelper) : parent{_parent}, 
bndId{_bndId}, logHelper{_logHelper} {
+                    int _nanoMsgSocket,
+                    log_helper_t* _logHelper) : scope{_scope}, topic{_topic}, 
bndId{_bndId}, nanoMsgSocket{_nanoMsgSocket}, logHelper{_logHelper} {
 
             }
-
+            bounded_service_entry(const bounded_service_entry&) = delete;
+            bounded_service_entry &operator=(const bounded_service_entry&) = 
delete;
             int topicPublicationSend(unsigned int msgTypeId, const void 
*inMsg);
 
-            pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent{};
             pubsub_publisher_t service{};
+            std::string scope;
+            std::string topic;
             long bndId{};
             hash_map_t *msgTypes{};
             int getCount{1};
+            int nanoMsgSocket{};
             log_helper_t *logHelper{};
         } ;
 

Reply via email to