Nanomsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/cdefb0d6 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/cdefb0d6 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/cdefb0d6 Branch: refs/heads/nanomsg Commit: cdefb0d665b27a41f360599598ff489b322b4405 Parents: 883abee Author: Erjan Altena <[email protected]> Authored: Mon Nov 26 20:23:11 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Mon Nov 26 20:23:11 2018 +0100 ---------------------------------------------------------------------- .../log_service/loghelper_include/log_helper.h | 2 +- bundles/log_service/src/log_helper.c | 2 +- .../src/pubsub_nanomsg_admin.cc | 62 ++++++++------------ .../src/pubsub_nanomsg_admin.h | 10 ++-- .../src/pubsub_nanomsg_topic_receiver.cc | 10 ++-- .../src/pubsub_nanomsg_topic_sender.cc | 11 +--- 6 files changed, 42 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/loghelper_include/log_helper.h ---------------------------------------------------------------------- diff --git a/bundles/log_service/loghelper_include/log_helper.h b/bundles/log_service/loghelper_include/log_helper.h index 28e6877..af058eb 100644 --- a/bundles/log_service/loghelper_include/log_helper.h +++ b/bundles/log_service/loghelper_include/log_helper.h @@ -33,7 +33,7 @@ celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* log_he celix_status_t logHelper_start(log_helper_pt loghelper); celix_status_t logHelper_stop(log_helper_pt loghelper); celix_status_t logHelper_destroy(log_helper_pt* loghelper); -celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... ); +celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... ); #ifdef __cplusplus } #endif http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/log_service/src/log_helper.c ---------------------------------------------------------------------- diff --git a/bundles/log_service/src/log_helper.c b/bundles/log_service/src/log_helper.c index 6570357..e9939ed 100644 --- a/bundles/log_service/src/log_helper.c +++ b/bundles/log_service/src/log_helper.c @@ -156,7 +156,7 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) { -celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... ) +celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, const char* message, ... ) { celix_status_t status = CELIX_SUCCESS; va_list listPointer; http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 030441d..cf516ee 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -32,20 +32,15 @@ #include "pubsub_utils.h" #include "pubsub_nanomsg_admin.h" #include "pubsub_psa_nanomsg_constants.h" -/* -//#define L_DEBUG(...) \ -// logHelper_log(psa->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) -//#define L_INFO(...) \ -// logHelper_log(psa->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__) -//#define L_WARN(...) \ -// logHelper_log(psa->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) -//#define L_ERROR(...) \ -// logHelper_log(psa->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) -*/ -#define L_DEBUG printf -#define L_INFO printf -#define L_WARN printf -#define L_ERROR printf + +#define L_DEBUG(...) \ + logHelper_log(log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) +#define L_INFO(...) \ + logHelper_log(log, OSGI_LOGSERVICE_INFO, __VA_ARGS__) +#define L_WARN(...) \ + logHelper_log(log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) +#define L_ERROR(...) \ + logHelper_log(log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) @@ -125,10 +120,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { { std::lock_guard<std::mutex> lock(serializers.mutex); - // todo: do not use pointer but type in map - for(auto kv: serializers.map) { - free(kv.second); - } + serializers.map.clear(); } free(ipAddress); @@ -229,11 +221,14 @@ void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t std::lock_guard<std::mutex> lock(serializers.mutex); auto it = serializers.map.find(svcId); if (it == serializers.map.end()) { - auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t))); - entry->serType = serType; - entry->svcId = svcId; - entry->svc = static_cast<pubsub_serializer_service_t*>(svc); - serializers.map[svcId] = entry; + serializers.map.emplace(std::piecewise_construct, + std::forward_as_tuple(svcId), + std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc))); +// auto entry = static_cast<psa_nanomsg_serializer_entry_t*>(calloc(1, sizeof(psa_nanomsg_serializer_entry_t))); +// entry->serType = serType; +// entry->svcId = svcId; +// entry->svc = static_cast<pubsub_serializer_service_t*>(svc); +// serializers.map[svcId] = entry; } } } @@ -250,18 +245,14 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper std::lock_guard<std::mutex> lock(serializers.mutex); - psa_nanomsg_serializer_entry_t* entry = nullptr; auto kvsm = serializers.map.find(svcId); if (kvsm != serializers.map.end()) { - entry = kvsm->second; - } - serializers.map.erase(svcId); - if (entry != nullptr) { + auto &entry = kvsm->second; { std::lock_guard<std::mutex> senderLock(topicSenders.mutex); for (auto kv: topicSenders.map) { auto *sender = kv.second; - if (sender != nullptr && entry->svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) { + if (sender != nullptr && entry.svcId == pubsub_nanoMsgTopicSender_serializerSvcId(sender)) { char *key = kv.first; topicSenders.map.erase(kv.first); pubsub_nanoMsgTopicSender_destroy(sender); @@ -274,7 +265,7 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); for (auto kv : topicReceivers.map){ auto *receiver = kv.second; - if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) { + if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) { auto key = kv.first; topicReceivers.map.erase(key); delete receiver; @@ -282,7 +273,6 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper } } - free(entry); } } @@ -340,7 +330,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c psa_nanomsg_serializer_entry_t *serEntry = nullptr; auto kv = serializers.map.find(serializerSvcId); if (kv != serializers.map.end()) { - serEntry = kv->second; + serEntry = &kv->second; } if (serEntry != nullptr) { sender = pubsub_nanoMsgTopicSender_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc, ipAddress, @@ -418,13 +408,13 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope auto kvs = serializers.map.find(serializerSvcId); if (kvs != serializers.map.end()) { auto serEntry = kvs->second; - receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc); + receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry.svc); } else { L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope.c_str(), topic.c_str()); } if (receiver != nullptr) { const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; - const char *serType = kvs->second->serType; + const char *serType = kvs->second.serType; newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, nullptr); //if available also set container name @@ -577,7 +567,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut pubsub_nanomsg_topic_sender_t *sender = kvts.second; long serSvcId = pubsub_nanoMsgTopicSender_serializerSvcId(sender); auto kvs = serializers.map.find(serSvcId); - const char *serType = kvs->second == nullptr ? "!Error!" : kvs->second->serType; + const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType); const char *scope = pubsub_nanoMsgTopicSender_scope(sender); const char *topic = pubsub_nanoMsgTopicSender_topic(sender); const char *url = pubsub_nanoMsgTopicSender_url(sender); @@ -596,7 +586,7 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut pubsub::nanomsg::topic_receiver *receiver = entry.second; long serSvcId = receiver->serializerSvcId(); auto kv = serializers.map.find(serSvcId); - const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType; + const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType); auto scope = receiver->scope(); auto topic = receiver->topic(); http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index b33a3c0..689ae20 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -43,8 +43,6 @@ #define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1" -//typedef struct pubsub_nanomsg_admin pubsub_nanomsg_admin_t; - template <typename key, typename value> struct ProtectedMap { std::mutex mutex{}; @@ -111,11 +109,16 @@ private: bool verbose{}; typedef struct psa_nanomsg_serializer_entry { + psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) : + serType{_serType}, svcId{_svcId}, svc{_svc} { + + } + const char *serType; long svcId; pubsub_serializer_service_t *svc; } psa_nanomsg_serializer_entry_t; - ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{}; + ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{}; ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{}; ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; @@ -131,4 +134,3 @@ extern "C" { #endif //CELIX_PUBSUB_ZMQ_ADMIN_H - http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index db8469b..9f77a4c 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -158,7 +158,7 @@ void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> & void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { - L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope.c_str(), m_topic.c_str(), url); + L_DEBUG("[PSA_NANOMSG] TopicReceiver %s/%s connecting to nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url); std::lock_guard<std::mutex> _lock(requestedConnections.mutex); auto entry = requestedConnections.map.find(url); @@ -181,7 +181,7 @@ void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { } void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { - L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope.c_str(), m_topic.c_str(), url); + L_DEBUG("[PSA NANOMSG] TopicReceiver %s/%s disconnect from nanomsg url %s", m_scope.c_str(), m_topic.c_str(), url); std::lock_guard<std::mutex> _lock(requestedConnections.mutex); auto entry = requestedConnections.map.find(url); @@ -316,13 +316,13 @@ void pubsub::nanomsg::topic_receiver::recvThread_exec() { processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header)); nn_freemsg(msg); } else if (recvBytes >= 0) { - L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes); + L_ERROR("[PSA_NANOMSG_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes); } else if (errno == EAGAIN || errno == ETIMEDOUT) { //nop } else if (errno == EINTR) { - L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted"); + L_DEBUG("[PSA_NANOMSG_TR] nn_recvmsg interrupted"); } else { - L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: errno %d: %s\n", errno, strerror(errno)); + L_WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno %d: %s\n", errno, strerror(errno)); } } // while http://git-wip-us.apache.org/repos/asf/celix/blob/cdefb0d6/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc index ff0d4f7..d5ed28f 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc @@ -37,9 +37,9 @@ #include "pubsub_psa_nanomsg_constants.h" #include "pubsub_nanomsg_common.h" -#define FIRST_SEND_DELAY_IN_SECONDS 2 +#define FIRST_SEND_DELAY_IN_SECONDS 2 #define NANOMSG_BIND_MAX_RETRY 10 -/* + #define L_DEBUG(...) \ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) #define L_INFO(...) \ @@ -48,11 +48,6 @@ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) #define L_ERROR(...) \ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) -*/ -#define L_DEBUG printf -#define L_INFO printf -#define L_WARN printf -#define L_ERROR printf struct pubsub_nanomsg_topic_sender { celix_bundle_context_t *ctx; @@ -349,7 +344,7 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId return status; } -static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t */*sender*/) { +static void delay_first_send_for_late_joiners(pubsub_nanomsg_topic_sender_t *sender) { static bool firstSend = true;
