nanomsg celix-map replaced by std::map

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/8658738d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/8658738d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/8658738d

Branch: refs/heads/nanomsg
Commit: 8658738d5e9a905eb0642ea605d36e50dc24730a
Parents: 0abbf43
Author: Erjan Altena <[email protected]>
Authored: Wed Nov 21 21:28:32 2018 +0100
Committer: Erjan Altena <[email protected]>
Committed: Wed Nov 21 21:28:32 2018 +0100

----------------------------------------------------------------------
 .../src/pubsub_nanomsg_topic_receiver.cc        | 128 +++++++++----------
 .../src/pubsub_nanomsg_topic_receiver.h         |  37 +++++-
 2 files changed, 95 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index 88886c6..8acf6b1 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@ -63,18 +63,11 @@
 #define L_ERROR printf
 
 
-typedef struct psa_zmq_requested_connection_entry {
-    char *url;
-    bool connected;
-    int id;
-} psa_nanomsg_requested_connection_entry_t;
 
 
 
-static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, 
const celix_properties_t *props,
-                                                      const celix_bundle_t 
*owner);
-static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void 
*svc, const celix_properties_t *props,
-                                                         const celix_bundle_t 
*owner);
+//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void 
*svc, const celix_properties_t *props,
+//                                                         const 
celix_bundle_t *owner);
 
 
 pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
@@ -83,7 +76,6 @@ 
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         const char *_topic,
         long _serializerSvcId,
         pubsub_serializer_service_t *_serializer) : 
m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} {
-    //pubsub_nanomsg_topic_receiver_t *receiver = 
static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver)));
     ctx = _ctx;
     logHelper = _logHelper;
     serializer = _serializer;
@@ -107,14 +99,13 @@ 
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
 
         char subscribeFilter[5];
         psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter);
-        //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter);
 
         m_scope = strndup(m_scope, 1024 * 1024);
         m_topic = strndup(m_topic, 1024 * 1024);
 
         subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
         std::cout << "#### Creating subscirbers.map!! " << subscribers.map << 
"\n";
-        requestedConnections.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+        //requestedConnections.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
         int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
m_topic);
         char buf[size + 1];
@@ -124,8 +115,12 @@ 
pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
         opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
         opts.filter.filter = buf;
         opts.callbackHandle = this;
-        opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber;
-        opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber;
+        opts.addWithOwner = [](void *handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *svcOwner) {
+            static_cast<topic_receiver*>(handle)->addSubscriber(svc, props, 
svcOwner);
+        };
+        opts.removeWithOwner = [](void *handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *svcOwner) {
+            static_cast<topic_receiver*>(handle)->removeSubscriber(svc, props, 
svcOwner);
+        };
 
         subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
         recvThread.running = true;
@@ -159,18 +154,18 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() {
         }
 
 
-        {
-            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-            iter = hashMapIterator_construct(requestedConnections.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                psa_nanomsg_requested_connection_entry_t *entry = 
static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
-                if (entry != NULL) {
-                    free(entry->url);
-                    free(entry);
-                }
-            }
-            hashMap_destroy(requestedConnections.map, false, false);
-        }
+//        {
+//            std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
+//            iter = hashMapIterator_construct(requestedConnections.map);
+//            while (hashMapIterator_hasNext(&iter)) {
+//                psa_nanomsg_requested_connection_entry_t *entry = 
static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter));
+//                if (entry != NULL) {
+//                    free(entry->url);
+//                    free(entry);
+//                }
+//            }
+//            hashMap_destroy(requestedConnections.map, false, false);
+//        }
 
         //celixThreadMutex_destroy(&receiver->subscribers.mutex);
         //celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
@@ -196,13 +191,11 @@ long pubsub::nanomsg::topic_receiver::serializerSvcId() 
const {
 void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> 
&connectedUrls,
                                                  std::vector<std::string> 
&unconnectedUrls) {
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-    hash_map_iterator_t iter = 
hashMapIterator_construct(requestedConnections.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_nanomsg_requested_connection_entry_t *entry = 
static_cast<psa_nanomsg_requested_connection_entry_t 
*>(hashMapIterator_nextValue(&iter));
-        if (entry->connected) {
-            connectedUrls.push_back(std::string(entry->url));
+    for (auto entry : requestedConnections.map) {
+        if (entry.second.isConnected()) {
+            connectedUrls.push_back(std::string(entry.second.getUrl()));
         } else {
-            unconnectedUrls.push_back(std::string(entry->url));
+            unconnectedUrls.push_back(std::string(entry.second.getUrl()));
         }
     }
 }
@@ -212,18 +205,18 @@ void pubsub::nanomsg::topic_receiver::connectTo(const 
char *url) {
     L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, 
m_topic, url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = 
static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map,
 url));
-    if (entry == NULL) {
-        entry = 
static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, 
sizeof(*entry)));
-        entry->url = strndup(url, 1024*1024);
-        entry->connected = false;
-        hashMap_put(requestedConnections.map, (void*)entry->url, entry);
+    auto entry  = requestedConnections.map.find(url);
+    if (entry == requestedConnections.map.end()) {
+        requestedConnections.map.emplace(
+                std::piecewise_construct,
+                std::forward_as_tuple(std::string(url)),
+                std::forward_as_tuple(url, -1));
     }
-    if (!entry->connected) {
+    if (!entry->second.isConnected()) {
         int connection_id = nn_connect(m_nanoMsgSocket, url);
         if (connection_id >= 0) {
-            entry->connected = true;
-            entry->id = connection_id;
+            entry->second.setConnected(true);
+            entry->second.setId(connection_id);
         } else {
             L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)", 
url, strerror(errno));
         }
@@ -234,33 +227,34 @@ void 
pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
     L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", 
