Nanomsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/707b8e54 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/707b8e54 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/707b8e54 Branch: refs/heads/nanomsg Commit: 707b8e54aeda61c5eba4b8a3fa65059ec0f5821a Parents: b6a0337 Author: Erjan Altena <[email protected]> Authored: Sat Dec 1 09:26:44 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Sat Dec 1 09:26:44 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 66 ++++++++------------ .../src/pubsub_nanomsg_admin.h | 4 +- 2 files changed, 29 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/707b8e54/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index cc17ebb..ba47723 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -28,6 +28,7 @@ #include <netdb.h> #include <ifaddrs.h> #include <pubsub_endpoint.h> +#include <algorithm> #include "pubsub_utils.h" #include "pubsub_nanomsg_admin.h" @@ -96,11 +97,11 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { //note assuming al psa register services and service tracker are removed. { - std::lock_guard<std::mutex> lock(topicSenders.mutex); - for (auto kv : topicSenders.map) { - auto *sender = kv.second; - delete (sender); - } +// std::lock_guard<std::mutex> lock(topicSenders.mutex); +// for (auto &kv : topicSenders.map) { +// auto &sender = kv.second; +// delete (sender); +// } } { @@ -224,11 +225,6 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t serializers.map.emplace(std::piecewise_construct, std::forward_as_tuple(svcId), std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc))); -// auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t))); -// entry->serType = serType; -// entry->svcId = svcId; -// entry->svc = static_cast<pubsub_serializer_service_t*>(svc); -// serializers.map[svcId] = entry; } } } @@ -250,11 +246,12 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper auto &entry = kvsm->second; { std::lock_guard<std::mutex> senderLock(topicSenders.mutex); - for (auto &kv: topicSenders.map) { - auto *sender = kv.second; - if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) { - topicSenders.map.erase(kv.first); - delete (sender); + for(auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) { + auto &sender = it->second; + if (entry.svcId == sender.getSerializerSvcId()) { + it = topicSenders.map.erase(it); + } else { + ++it; } } } @@ -320,32 +317,30 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c celix_properties_t *newEndpoint = nullptr; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = nullptr; std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - sender = topicSenders.map.find(key)->second; - if (sender == nullptr) { + auto sender = topicSenders.map.find(key); + if (sender == topicSenders.map.end()) { psa_nanomsg_serializer_entry_t *serEntry = nullptr; auto kv = serializers.map.find(serializerSvcId); if (kv != serializers.map.end()) { serEntry = &kv->second; } if (serEntry != nullptr) { - sender = new pubsub::nanomsg::pubsub_nanomsg_topic_sender(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress, - basePort, maxPort); - } - if (sender != nullptr) { + auto e = topicSenders.map.emplace(std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress, + basePort, maxPort)); const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; const char *serType = serEntry->serType; newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, nullptr); - celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl().c_str()); + celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, e.first->second.getUrl().c_str()); //if available also set container name const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); if (cn != nullptr) { celix_properties_set(newEndpoint, "container_name", cn); } - topicSenders.map[key] = sender; } else { L_ERROR("[PSA NANOMSG] Error creating a TopicSender"); free(key); @@ -354,9 +349,6 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c free(key); L_ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic %s/%s!", scope, topic); } - if (sender != nullptr && newEndpoint != nullptr) { - //TODO connect endpoints to sender, NOTE is this needed for a nanomsg topic sender? - } if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) { *outPublisherEndpoint = newEndpoint; @@ -373,12 +365,8 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - auto kv = topicSenders.map.find(key); - if (kv != topicSenders.map.end()) { - pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second; - //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender? - delete (sender); - } else { + ; + if (topicSenders.map.erase(key) == 0) { L_ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists", scope, topic); } free(key); @@ -558,14 +546,14 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut { std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - for (auto kvts: topicSenders.map) { - pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kvts.second; - long serSvcId = sender->getSerializerSvcId(); + for (auto &senderEntry: topicSenders.map) { + auto &sender = senderEntry.second; + long serSvcId = sender.getSerializerSvcId(); auto kvs = serializers.map.find(serSvcId); const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType); - const auto scope = sender->getScope(); - const auto topic = sender->getTopic(); - const auto url = sender->getUrl(); + const auto scope = sender.getScope(); + const auto topic = sender.getTopic(); + const auto url = sender.getUrl(); fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), topic.c_str()); fprintf(out, " |- serializer type = %s\n", serType); fprintf(out, " |- url = %s\n", url.c_str()); http://git-wip-us.apache.org/repos/asf/celix/blob/707b8e54/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 8ec35e5..8ed5ddd 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -119,9 +119,9 @@ private: pubsub_serializer_service_t *svc; } psa_nanomsg_serializer_entry_t; ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{}; - ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{}; + ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> topicSenders{}; ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; - ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; + ProtectedMap<const std::string, celix_properties_t *> discoveredEndpoints{}; }; #ifdef __cplusplus
