Repository: celix
Updated Branches:
  refs/heads/nanomsg 3009e6470 -> 7c141424d


Removed celix-maps from nanomsg admin


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95633eb9
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95633eb9
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95633eb9

Branch: refs/heads/nanomsg
Commit: 95633eb954a09232867277d7cd0d71c30d49a012
Parents: 3009e64
Author: Erjan Altena <[email protected]>
Authored: Sat Nov 3 19:40:27 2018 +0100
Committer: Erjan Altena <[email protected]>
Committed: Sat Nov 3 19:40:27 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_admin.cc                 | 148 +++++++------------
 .../src/pubsub_nanomsg_admin.h                  |  34 ++---
 2 files changed, 67 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 9fe91d9..6c15ec8 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -32,8 +32,6 @@
 #include "pubsub_utils.h"
 #include "pubsub_nanomsg_admin.h"
 #include "pubsub_psa_nanomsg_constants.h"
-#include "pubsub_nanomsg_topic_sender.h"
-#include "pubsub_nanomsg_topic_receiver.h"
 /*
 //#define L_DEBUG(...) \
 //    logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
@@ -50,11 +48,6 @@
 #define L_ERROR printf
 
 
-typedef struct psa_nanomsg_serializer_entry {
-    const char *serType;
-    long svcId;
-    pubsub_serializer_service_t *svc;
-} psa_nanomsg_serializer_entry_t;
 
 static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
 
@@ -103,41 +96,29 @@ 
pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel
     defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
     qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
     qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
-
-    //serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr);
-
-    topicSenders.map = hashMap_create(utils_stringHash, nullptr, 
utils_stringEquals, nullptr);
-
-    topicReceivers.map = hashMap_create(utils_stringHash, nullptr, 
utils_stringEquals, nullptr);
-
-    discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, 
utils_stringEquals, nullptr);
 }
 
 pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
     //note assuming al psa register services and service tracker are removed.
     {
         std::lock_guard<std::mutex> lock(topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *sender = static_cast<pubsub_nanomsg_topic_sender_t 
*>(hashMapIterator_nextValue(&iter));
+        for (auto kv : topicSenders.map) {
+            auto *sender = kv.second;
             pubsub_nanoMsgTopicSender_destroy(sender);
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(topicReceivers.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *recv = 
static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
-            pubsub_nanoMsgTopicReceiver_destroy(recv);
+        for (auto kv: topicReceivers.map) {
+            pubsub_nanoMsgTopicReceiver_destroy(kv.second);
         }
     }
 
     {
         std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(discoveredEndpoints.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *ep = 
static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry : discoveredEndpoints.map) {
+            auto *ep = entry.second;
             celix_properties_destroy(ep);
         }
     }
@@ -150,12 +131,6 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
         }
     }
 
-    hashMap_destroy(topicSenders.map, true, false);
-
-    hashMap_destroy(topicReceivers.map, true, false);
-
-    hashMap_destroy(discoveredEndpoints.map, false, false);
-
     free(ipAddress);
 
 }
@@ -276,21 +251,19 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void 
*/*svc*/, const celix_proper
     std::lock_guard<std::mutex> lock(serializers.mutex);
 
     psa_nanomsg_serializer_entry_t* entry = nullptr;
