Repository: celix Updated Branches: refs/heads/nanomsg 7c141424d -> 707b8e54a
Nanomsg Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/15f268d1 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/15f268d1 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/15f268d1 Branch: refs/heads/nanomsg Commit: 15f268d131370e55dc2dd4c5b6c247baa522fc58 Parents: 7c14142 Author: Erjan Altena <[email protected]> Authored: Wed Nov 28 21:07:00 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Wed Nov 28 21:37:36 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 12 +- .../src/pubsub_nanomsg_common.cc | 15 +- .../src/pubsub_nanomsg_common.h | 2 +- .../src/pubsub_nanomsg_topic_receiver.cc | 5 +- .../src/pubsub_nanomsg_topic_receiver.h | 2 +- .../src/pubsub_nanomsg_topic_sender.cc | 211 +++++++------------ .../src/pubsub_nanomsg_topic_sender.h | 54 +++-- 7 files changed, 138 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 42ed632..9fdf2a3 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -341,7 +341,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const c const char *serType = serEntry->serType; newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, nullptr); - celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl()); + celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, sender->getUrl().c_str()); //if available also set container name const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); if (cn != nullptr) { @@ -568,12 +568,12 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut long serSvcId = sender->getSerializerSvcId(); auto kvs = serializers.map.find(serSvcId); const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType); - const char *scope = sender->getScope(); - const char *topic = sender->getTopic(); - const char *url = sender->getUrl(); - fprintf(out, "|- Topic Sender %s/%s\n", scope, topic); + const auto scope = sender->getScope(); + const auto topic = sender->getTopic(); + const auto url = sender->getUrl(); + fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), topic.c_str()); fprintf(out, " |- serializer type = %s\n", serType); - fprintf(out, " |- url = %s\n", url); + fprintf(out, " |- url = %s\n", url.c_str()); } } http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc index 3ecd19c..333cb2d 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc @@ -41,16 +41,15 @@ bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_he return check; } -void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter) { - for (int i = 0; i < 5; ++i) { // 5 ?? - filter[i] = '\0'; - } +std::string psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic) { + std::string result(""); if (scope.size() >= 2) { //3 ?? - filter[0] = scope[0]; - filter[1] = scope[1]; + result += scope[0]; + result += scope[1]; } if (topic.size() >= 2) { //3 ?? - filter[2] = topic[0]; - filter[3] = topic[1]; + result += topic[0]; + result += topic[1]; } + return result; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h index 276169f..2c03d4c 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h @@ -49,7 +49,7 @@ typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t; int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId); -void psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic, char *filter); +std::string psa_nanomsg_setScopeAndTopicFilter(const std::string &scope, const std::string &topic); bool psa_nanomsg_checkVersion(version_pt msgVersion, const pubsub_nanmosg_msg_header_t *hdr); http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 9f77a4c..30c2af7 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -72,7 +72,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, ctx = _ctx; logHelper = _logHelper; serializer = _serializer; - psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, m_scopeAndTopicFilter); + m_scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic); m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS); if (m_nanoMsgSocket < 0) { @@ -86,8 +86,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, std::bad_alloc{}; } - char subscriberFilter[5]; // 5 ?? - psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscriberFilter); + auto subscriberFilter = psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic); int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic.c_str()); char buf[size + 1]; http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index 2519e4a..02c462e 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -100,7 +100,7 @@ namespace pubsub { pubsub_serializer_service_t *serializer{nullptr}; const std::string m_scope{}; const std::string m_topic{}; - char m_scopeAndTopicFilter[5]; + std::string m_scopeAndTopicFilter{}; int m_nanoMsgSocket{0}; http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc index 1c75e71..9a253b2 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc @@ -18,6 +18,7 @@ */ #include <memory.h> +#include <iostream> #include <stdlib.h> #include <utils.h> @@ -29,7 +30,6 @@ #include <pubsub_constants.h> -#include <pubsub/publisher.h> #include <pubsub_common.h> #include "pubsub_nanomsg_topic_sender.h" #include "pubsub_psa_nanomsg_constants.h" @@ -48,23 +48,9 @@ logHelper_log(logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) -typedef struct psa_nanomsg_bounded_service_entry { - pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent; - pubsub_publisher_t service; - long bndId; - hash_map_t *msgTypes; - int getCount; -} psa_nanomsg_bounded_service_entry_t; - -//static void* psa_nanomsg_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, -// const celix_properties_t *svcProperties); -//static void psa_nanomsg_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, -// const celix_properties_t *svcProperties); static unsigned int rand_range(unsigned int min, unsigned int max); static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender); -static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *msg); - pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope, @@ -77,53 +63,44 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_ ctx{_ctx}, logHelper{_logHelper}, serializerSvcId {_serializerSvcId}, - serializer{_ser}{ + serializer{_ser}, + scope{_scope}, + topic{_topic}{ - psa_nanomsg_setScopeAndTopicFilter(_scope, _topic, scopeAndTopicFilter); + scopeAndTopicFilter = psa_nanomsg_setScopeAndTopicFilter(_scope, _topic); //setting up nanomsg socket for nanomsg TopicSender - { - - int socket = nn_socket(AF_SP, NN_BUS); - if (socket == -1) { - perror("Error for nanomsg_socket"); - } + int socket = nn_socket(AF_SP, NN_BUS); + if (socket == -1) { + perror("Error for nanomsg_socket"); + } - int rv = -1, retry=0; - while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) { - /* Randomized part due to same bundle publishing on different topics */ - unsigned int port = rand_range(_basePort,_maxPort); - size_t len = (size_t)snprintf(NULL, 0, "tcp://%s:%u", _bindIp, port) + 1; - char *_url = static_cast<char*>(calloc(len, sizeof(char*))); - snprintf(_url, len, "tcp://%s:%u", _bindIp, port); - - len = (size_t)snprintf(NULL, 0, "tcp://0.0.0.0:%u", port) + 1; - char *bindUrl = static_cast<char*>(calloc(len, sizeof(char))); - snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port); - rv = nn_bind (socket, bindUrl); - if (rv == -1) { - perror("Error for nn_bind"); - free(_url); - } else { - this->url = _url; - nanomsg.socket = socket; - } - retry++; - free(bindUrl); + int rv = -1, retry=0; + while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) { + /* Randomized part due to same bundle publishing on different topics */ + unsigned int port = rand_range(_basePort,_maxPort); + size_t len = (size_t)snprintf(nullptr, 0, "tcp://%s:%u", _bindIp, port) + 1; + char *_url = static_cast<char*>(calloc(len, sizeof(char*))); + snprintf(_url, len, "tcp://%s:%u", _bindIp, port); + + len = (size_t)snprintf(nullptr, 0, "tcp://0.0.0.0:%u", port) + 1; + char *bindUrl = static_cast<char*>(calloc(len, sizeof(char))); + snprintf(bindUrl, len, "tcp://0.0.0.0:%u", port); + rv = nn_bind (socket, bindUrl); + if (rv == -1) { + perror("Error for nn_bind"); + free(_url); + } else { + this->url = _url; + nanomsg.socket = socket; } + retry++; + free(bindUrl); } - if (url != NULL) { - scope = strndup(_scope, 1024 * 1024); - topic = strndup(_topic, 1024 * 1024); - - celixThreadMutex_create(&boundedServices.mutex, NULL); - celixThreadMutex_create(&nanomsg.mutex, NULL); - boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL); - } + if (!url.empty()) { - //register publisher services using a service factory - if (url != NULL) { + //register publisher services using a service factory publisher.factory.handle = this; publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService( @@ -137,8 +114,8 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_ }; celix_properties_t *props = celix_properties_create(); - celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic); - celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope); + celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str()); + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str()); celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; opts.factory = &publisher.factory; @@ -156,39 +133,27 @@ pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() { nn_close(nanomsg.socket); - celixThreadMutex_lock(&boundedServices.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(boundedServices.map); - while (hashMapIterator_hasNext(&iter)) { - psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMapIterator_nextValue(&iter)); - if (entry != NULL) { - serializer->destroySerializerMap(serializer->handle, entry->msgTypes); - free(entry); - } + std::lock_guard<std::mutex> lock(boundedServices.mutex); + for (auto &it: boundedServices.map) { + serializer->destroySerializerMap(serializer->handle, it.second.msgTypes); } - hashMap_destroy(boundedServices.map, false, false); - celixThreadMutex_unlock(&boundedServices.mutex); - - celixThreadMutex_destroy(&boundedServices.mutex); - celixThreadMutex_destroy(&nanomsg.mutex); + boundedServices.map.clear(); - free(scope); - free(topic); - free(url); } long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const { return serializerSvcId; } -const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const { +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const { return scope; } -const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const { +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const { return topic; } -const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const { +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const { return url; } @@ -196,64 +161,57 @@ const char* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const { void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { long bndId = celix_bundle_getId(requestingBundle); - - celixThreadMutex_lock(&boundedServices.mutex); - psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId)); - if (entry != NULL) { - entry->getCount += 1; + void *service{nullptr}; + std::lock_guard<std::mutex> lock(boundedServices.mutex); + auto existingEntry = boundedServices.map.find(bndId); + if (existingEntry != boundedServices.map.end()) { + existingEntry->second.getCount += 1; + service = &existingEntry->second.service; } else { - entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(calloc(1, sizeof(*entry))); - entry->getCount = 1; - entry->parent = this; - entry->bndId = bndId; + auto entry = boundedServices.map.emplace(std::piecewise_construct, + std::forward_as_tuple(bndId), + std::forward_as_tuple(this, bndId, logHelper)); + int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes); - int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry->msgTypes); if (rc == 0) { - entry->service.handle = entry; - entry->service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType; - entry->service.send = psa_nanomsg_topicPublicationSend; - entry->service.sendMultipart = NULL; //not supported TODO remove - hashMap_put(boundedServices.map, (void*)bndId, entry); + entry.first->second.service.handle = &entry.first->second; + entry.first->second.service.localMsgTypeIdForMsgType = psa_nanoMsg_localMsgTypeIdForMsgType; + entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) { + return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg); + }; + entry.first->second.service.sendMultipart = NULL; //not supported TODO remove + service = &entry.first->second.service; } else { + boundedServices.map.erase(bndId); L_ERROR("Error creating serializer map for NanoMsg TopicSender %s/%s", scope, topic); } - - - } - celixThreadMutex_unlock(&boundedServices.mutex); - return &entry->service; + return service; } void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties __attribute__((unused))) { + const celix_properties_t */*svcProperties*/) { long bndId = celix_bundle_getId(requestingBundle); - celixThreadMutex_lock(&boundedServices.mutex); - psa_nanomsg_bounded_service_entry_t *entry = static_cast<psa_nanomsg_bounded_service_entry_t*>(hashMap_get(boundedServices.map, (void*)bndId)); - if (entry != NULL) { - entry->getCount -= 1; - } - if (entry != NULL && entry->getCount == 0) { - //free entry - hashMap_remove(boundedServices.map, (void*)bndId); - int rc = serializer->destroySerializerMap(serializer->handle, entry->msgTypes); - if (rc != 0) { - L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + std::lock_guard<std::mutex> lock(boundedServices.mutex); + auto entry = boundedServices.map.find(bndId); + if (entry != boundedServices.map.end()) { + entry->second.getCount -= 1; + if (entry->second.getCount == 0) { + int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); + if (rc != 0) { + L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + } + boundedServices.map.erase(bndId); } - - free(entry); } - celixThreadMutex_unlock(&boundedServices.mutex); } -static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) { - int status = CELIX_SUCCESS; - psa_nanomsg_bounded_service_entry_t *bound = static_cast<psa_nanomsg_bounded_service_entry_t*>(handle); - pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = bound->parent; - - pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId)); +int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int msgTypeId, const void *inMsg) { + int status; + pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender = parent; + pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId)); if (msgSer != NULL) { delay_first_send_for_late_joiners(sender); @@ -285,37 +243,32 @@ static int psa_nanomsg_topicPublicationSend(void *handle, unsigned int msgTypeId msg.msg_iov[1].iov_len = serializedOutputLen; msg.msg_control = nullptr; msg.msg_controllen = 0; - //zmsg_t *msg = zmsg_new(); - //TODO revert to use zmq_msg_init_data (or something like that) for zero copy for the payload - //TODO remove socket mutex .. not needed (initialized during creation) - //zmsg_addstr(msg, sender->scopeAndTopicFilter); - //zmsg_addmem(msg, &msg_hdr, sizeof(msg_hdr)); - //zmsg_addmem(msg, serializedOutput, ); errno = 0; int rc = nn_sendmsg(sender->nanomsg.socket, &msg, 0 ); free(serializedOutput); if (rc < 0) { - //TODO L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno)); + L_WARN("[PSA_ZMQ_TS] Error sending zmsg, rc is %i. %s", rc, strerror(errno)); } else { - //TODO L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc); - //TODO L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor); + L_INFO("[PSA_ZMQ_TS] Send message with size %d\n", rc); + L_INFO("[PSA_ZMQ_TS] Send message ID %d, major %d, minor %d\n", msg_hdr.type, (int)msg_hdr.major, (int)msg_hdr.minor); } } else { - //TODO L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic); + L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgSer->msgName, sender->scope, sender->topic); } } else { status = CELIX_SERVICE_EXCEPTION; - //TODO L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic); + L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic); } return status; } -static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender */*sender*/) { +static void delay_first_send_for_late_joiners(pubsub::nanomsg::pubsub_nanomsg_topic_sender *sender) { static bool firstSend = true; if(firstSend){ - //TODO L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); + auto logHelper = sender->logHelper; + L_INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); sleep(FIRST_SEND_DELAY_IN_SECONDS); firstSend = false; } http://git-wip-us.apache.org/repos/asf/celix/blob/15f268d1/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h index 90ab6ce..1883a41 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h @@ -18,13 +18,37 @@ */ #ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H #define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H - +#include <mutex> +#include <map> #include "celix_bundle_context.h" #include <log_helper.h> #include <pubsub_serializer.h> +#include <pubsub/publisher.h> namespace pubsub { namespace nanomsg { + class pubsub_nanomsg_topic_sender; + + class bounded_service_entry { + public: + bounded_service_entry( + pubsub::nanomsg::pubsub_nanomsg_topic_sender *_parent, + long _bndId, + log_helper_t* _logHelper) : parent{_parent}, bndId{_bndId}, logHelper{_logHelper} { + + } + + int topicPublicationSend(unsigned int msgTypeId, const void *inMsg); + + pubsub::nanomsg::pubsub_nanomsg_topic_sender *parent{}; + pubsub_publisher_t service{}; + long bndId{}; + hash_map_t *msgTypes{}; + int getCount{1}; + log_helper_t *logHelper{}; + } ; + + class pubsub_nanomsg_topic_sender { public: pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope, @@ -38,31 +62,30 @@ namespace pubsub { const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete; long getSerializerSvcId() const ; - const char *getScope() const ; - const char *getTopic() const ; - const char *getUrl() const; + const std::string &getScope() const ; + const std::string &getTopic() const ; + const std::string &getUrl() const; void* getPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties __attribute__((unused))); + const celix_properties_t *svcProperties __attribute__((unused))); void ungetPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties __attribute__((unused))); + const celix_properties_t *svcProperties __attribute__((unused))); int topicPublicationSend(unsigned int msgTypeId, const void *inMsg); void delay_first_send_for_late_joiners() ; - - //private: + //private: celix_bundle_context_t *ctx; log_helper_t *logHelper; long serializerSvcId; pubsub_serializer_service_t *serializer; - char *scope{}; - char *topic{}; - char scopeAndTopicFilter[5]; - char *url{}; + std::string scope{}; + std::string topic{}; + std::string scopeAndTopicFilter{}; + std::string url{}; struct { - celix_thread_mutex_t mutex; + std::mutex mutex; int socket; } nanomsg{}; @@ -72,8 +95,9 @@ namespace pubsub { } publisher{}; struct { - celix_thread_mutex_t mutex{}; - hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t + std::mutex mutex{}; + std::map<long, bounded_service_entry> map{}; + //hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t } boundedServices{}; }; }
