NanoMsg
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b6a03372 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b6a03372 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b6a03372 Branch: refs/heads/nanomsg Commit: b6a0337233c48c48bf48a86274682ba8329f91f3 Parents: 15f268d Author: Erjan Altena <[email protected]> Authored: Thu Nov 29 20:39:40 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Thu Nov 29 20:39:40 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 7 +- .../src/pubsub_nanomsg_admin.h | 2 +- .../src/pubsub_nanomsg_topic_receiver.cc | 34 +++--- .../src/pubsub_nanomsg_topic_receiver.h | 4 +- .../src/pubsub_nanomsg_topic_sender.cc | 119 ++++++++++++------- .../src/pubsub_nanomsg_topic_sender.h | 14 ++- 6 files changed, 106 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 9fdf2a3..cc17ebb 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -250,13 +250,11 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper auto &entry = kvsm->second; { std::lock_guard<std::mutex> senderLock(topicSenders.mutex); - for (auto kv: topicSenders.map) { + for (auto &kv: topicSenders.map) { auto *sender = kv.second; if (sender != nullptr && entry.svcId == sender->getSerializerSvcId()) { - char *key = kv.first; topicSenders.map.erase(kv.first); delete (sender); - free(key); } } } @@ -377,9 +375,7 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, cons std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); auto kv = topicSenders.map.find(key); if (kv != topicSenders.map.end()) { - char *mapKey = kv->first; pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = kv->second; - free(mapKey); //TODO disconnect endpoints to sender. note is this needed for a nanomsg topic sender? delete (sender); } else { @@ -557,7 +553,6 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) { celix_status_t status = CELIX_SUCCESS; - fprintf(out, "\n"); fprintf(out, "Topic Senders:\n"); { http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 7c2e9a0..8ec35e5 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -119,7 +119,7 @@ private: pubsub_serializer_service_t *svc; } psa_nanomsg_serializer_entry_t; ProtectedMap<long, psa_nanomsg_serializer_entry_t> serializers{}; - ProtectedMap<char*, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{}; + ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender*> topicSenders{}; ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; }; http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 30c2af7..83d1caf 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -72,7 +72,6 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, ctx = _ctx; logHelper = _logHelper; serializer = _serializer; - m_scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic); m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS); if (m_nanoMsgSocket < 0) { @@ -88,20 +87,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, auto subscriberFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic); - int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str()); - char buf[size + 1]; - snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str()); - celix_service_tracking_options_t opts{}; - opts.filter.ignoreServiceLanguage = true; - opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; - opts.filter.filter = buf; - opts.callbackHandle = this; - opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { - static_cast<topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner); - }; - opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { - static_cast<topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner); - }; + auto opts = createOptions(); subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); recvThread.running = true; @@ -110,6 +96,24 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, } } +celix_service_tracking_options_t pubsub::nanomsg::topic_receiver::createOptions() { + std::stringstream filter_str; + + filter_str << "(" << PUBSUB_SUBSCRIBER_TOPIC << "=" << m_topic << ")"; + celix_service_tracking_options_t opts{}; + opts.filter.ignoreServiceLanguage = true; + opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; + opts.filter.filter = strdup(filter_str.str().c_str()); // TODO : memory leak ?? + opts.callbackHandle = this; + opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { + static_cast<pubsub::nanomsg::topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner); + }; + opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { + static_cast<pubsub::nanomsg::topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner); + }; + return opts; +} + pubsub::nanomsg::topic_receiver::~topic_receiver() { { http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index 02c462e..6b0950a 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -30,7 +30,7 @@ struct psa_nanomsg_subscriber_entry { psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) : - svc{_svc}, usageCount{_usageCount} { + svc{_svc}, usageCount{_usageCount} { } pubsub_subscriber_t *svc{}; int usageCount; @@ -92,6 +92,7 @@ namespace pubsub { void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize); void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd); + celix_service_tracking_options_t createOptions(); private: celix_bundle_context_t *ctx{nullptr}; @@ -100,7 +101,6 @@ namespace pubsub { pubsub_serializer_service_t *serializer{nullptr}; const std::string m_scope{}; const std::string m_topic{}; - std::string m_scopeAndTopicFilter{}; int m_nanoMsgSocket{0}; http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc index 9a253b2..7521d78 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc @@ -19,7 +19,7 @@ #include <memory.h> #include <iostream> - +#include <sstream> #include <stdlib.h> #include <utils.h> #include <arpa/inet.h> @@ -38,18 +38,46 @@ #define FIRST_SEND_DELAY_IN_SECONDS 2 #define NANOMSG_BIND_MAX_RETRY 10 -#define L_DEBUG(...) \ - logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) -#define L_INFO(...) \ - logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) -#define L_WARN(...) \ - logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) -#define L_ERROR(...) \ - logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) +template <typename T> +std::stringstream LOG_STREAM(T first) { + std::stringstream ss; + ss << first; + return ss; +} + +template <typename T, typename... Args> +std::stringstream LOG_STREAM(T first, Args... args) { + std::stringstream ss; + ss << first << LOG_STREAM(args...).str(); + return ss; +} + +template <typename... Args> +void L_DEBUG(log_helper_t *logHelper, Args... args) { + std::stringstream ss = LOG_STREAM(args...); + logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, ss.str().c_str()); +} + +template <typename... Args> +void L_INFO(log_helper_t *logHelper, Args... args) { + auto ss = LOG_STREAM(args...); + logHelper_log(logHelper, OSGI_LOGSERVICE_INFO, ss.str().c_str()); +} + +template <typename... Args> +void L_WARN(log_helper_t *logHelper, Args... args) { + auto ss = LOG_STREAM(args...); + logHelper_log(logHelper, OSGI_LOGSERVICE_WARNING, ss.str().c_str()); +} +template <typename... Args> +void L_ERROR(log_helper_t *logHelper, Args... args) { + auto ss = LOG_STREAM(args...); + logHelper_log((log_helper_pt)logHelper, OSGI_LOGSERVICE_ERROR, ss.str().c_str()); +} static unsigned int rand_range(unsigned int min, unsigned int max); -static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender); +static void delay_first_send_for_late_joiners(log_helper_t* logHelper); pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, @@ -70,8 +98,8 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_ scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(_scope, _topic); //setting up nanomsg socket for nanomsg TopicSender - int socket = nn_socket(AF_SP, NN_BUS); - if (socket == -1) { + int nnSock = nn_socket(AF_SP, NN_BUS); + if (nnSock == -1) { perror("Error for nanomsg_socket"); } @@ -79,23 +107,20 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_ while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) { /* Randomized part due to same bundle publishing on different topics */ unsigned int port = rand_range(_basePort,_maxPort); - size_t len = (size_t)snprintf(nullptr, 0, "tcp://%s:%u", _bindIp, port) + 1; - char *_url = static_cast<char*>(calloc(len, sizeof(char*))); - snprintf(_url, len, "tcp://%s:%u", _bindIp, port); - - len = (size_t)snprintf(nullptr, 0, "tcp://0.0.0.0:%u", port) + 1; - char *bindUrl = static_cast<char*>(calloc(len, sizeof(char))); - snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port); - rv = nn_bind (socket, bindUrl); + std::stringstream _url; + _url << "tcp://" << _bindIp << ":" << port; + + std::stringstream bindUrl; + bindUrl << "tcp://0.0.0.0:" << port; + + rv = nn_bind (nnSock, bindUrl.str().c_str()); if (rv == -1) { perror("Error for nn_bind"); - free(_url); } else { - this->url = _url; - nanomsg.socket = socket; + this->url = _url.str(); + nanomsg.socket = nnSock; } retry++; - free(bindUrl); } if (!url.empty()) { @@ -132,7 +157,6 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() { celix_bundleContext_unregisterService(ctx, publisher.svcId); nn_close(nanomsg.socket); - std::lock_guard<std::mutex> lock(boundedServices.mutex); for (auto &it: boundedServices.map) { serializer->destroySerializerMap(serializer->handle, it.second.msgTypes); @@ -170,7 +194,7 @@ void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const ce } else { auto entry = boundedServices.map.emplace(std::piecewise_construct, std::forward_as_tuple(bndId), - std::forward_as_tuple(this, bndId, logHelper)); + std::forward_as_tuple(scope, topic, bndId, nanomsg.socket, logHelper)); int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes); if (rc == 0) { @@ -179,11 +203,14 @@ void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const ce entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) { return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg); }; - entry.first->second.service.sendMultipart = NULL; //not supported TODO remove + entry.first->second.service.sendMultipart = nullptr; //not supported TODO remove service = &entry.first->second.service; } else { boundedServices.map.erase(bndId); - L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic); + auto x = LOG_STREAM(12, "hallo"); + logHelper_log(logHelper, OSGI_LOGSERVICE_DEBUG, x.str().c_str()); + log_helper_pt lh = logHelper; + L_ERROR(lh, "Error creating serializer map for NanoMsg TopicSender ", scope, topic); } } @@ -201,7 +228,7 @@ void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const c if (entry->second.getCount == 0) { int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); if (rc != 0) { - L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + L_ERROR(logHelper, "Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); } boundedServices.map.erase(bndId); } @@ -210,31 +237,30 @@ void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const c int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int msgTypeId, const void *inMsg) { int status; - pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = parent; - pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId)); + auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId)); - if (msgSer != NULL) { - delay_first_send_for_late_joiners(sender); + if (msgSer != nullptr) { + delay_first_send_for_late_joiners(logHelper); int major = 0, minor = 0; pubsub_nanmosg_msg_header_t msg_hdr;// = calloc(1, sizeof(*msg_hdr)); msg_hdr.type = msgTypeId; - if (msgSer->msgVersion != NULL) { + if (msgSer->msgVersion != nullptr) { version_getMajor(msgSer->msgVersion, &major); version_getMinor(msgSer->msgVersion, &minor); msg_hdr.major = (unsigned char) major; msg_hdr.minor = (unsigned char) minor; } - void *serializedOutput = NULL; + void *serializedOutput = nullptr; size_t serializedOutputLen = 0; status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen); if (status == CELIX_SUCCESS) { nn_iovec data[2]; - nn_msghdr msg; + nn_msghdr msg{}; msg.msg_iov = data; msg.msg_iovlen = 2; msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr); @@ -244,31 +270,34 @@ int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int ms msg.msg_control = nullptr; msg.msg_controllen = 0; errno = 0; - int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 ); + int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 ); free(serializedOutput); if (rc < 0) { - L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno)); + L_WARN(logHelper, "[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc, ", error: ", strerror(errno)); } else { - L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc); - L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor); + L_INFO(logHelper, "[PSA_ZMQ_TS] Send message with size ", rc, "\n"); + L_INFO(logHelper, "[PSA_ZMQ_TS] Send message ID ", msg_hdr.type, + " major: ", (int)msg_hdr.major, + " minor: ", (int)msg_hdr.minor,"\n"); } } else { - L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic); + L_WARN(logHelper, "[PSA_ZMQ_TS] Error serialize message of type ", msgSer->msgName, + " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n"); } } else { status = CELIX_SERVICE_EXCEPTION; - L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic); + L_WARN(logHelper, "[PSA_ZMQ_TS] Error cannot serialize message with msg type id ", msgTypeId, + " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n"); } return status; } -static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender) { +static void delay_first_send_for_late_joiners(log_helper_t* logHelper) { static bool firstSend = true; if(firstSend){ - auto logHelper = sender->logHelper; - L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); + L_INFO(logHelper, "PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); sleep(FIRST_SEND_DELAY_IN_SECONDS); firstSend = false; } http://git-wip-us.apache.org/repos/asf/celix/blob/b6a03372/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h index 1883a41..6a3254e 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h @@ -27,24 +27,28 @@ namespace pubsub { namespace nanomsg { - class pubsub_nanomsg_topic_sender; class bounded_service_entry { public: bounded_service_entry( - pubsub::nanomsg::pubsub_nanomsg_topic_sender *_parent, + std::string &_scope, + std::string &_topic, long _bndId, - log_helper_t* _logHelper) : parent{_parent}, bndId{_bndId}, logHelper{_logHelper} { + int _nanoMsgSocket, + log_helper_t* _logHelper) : scope{_scope}, topic{_topic}, bndId{_bndId}, nanoMsgSocket{_nanoMsgSocket}, logHelper{_logHelper} { } - + bounded_service_entry(const bounded_service_entry&) = delete; + bounded_service_entry &operator=(const bounded_service_entry&) = delete; int topicPublicationSend(unsigned int msgTypeId, const void *inMsg); - pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent{}; pubsub_publisher_t service{}; + std::string scope; + std::string topic; long bndId{}; hash_map_t *msgTypes{}; int getCount{1}; + int nanoMsgSocket{}; log_helper_t *logHelper{}; } ;
