Repository: celix Updated Branches: refs/heads/nanomsg 3009e6470 -> 7c141424d
Removed celix-maps from nanomsg admin Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/95633eb9 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/95633eb9 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/95633eb9 Branch: refs/heads/nanomsg Commit: 95633eb954a09232867277d7cd0d71c30d49a012 Parents: 3009e64 Author: Erjan Altena <[email protected]> Authored: Sat Nov 3 19:40:27 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Sat Nov 3 19:40:27 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 148 +++++++------------ .../src/pubsub_nanomsg_admin.h | 34 ++--- 2 files changed, 67 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 9fe91d9..6c15ec8 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -32,8 +32,6 @@ #include "pubsub_utils.h" #include "pubsub_nanomsg_admin.h" #include "pubsub_psa_nanomsg_constants.h" -#include "pubsub_nanomsg_topic_sender.h" -#include "pubsub_nanomsg_topic_receiver.h" /* //#define L_DEBUG(...) \ // logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) @@ -50,11 +48,6 @@ #define L_ERROR printf -typedef struct psa_nanomsg_serializer_entry { - const char *serType; - long svcId; - pubsub_serializer_service_t *svc; -} psa_nanomsg_serializer_entry_t; static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip); @@ -103,41 +96,29 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx, log_hel defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE); qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE); qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE); - - //serializers.map = hashMap_create(nullptr, nullptr, nullptr, nullptr); - - topicSenders.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr); - - topicReceivers.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr); - - discoveredEndpoints.map = hashMap_create(utils_stringHash, nullptr, utils_stringEquals, nullptr); } pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { //note assuming al psa register services and service tracker are removed. { std::lock_guard<std::mutex> lock(topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue(&iter)); + for (auto kv : topicSenders.map) { + auto *sender = kv.second; pubsub_nanoMsgTopicSender_destroy(sender); } } { std::lock_guard<std::mutex> lock(topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - auto *recv = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); - pubsub_nanoMsgTopicReceiver_destroy(recv); + for (auto kv: topicReceivers.map) { + pubsub_nanoMsgTopicReceiver_destroy(kv.second); } } { std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map); - while (hashMapIterator_hasNext(&iter)) { - auto *ep = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter)); + for (auto entry : discoveredEndpoints.map) { + auto *ep = entry.second; celix_properties_destroy(ep); } } @@ -150,12 +131,6 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { } } - hashMap_destroy(topicSenders.map, true, false); - - hashMap_destroy(topicReceivers.map, true, false); - - hashMap_destroy(discoveredEndpoints.map, false, false); - free(ipAddress); } @@ -276,21 +251,19 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper std::lock_guard<std::mutex> lock(serializers.mutex); psa_nanomsg_serializer_entry_t* entry = nullptr; - auto kv = serializers.map.find(svcId); - if (kv != serializers.map.end()) { - entry = kv->second; + auto kvsm = serializers.map.find(svcId); + if (kvsm != serializers.map.end()) { + entry = kvsm->second; } serializers.map.erase(svcId); if (entry != nullptr) { { std::lock_guard<std::mutex> senderLock(topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - auto *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapEntry_getValue(senderEntry)); + for (auto kv: topicSenders.map) { + auto *sender = kv.second; if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) { - char *key = static_cast<char *>(hashMapEntry_getKey(senderEntry)); - hashMapIterator_remove(&iter); + char *key = kv.first; + topicSenders.map.erase(kv.first); pubsub_nanoMsgTopicSender_destroy(sender); free(key); } @@ -299,13 +272,11 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper { std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - auto *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(senderEntry)); + for (auto kv : topicReceivers.map){ + auto *receiver = kv.second; if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) { - char *key = static_cast<char*>(hashMapEntry_getKey(senderEntry)); - hashMapIterator_remove(&iter); + char *key = kv.first; + topicReceivers.map.erase(key); pubsub_nanoMsgTopicReceiver_destroy(receiver); free(key); } @@ -365,7 +336,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c pubsub_nanomsg_topic_sender_t *sender = nullptr; std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMap_get(topicSenders.map, key)); + sender = topicSenders.map.find(key)->second; if (sender == nullptr) { //auto *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get(serializers.map, // (void *) serializerSvcId)); @@ -389,7 +360,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c if (cn != nullptr) { celix_properties_set(newEndpoint, "container_name", cn); } - hashMap_put(topicSenders.map, key, sender); + topicSenders.map[key] = sender; } else { L_ERROR("[PSA NANOMSG] Error creating a TopicSender"); free(key); @@ -417,10 +388,10 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - hash_map_entry_t *entry = hashMap_getEntry(topicSenders.map, key); - if (entry != nullptr) { - char *mapKey = static_cast<char*>(hashMapEntry_getKey(entry)); - pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t*>(hashMap_remove(topicSenders.map, key)); + auto kv = topicSenders.map.find(key); + if (kv != topicSenders.map.end()) { + char *mapKey = kv->first; + pubsub_nanomsg_topic_sender_t *sender = kv->second; free(mapKey); //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender? pubsub_nanoMsgTopicSender_destroy(sender); @@ -442,18 +413,21 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const { std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMap_get(topicReceivers.map, key)); + auto trkv = topicReceivers.map.find(key); + if (trkv != topicReceivers.map.end()) { + receiver = trkv->second; + } if (receiver == nullptr) { - auto kv = serializers.map.find(serializerSvcId); - if (kv != serializers.map.end()) { - auto serEntry = kv->second; + auto kvs = serializers.map.find(serializerSvcId); + if (kvs != serializers.map.end()) { + auto serEntry = kvs->second; receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc); } else { L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic); } if (receiver != nullptr) { const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; - const char *serType = kv->second->serType; + const char *serType = kvs->second->serType; newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr); //if available also set container name @@ -461,7 +435,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const if (cn != nullptr) { celix_properties_set(newEndpoint, "container_name", cn); } - hashMap_put(topicReceivers.map, key, receiver); + topicReceivers.map[key] = receiver; } else { L_ERROR("[PSA NANOMSG] Error creating a TopicReceiver."); free(key); @@ -473,9 +447,8 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const } if (receiver != nullptr && newEndpoint != nullptr) { std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(discoveredEndpoints.map); - while (hashMapIterator_hasNext(&iter)) { - auto *endpoint = static_cast<celix_properties_t*>(hashMapIterator_nextValue(&iter)); + for (auto entry : discoveredEndpoints.map) { + auto *endpoint = entry.second; const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { connectEndpointToReceiver(receiver, endpoint); @@ -494,12 +467,12 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) { char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - hash_map_entry_t *entry = hashMap_getEntry(topicReceivers.map, key); + auto entry = topicReceivers.map.find(key); free(key); - if (entry != nullptr) { - char *receiverKey = static_cast<char*>(hashMapEntry_getKey(entry)); - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapEntry_getValue(entry)); - hashMap_remove(topicReceivers.map, receiverKey); + if (entry != topicReceivers.map.end()) { + char *receiverKey = entry->first; + pubsub_nanomsg_topic_receiver_t *receiver = entry->second; + topicReceivers.map.erase(receiverKey); free(receiverKey); pubsub_nanoMsgTopicReceiver_destroy(receiver); @@ -542,17 +515,17 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { std::lock_guard<std::mutex> threadLock(topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); + for (auto entry: topicReceivers.map) { + pubsub_nanomsg_topic_receiver_t *receiver = entry.second; connectEndpointToReceiver(receiver, endpoint); } } std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); celix_properties_t *cpy = celix_properties_copy(endpoint); + //TODO : check if properties are never deleted before map. const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr); - hashMap_put(discoveredEndpoints.map, (void*)uuid, cpy); + discoveredEndpoints.map[uuid] = cpy; celix_status_t status = CELIX_SUCCESS; return status; @@ -590,24 +563,17 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(hashMapIterator_nextValue(&iter)); + for (auto entry : topicReceivers.map) { + pubsub_nanomsg_topic_receiver_t *receiver = entry.second; disconnectEndpointFromReceiver(receiver, endpoint); } } - celix_properties_t *found = nullptr; { std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr); - found = static_cast<celix_properties_t*>(hashMap_remove(discoveredEndpoints.map, (void*)uuid)); - } - if (found != nullptr) { - celix_properties_destroy(found); + discoveredEndpoints.map.erase(uuid); } - - celix_status_t status = CELIX_SUCCESS; - return status; + return CELIX_SUCCESS;; } celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out, @@ -619,15 +585,11 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut { std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_nanomsg_topic_sender_t *sender = static_cast<pubsub_nanomsg_topic_sender_t *>(hashMapIterator_nextValue( - &iter)); + for (auto kvts: topicSenders.map) { + pubsub_nanomsg_topic_sender_t *sender = kvts.second; long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender); - auto kv = serializers.map.find(serSvcId); - //psa_nanomsg_serializer_entry_t *serEntry = static_cast<psa_nanomsg_serializer_entry_t *>(hashMap_get( - // serializers.map, (void *) serSvcId)); - const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType; + auto kvs = serializers.map.find(serSvcId); + const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType; const char *scope = pubsub_nanoMsgTopicSender_scope(sender); const char *topic = pubsub_nanoMsgTopicSender_topic(sender); const char *url = pubsub_nanoMsgTopicSender_url(sender); @@ -640,12 +602,10 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut { fprintf(out, "\n"); fprintf(out, "\nTopic Receivers:\n"); - std::lock_guard<std::mutex> serialerLock(serializers.mutex); + std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t *>(hashMapIterator_nextValue( - &iter)); + for (auto entry : topicReceivers.map) { + pubsub_nanomsg_topic_receiver_t *receiver = entry.second; long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver); auto kv = serializers.map.find(serSvcId); const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType; http://git-wip-us.apache.org/repos/asf/celix/blob/95633eb9/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 98314b3..c34a310 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -29,6 +29,8 @@ #include <pubsub_serializer.h> #include "../../../shell/shell/include/command.h" +#include "pubsub_nanomsg_topic_sender.h" +#include "pubsub_nanomsg_topic_receiver.h" #define PUBSUB_NANOMSG_ADMIN_TYPE "zmq" #define PUBSUB_NANOMSG_URL_KEY "zmq.url" @@ -42,6 +44,13 @@ #define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1" //typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t; + +template <typename key, typename value> +struct ProtectedMap { + std::mutex mutex{}; + std::map<key, value> map{}; +}; + class pubsub_nanomsg_admin { public: pubsub_nanomsg_admin(celix_bundle_context_t *ctx, log_helper_t *logHelper); @@ -106,27 +115,10 @@ private: long svcId; pubsub_serializer_service_t *svc; } psa_nanomsg_serializer_entry_t; - struct { - std::mutex mutex; - std::map<long, psa_nanomsg_serializer_entry_t*> map; - //hash_map_t *map; //key = svcId, value = psa_nanomsg_serializer_entry_t* - } serializers{}; - - struct { - std::mutex mutex; - hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t* - } topicSenders{}; - - struct { - std::mutex mutex; - hash_map_t *map; //key = scope:topic key, value = pubsub_nanomsg_topic_sender_t* - } topicReceivers{}; - - struct { - std::mutex mutex; - hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint) - } discoveredEndpoints{}; - + ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{}; + ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{}; + ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{}; + ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; }; #ifdef __cplusplus
