nanomsg celix-map replaced by std::map
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/8658738d Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/8658738d Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/8658738d Branch: refs/heads/nanomsg Commit: 8658738d5e9a905eb0642ea605d36e50dc24730a Parents: 0abbf43 Author: Erjan Altena <[email protected]> Authored: Wed Nov 21 21:28:32 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Wed Nov 21 21:28:32 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_topic_receiver.cc | 128 +++++++++---------- .../src/pubsub_nanomsg_topic_receiver.h | 37 +++++- 2 files changed, 95 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 88886c6..8acf6b1 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -63,18 +63,11 @@ #define L_ERROR printf -typedef struct psa_zmq_requested_connection_entry { - char *url; - bool connected; - int id; -} psa_nanomsg_requested_connection_entry_t; -static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, - const celix_bundle_t *owner); -static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, - const celix_bundle_t *owner); +//static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, +// const celix_bundle_t *owner); pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, @@ -83,7 +76,6 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} { - //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver))); ctx = _ctx; logHelper = _logHelper; serializer = _serializer; @@ -107,14 +99,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, char subscribeFilter[5]; psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter); - //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter); m_scope = strndup(m_scope, 1024 * 1024); m_topic = strndup(m_topic, 1024 * 1024); subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n"; - requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + //requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); char buf[size + 1]; @@ -124,8 +115,12 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; opts.filter.filter = buf; opts.callbackHandle = this; - opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber; - opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber; + opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { + static_cast<topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner); + }; + opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { + static_cast<topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner); + }; subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); recvThread.running = true; @@ -159,18 +154,18 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() { } - { - std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - iter = hashMapIterator_construct(requestedConnections.map); - while (hashMapIterator_hasNext(&iter)) { - psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter)); - if (entry != NULL) { - free(entry->url); - free(entry); - } - } - hashMap_destroy(requestedConnections.map, false, false); - } +// { +// std::lock_guard<std::mutex> _lock(requestedConnections.mutex); +// iter = hashMapIterator_construct(requestedConnections.map); +// while (hashMapIterator_hasNext(&iter)) { +// psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter)); +// if (entry != NULL) { +// free(entry->url); +// free(entry); +// } +// } +// hashMap_destroy(requestedConnections.map, false, false); +// } //celixThreadMutex_destroy(&receiver->subscribers.mutex); //celixThreadMutex_destroy(&receiver->requestedConnections.mutex); @@ -196,13 +191,11 @@ long pubsub::nanomsg::topic_receiver::serializerSvcId() const { void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls) { std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map); - while (hashMapIterator_hasNext(&iter)) { - psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter)); - if (entry->connected) { - connectedUrls.push_back(std::string(entry->url)); + for (auto entry : requestedConnections.map) { + if (entry.second.isConnected()) { + connectedUrls.push_back(std::string(entry.second.getUrl())); } else { - unconnectedUrls.push_back(std::string(entry->url)); + unconnectedUrls.push_back(std::string(entry.second.getUrl())); } } } @@ -212,18 +205,18 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url); std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url)); - if (entry == NULL) { - entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry))); - entry->url = strndup(url, 1024*1024); - entry->connected = false; - hashMap_put(requestedConnections.map, (void*)entry->url, entry); + auto entry = requestedConnections.map.find(url); + if (entry == requestedConnections.map.end()) { + requestedConnections.map.emplace( + std::piecewise_construct, + std::forward_as_tuple(std::string(url)), + std::forward_as_tuple(url, -1)); } - if (!entry->connected) { + if (!entry->second.isConnected()) { int connection_id = nn_connect(m_nanoMsgSocket, url); if (connection_id >= 0) { - entry->connected = true; - entry->id = connection_id; + entry->second.setConnected(true); + entry->second.setId(connection_id); } else { L_WARN("[PSA_NANOMSG] Error connecting to NANOMSG url %s. (%s)", url, strerror(errno)); } @@ -234,33 +227,34 @@ void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url); std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url)); - if (entry != NULL && entry->connected) { - if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) { - entry->connected = false; - } else { - L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno)); + auto entry = requestedConnections.map.find(url); + if (entry != requestedConnections.map.end()) { + if (entry->second.isConnected()) { + if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) { + entry->second.setConnected(false); + } else { + L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->second.getId(), + strerror(errno)); + } } - } - if (entry != NULL) { - free(entry->url); - free(entry); + requestedConnections.map.erase(url); + std::cerr << "REMOVING connection " << url << std::endl; + } else { + std::cerr << "Disconnecting from unknown URL " << url << std::endl; } } -static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, +void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { - auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle); - long bndId = celix_bundle_getId(bnd); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) { + if (strncmp(subScope, m_scope, strlen(m_scope)) != 0) { //not the same scope. ignore return; } - std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex); - psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId)); + std::lock_guard<std::mutex> _lock(subscribers.mutex); + psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId)); if (entry != NULL) { entry->usageCount += 1; } else { @@ -269,33 +263,31 @@ static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, c entry->usageCount = 1; entry->svc = static_cast<pubsub_subscriber_t*>(svc); - int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes); + int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->msgTypes); if (rc == 0) { - hashMap_put(receiver->subscribers.map, (void*)bndId, entry); + hashMap_put(subscribers.map, (void*)bndId, entry); } else { - L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic); + L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", m_scope, m_topic); free(entry); } } } -static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/, +void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd) { - auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle); - long bndId = celix_bundle_getId(bnd); - std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex); - psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(receiver->subscribers.map, (void*)bndId)); + std::lock_guard<std::mutex> _lock(subscribers.mutex); + psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMap_get(subscribers.map, (void*)bndId)); if (entry != NULL) { entry->usageCount -= 1; } if (entry != NULL && entry->usageCount <= 0) { //remove entry - hashMap_remove(receiver->subscribers.map, (void*)bndId); - int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); + hashMap_remove(subscribers.map, (void*)bndId); + int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes); if (rc != 0) { - L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic); + L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", m_scope, m_topic); } free(entry); } http://git-wip-us.apache.org/repos/asf/celix/blob/8658738d/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index f977917..3398fb1 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -21,6 +21,7 @@ #include <vector> #include <thread> #include <mutex> +#include <map> #include "pubsub_serializer.h" #include "log_helper.h" #include "celix_bundle_context.h" @@ -33,6 +34,34 @@ typedef struct psa_zmq_subscriber_entry { pubsub_subscriber_t *svc; } psa_nanomsg_subscriber_entry_t; +typedef struct psa_zmq_requested_connection_entry { +public: + psa_zmq_requested_connection_entry(std::string _url, int _id, bool _connected=false): + url{_url}, id{_id}, connected{_connected} { + } + bool isConnected() const { + return connected; + } + + int getId() const { + return id; + } + + void setId(int _id) { + id = _id; + } + void setConnected(bool c) { + connected = c; + } + + const std::string &getUrl() const { + return url; + } +private: + std::string url; + int id; + bool connected; +} psa_nanomsg_requested_connection_entry_t; namespace pubsub { namespace nanomsg { @@ -58,7 +87,10 @@ namespace pubsub { void recvThread_exec(); void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize); void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize); - //private: + void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); + void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd); + + private: celix_bundle_context_t *ctx{nullptr}; log_helper_t *logHelper{nullptr}; long m_serializerSvcId{0}; @@ -77,7 +109,8 @@ namespace pubsub { struct { std::mutex mutex; - hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t* + std::map<std::string, psa_nanomsg_requested_connection_entry_t> map; + //hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t* } requestedConnections{}; long subscriberTrackerId{0};