-    auto kv = serializers.map.find(svcId);
-    if (kv != serializers.map.end()) {
-        entry = kv->second;
+    auto kvsm = serializers.map.find(svcId);
+    if (kvsm != serializers.map.end()) {
+        entry = kvsm->second;
     }
     serializers.map.erase(svcId);
     if (entry != nullptr) {
         {
             std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
-            hash_map_iterator_t iter = 
hashMapIterator_construct(topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = 
hashMapIterator_nextEntry(&iter);
-                auto *sender = static_cast<pubsub_nanomsg_topic_sender_t 
*>(hashMapEntry_getValue(senderEntry));
+                for (auto kv: topicSenders.map) {
+                auto *sender = kv.second;
                 if (sender != nullptr && entry->svcId == 
pubsub_nanoMsgTopicSender_serializerSvcId(sender)) {
-                    char *key = static_cast<char 
*>(hashMapEntry_getKey(senderEntry));
-                    hashMapIterator_remove(&iter);
+                    char *key = kv.first;
+                    topicSenders.map.erase(kv.first);
                     pubsub_nanoMsgTopicSender_destroy(sender);
                     free(key);
                 }
@@ -299,13 +272,11 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void 
*/*svc*/, const celix_proper
 
         {
             std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
-            hash_map_iterator_t iter = 
hashMapIterator_construct(topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = 
hashMapIterator_nextEntry(&iter);
-                auto *receiver = 
static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry));
+            for (auto kv : topicReceivers.map){
+                auto *receiver = kv.second;
                 if (receiver != nullptr && entry->svcId == 
pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) {
-                    char *key = 
static_cast<char*>(hashMapEntry_getKey(senderEntry));
-                    hashMapIterator_remove(&iter);
+                    char *key = kv.first;
+                    topicReceivers.map.erase(key);
                     pubsub_nanoMsgTopicReceiver_destroy(receiver);
                     free(key);
                 }
@@ -365,7 +336,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const 
char *scope, const c
     pubsub_nanomsg_topic_sender_t *sender = nullptr;
     std::lock_guard<std::mutex> serializerLock(serializers.mutex);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-    sender = static_cast<pubsub_nanomsg_topic_sender_t 
*>(hashMap_get(topicSenders.map, key));
+    sender = topicSenders.map.find(key)->second;
     if (sender == nullptr) {
         //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t 
*>(hashMap_get(serializers.map,
         //                                                                     
      (void *) serializerSvcId));
@@ -389,7 +360,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const 
char *scope, const c
             if (cn != nullptr) {
                 celix_properties_set(newEndpoint, "container_name", cn);
             }
-            hashMap_put(topicSenders.map, key, sender);
+            topicSenders.map[key] = sender;
         } else {
             L_ERROR("[PSA NANOMSG] Error creating a TopicSender");
             free(key);
@@ -417,10 +388,10 @@ celix_status_t 
pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons
 
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-    hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key);
-    if (entry != nullptr) {
-        char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry));
-        pubsub_nanomsg_topic_sender_t *sender = 
static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(topicSenders.map, 
key));
+    auto kv = topicSenders.map.find(key);
+    if (kv != topicSenders.map.end()) {
+        char *mapKey = kv->first;
+        pubsub_nanomsg_topic_sender_t *sender = kv->second;
         free(mapKey);
         //TODO disconnect endpoints to sender. note is this needed for a 
nanomsg topic sender?
         pubsub_nanoMsgTopicSender_destroy(sender);
@@ -442,18 +413,21 @@ celix_status_t 
pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        receiver = static_cast<pubsub_nanomsg_topic_receiver_t 
*>(hashMap_get(topicReceivers.map, key));
+         auto trkv = topicReceivers.map.find(key);
+         if (trkv != topicReceivers.map.end()) {
+             receiver = trkv->second;
+         }
         if (receiver == nullptr) {
-            auto kv = serializers.map.find(serializerSvcId);
-            if (kv != serializers.map.end()) {
-                auto serEntry = kv->second;
+            auto kvs = serializers.map.find(serializerSvcId);
+            if (kvs != serializers.map.end()) {
+                auto serEntry = kvs->second;
                 receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, 
topic, serializerSvcId, serEntry->svc);
             } else {
                 L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender 
%s/%s", scope, topic);
             }
             if (receiver != nullptr) {
                 const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
-                const char *serType = kv->second->serType;
+                const char *serType = kvs->second->serType;
                 newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, 
PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
                                                     serType, nullptr);
                 //if available also set container name
@@ -461,7 +435,7 @@ celix_status_t 
pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
                 if (cn != nullptr) {
                     celix_properties_set(newEndpoint, "container_name", cn);
                 }
-                hashMap_put(topicReceivers.map, key, receiver);
+                topicReceivers.map[key] = receiver;
             } else {
                 L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
                 free(key);
@@ -473,9 +447,8 @@ celix_status_t 
pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
     }
     if (receiver != nullptr && newEndpoint != nullptr) {
         std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(discoveredEndpoints.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            auto *endpoint = 
static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry : discoveredEndpoints.map) {
+            auto *endpoint = entry.second;
             const char *type = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TYPE, nullptr);
             if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, 
type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
                 connectEndpointToReceiver(receiver, endpoint);
@@ -494,12 +467,12 @@ celix_status_t 
pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const
 celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, 
const char *topic) {
     char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
     std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-    hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key);
+    auto entry = topicReceivers.map.find(key);
     free(key);
-    if (entry != nullptr) {
-        char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry));
-        pubsub_nanomsg_topic_receiver_t *receiver = 
static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry));
-        hashMap_remove(topicReceivers.map, receiverKey);
+    if (entry != topicReceivers.map.end()) {
+        char *receiverKey = entry->first;
+        pubsub_nanomsg_topic_receiver_t *receiver = entry->second;
+        topicReceivers.map.erase(receiverKey);
 
         free(receiverKey);
         pubsub_nanoMsgTopicReceiver_destroy(receiver);
@@ -542,17 +515,17 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const 
celix_properties_t *endpo
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_receiver_t *receiver = 
static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry: topicReceivers.map) {
+            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
             connectEndpointToReceiver(receiver, endpoint);
         }
     }
 
     std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
     celix_properties_t *cpy = celix_properties_copy(endpoint);
+    //TODO : check if properties are never deleted before map.
     const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, 
nullptr);
-    hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy);
+    discoveredEndpoints.map[uuid] = cpy;
 
     celix_status_t  status = CELIX_SUCCESS;
     return status;