m_scope, m_topic, url);
 
     std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
-    psa_nanomsg_requested_connection_entry_t *entry = 
static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map,
 url));
-    if (entry != NULL && entry->connected) {
-        if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) {
-            entry->connected = false;
-        } else {
-            L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id 
%d. (%s)", url, entry->id, strerror(errno));
+    auto entry = requestedConnections.map.find(url);
+    if (entry != requestedConnections.map.end()) {
+        if (entry->second.isConnected()) {
+            if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) {
+                entry->second.setConnected(false);
+            } else {
+                L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, 
id %d. (%s)", url, entry->second.getId(),
+                       strerror(errno));
+            }
         }
-    }
-    if (entry != NULL) {
-        free(entry->url);
-        free(entry);
+        requestedConnections.map.erase(url);
+        std::cerr << "REMOVING connection " << url << std::endl;
+    } else {
+        std::cerr << "Disconnecting from unknown URL " << url << std::endl;
     }
 }
 
-static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, 
const celix_properties_t *props,
+void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const 
celix_properties_t *props,
                                                       const celix_bundle_t 
*bnd) {
-    auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
-
     long bndId = celix_bundle_getId(bnd);
     const char *subScope = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_SCOPE, "default");
-    if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) {
+    if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) {
         //not the same scope. ignore
         return;
     }
 
-    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = 
static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map,
 (void*)bndId));
+    std::lock_guard<std::mutex> _lock(subscribers.mutex);
+    psa_nanomsg_subscriber_entry_t *entry = 
static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, 
(void*)bndId));
     if (entry != NULL) {
         entry->usageCount += 1;
     } else {
@@ -269,33 +263,31 @@ static void 
pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, c
         entry->usageCount = 1;
         entry->svc = static_cast<pubsub_subscriber_t*>(svc);
 
-        int rc = 
receiver->serializer->createSerializerMap(receiver->serializer->handle, 
(celix_bundle_t*)bnd, &entry->msgTypes);
+        int rc = serializer->createSerializerMap(serializer->handle, 
(celix_bundle_t*)bnd, &entry->msgTypes);
         if (rc == 0) {
-            hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
+            hashMap_put(subscribers.map, (void*)bndId, entry);
         } else {
-            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for 
TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
+            L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for 
TopicReceiver %s/%s", m_scope, m_topic);
             free(entry);
         }
     }
 }
 
-static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void 
*/*svc*/,
+void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
                                                          const 
celix_properties_t */*props*/, const celix_bundle_t *bnd) {
-    auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle);
-
     long bndId = celix_bundle_getId(bnd);
 
-    std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex);
-    psa_nanomsg_subscriber_entry_t *entry = 
static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map,
 (void*)bndId));
+    std::lock_guard<std::mutex> _lock(subscribers.mutex);
+    psa_nanomsg_subscriber_entry_t *entry = 
static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, 
(void*)bndId));
     if (entry != NULL) {
         entry->usageCount -= 1;
     }
     if (entry != NULL && entry->usageCount <= 0) {
         //remove entry
-        hashMap_remove(receiver->subscribers.map, (void*)bndId);
-        int rc = 
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
+        hashMap_remove(subscribers.map, (void*)bndId);
+        int rc = serializer->destroySerializerMap(serializer->handle, 
entry->msgTypes);
         if (rc != 0) {
-            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for 
TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic);
+            L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for 
TopicReceiver %s/%s", m_scope, m_topic);
         }
         free(entry);
     }

http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
index f977917..3398fb1 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h
@@ -21,6 +21,7 @@
 #include <vector>
 #include <thread>
 #include <mutex>
+#include <map>
 #include "pubsub_serializer.h"
 #include "log_helper.h"
 #include "celix_bundle_context.h"
@@ -33,6 +34,34 @@ typedef struct psa_zmq_subscriber_entry {
     pubsub_subscriber_t *svc;
 } psa_nanomsg_subscriber_entry_t;
 
+typedef struct psa_zmq_requested_connection_entry {
+public:
+    psa_zmq_requested_connection_entry(std::string _url, int _id, bool 
_connected=false):
+    url{_url}, id{_id}, connected{_connected} {
+    }
+    bool isConnected() const {
+        return connected;
+    }
+
+    int getId() const {
+        return id;
+    }
+
+    void setId(int _id) {
+        id = _id;
+    }
+    void setConnected(bool c) {
+        connected = c;
+    }
+
+    const std::string &getUrl() const {
+        return url;
+    }
+private:
+    std::string url;
+    int id;
+    bool connected;
+} psa_nanomsg_requested_connection_entry_t;
 
 namespace pubsub {
     namespace nanomsg {
@@ -58,7 +87,10 @@ namespace pubsub {
             void recvThread_exec();
             void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char 
*payload, size_t payloadSize);
             void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* 
entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t 
payloadSize);
-        //private:
+            void addSubscriber(void *svc, const celix_properties_t *props, 
const celix_bundle_t *bnd);
+            void removeSubscriber(void */*svc*/, const celix_properties_t 
*/*props*/, const celix_bundle_t *bnd);
+
+        private:
             celix_bundle_context_t *ctx{nullptr};
             log_helper_t *logHelper{nullptr};
             long m_serializerSvcId{0};
@@ -77,7 +109,8 @@ namespace pubsub {
 
             struct {
                 std::mutex mutex;
-                hash_map_t *map; //key = zmq url, value = 
psa_zmq_requested_connection_entry_t*
+                std::map<std::string, 
psa_nanomsg_requested_connection_entry_t> map;
+                //hash_map_t *map; //key = zmq url, value = 
psa_zmq_requested_connection_entry_t*
             } requestedConnections{};
 
             long subscriberTrackerId{0};

Reply via email to