Nanomsg

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/707b8e54
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/707b8e54
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/707b8e54

Branch: refs/heads/nanomsg
Commit: 707b8e54aeda61c5eba4b8a3fa65059ec0f5821a
Parents: b6a0337
Author: Erjan Altena <[email protected]>
Authored: Sat Dec 1 09:26:44 2018 +0100
Committer: Erjan Altena <[email protected]>
Committed: Sat Dec 1 09:26:44 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 | 66 ++++++++------------
 .../src/pubsub_nanomsg_admin.h                  |  4 +-
 2 files changed, 29 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/707b8e54/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index cc17ebb..ba47723 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -28,6 +28,7 @@
 #include <netdb.h>
 #include <ifaddrs.h>
 #include <pubsub_endpoint.h>
+#include <algorithm>
 
 #include "pubsub_utils.h"
 #include "pubsub_nanomsg_admin.h"
@@ -96,11 +97,11 @@ 
pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
 pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     //note assuming al psa register services and service tracker are removed.
     {
-        std::lock_guard<std::mutex> lock(topicSenders.mutex);
-        for (auto kv : topicSenders.map) {
-            auto *sender = kv.second;
-            delete (sender);
-        }
+//        std::lock_guard<std::mutex> lock(topicSenders.mutex);
+//        for (auto &kv : topicSenders.map) {
+//            auto &sender = kv.second;
+//            delete (sender);
+//        }
     }
 
     {
@@ -224,11 +225,6 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, 
const celix_properties_t
             serializers.map.emplace(std::piecewise_construct,
                     std::forward_as_tuple(svcId),
                     std::forward_as_tuple(serType, svcId, 
static_cast<pubsub_serializer_service_t*>(svc)));
-//            auto entry = 
static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, 
sizeof(psa_nanomsg_serializer_entry_t)));
-//            entry->serType = serType;
-//            entry->svcId = svcId;
-//            entry->svc = static_cast<pubsub_serializer_service_t*>(svc);
-//            serializers.map[svcId] = entry;
         }
     }
 }
@@ -250,11 +246,12 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void 
*/*svc*/, const celix_proper
         auto &entry = kvsm->second;
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
-            for (auto &kv: topicSenders.map) {
-                auto *sender = kv.second;
-                if (sender != nullptr && entry.svcId == 
sender->getSerializerSvcId()) {
-                    topicSenders.map.erase(kv.first);
-                    delete (sender);
+            for(auto it = topicSenders.map.begin(); it != 
topicSenders.map.end(); /*nothing*/) {
+                auto &sender = it->second;
+                if (entry.svcId == sender.getSerializerSvcId()) {
+                    it = topicSenders.map.erase(it);
+                } else {
+                    ++it;
                 }
             }
         }
@@ -320,32 +317,30 @@ celix_status_t 
pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c
     celix_properties_t *newEndpoint = nullptr;
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-    pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr;
     std::lock_guard<std::mutex> serializerLock(serializers.mutex);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-    sender = topicSenders.map.find(key)->second;
-    if (sender == nullptr) {
+    auto sender = topicSenders.map.find(key);
+    if (sender == topicSenders.map.end()) {
         psa_nanomsg_serializer_entry_t *serEntry = nullptr;
         auto kv = serializers.map.find(serializerSvcId);
         if (kv != serializers.map.end()) {
             serEntry = &kv->second;
         }
         if (serEntry != nullptr) {
-            sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, 
log, scope, topic, serializerSvcId, serEntry->svc, ipAddress,
-                                                      basePort, maxPort);
-        }
-        if (sender != nullptr) {
+            auto e = topicSenders.map.emplace(std::piecewise_construct,
+                    std::forward_as_tuple(key),
+                    std::forward_as_tuple(ctx, log, scope, topic, 
serializerSvcId, serEntry->svc, ipAddress,
+                                          basePort, maxPort));
             const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, 
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType,
                                                 nullptr);
-            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, 
sender->getUrl().c_str());
+            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, 
e.first->second.getUrl().c_str());
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(ctx, 
"CELIX_CONTAINER_NAME", nullptr);
             if (cn != nullptr) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-            topicSenders.map[key] = sender;
         } else {
             L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
             free(key);
@@ -354,9 +349,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const 
char *scope, const c
         free(key);
         L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for 
scope/topic %s/%s!", scope, topic);
     }
-    if (sender != nullptr && newEndpoint != nullptr) {
-        //TODO connect endpoints to sender, NOTE is this needed for a nanomsg 
topic sender?
-    }
 
     if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
         *outPublisherEndpoint = newEndpoint;
@@ -373,12 +365,8 @@ celix_status_t 
pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-    auto kv = topicSenders.map.find(key);
-    if (kv != topicSenders.map.end()) {
-        pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second;
-        //TODO disconnect endpoints to sender. note is this needed for a 
nanomsg topic sender?
-        delete (sender);
-    } else {
+    ;
+    if (topicSenders.map.erase(key) == 0) {
         L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic 
%s/%s. Does not exists", scope, topic);
     }
     free(key);
@@ -558,14 +546,14 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char 
*commandLine __attribut
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-        for (auto kvts: topicSenders.map) {
-            pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second;
-            long serSvcId = sender->getSerializerSvcId();
+        for (auto &senderEntry: topicSenders.map) {
+            auto &sender = senderEntry.second;
+            long serSvcId = sender.getSerializerSvcId();
             auto kvs = serializers.map.find(serSvcId);
             const char* serType = ( kvs == serializers.map.end() ? "!Error" :  
kvs->second.serType);
-            const auto scope = sender->getScope();
-            const auto topic = sender->getTopic();
-            const auto url = sender->getUrl();
+            const auto scope = sender.getScope();
+            const auto topic = sender.getTopic();
+            const auto url = sender.getUrl();
             fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), 
topic.c_str());
             fprintf(out, "   |- serializer type = %s\n", serType);
             fprintf(out, "   |- url             = %s\n", url.c_str());

http://git-wip-us.apache.org/repos/asf/celix/blob/707b8e54/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 8ec35e5..8ed5ddd 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -119,9 +119,9 @@ private:
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
     ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{};
-    ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> 
topicSenders{};
+    ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> 
topicSenders{};
     ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> 
topicReceivers{};
-    ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
+    ProtectedMap<const std::string, celix_properties_t *> 
discoveredEndpoints{};
 };
 
 #ifdef __cplusplus

Reply via email to