@@ -590,24 +563,17 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const 
celix_properties_t *en
 
     if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_receiver_t *receiver = 
static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter));
+        for (auto entry : topicReceivers.map) {
+            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
             disconnectEndpointFromReceiver(receiver, endpoint);
         }
     }
-    celix_properties_t *found = nullptr;
     {
         std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
         const char *uuid = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_UUID, nullptr);
-        found = 
static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, 
(void*)uuid));
-    }
-    if (found != nullptr) {
-        celix_properties_destroy(found);
+        discoveredEndpoints.map.erase(uuid);
     }
-
-    celix_status_t  status = CELIX_SUCCESS;
-    return status;
+    return CELIX_SUCCESS;;
 }
 
 celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine 
__attribute__((unused)), FILE *out,
@@ -619,15 +585,11 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char 
*commandLine __attribut
     {
         std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
-        hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_sender_t *sender = 
static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(
-                    &iter));
+        for (auto kvts: topicSenders.map) {
+            pubsub_nanomsg_topic_sender_t *sender = kvts.second;
             long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender);
-            auto kv = serializers.map.find(serSvcId);
-            //psa_nanomsg_serializer_entry_t *serEntry = 
static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(
-            //        serializers.map, (void *) serSvcId));
-            const char *serType = kv->second == nullptr ? "!Error!" : 
kv->second->serType;
+            auto kvs = serializers.map.find(serSvcId);
+            const char *serType = kvs->second == nullptr ? "!Error!" : 
kvs->second->serType;
             const char *scope = pubsub_nanoMsgTopicSender_scope(sender);
             const char *topic = pubsub_nanoMsgTopicSender_topic(sender);
             const char *url = pubsub_nanoMsgTopicSender_url(sender);
@@ -640,12 +602,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char 
*commandLine __attribut
     {
         fprintf(out, "\n");
         fprintf(out, "\nTopic Receivers:\n");
-        std::lock_guard<std::mutex> serialerLock(serializers.mutex);
+        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
         std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_nanomsg_topic_receiver_t *receiver = 
static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue(
-                    &iter));
+        for (auto entry : topicReceivers.map) {
+            pubsub_nanomsg_topic_receiver_t *receiver = entry.second;
             long serSvcId = 
pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver);
             auto kv =  serializers.map.find(serSvcId);
             const char *serType = kv->second == nullptr ? "!Error!" : 
kv->second->serType;

http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 98314b3..c34a310 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@ -29,6 +29,8 @@
 #include <pubsub_serializer.h>
 
 #include "../../../shell/shell/include/command.h"
+#include "pubsub_nanomsg_topic_sender.h"
+#include "pubsub_nanomsg_topic_receiver.h"
 
 #define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
 #define PUBSUB_NANOMSG_URL_KEY          "zmq.url"
@@ -42,6 +44,13 @@
 #define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 
 //typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t;
+
+template <typename key, typename value>
+struct ProtectedMap { // Bundles a std::map with the std::mutex that users of the map lock before touching it.
+    std::mutex mutex{}; // Never locked by this struct itself; users take it explicitly (e.g. with std::lock_guard).
+    std::map<key, value> map{}; // NOTE(review): default ordering; for the char* / const char* keys used below this compares pointer values, not string contents - confirm that is intended.
+};
+
 class pubsub_nanomsg_admin {
 public:
     pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper);
@@ -106,27 +115,10 @@ private:
         long svcId;
         pubsub_serializer_service_t *svc;
     } psa_nanomsg_serializer_entry_t;
-    struct {
-        std::mutex mutex;
-        std::map<long, psa_nanomsg_serializer_entry_t*> map;
-        //hash_map_t *map; //key = svcId, value = 
psa_nanomsg_serializer_entry_t*
-    } serializers{};
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = scope:topic key, value = 
pubsub_nanomsg_topic_sender_t*
-    } topicSenders{};
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = scope:topic key, value = 
pubsub_nanomsg_topic_sender_t*
-    } topicReceivers{};
-
-    struct {
-        std::mutex mutex;
-        hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* 
(endpoint)
-    } discoveredEndpoints{};
-
+    ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{};
+    ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{};
+    ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{};
+    ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{};
 };
 
 #ifdef __cplusplus

Reply via email to