Convert nanomsg topic receiver to a class
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/0abbf432 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/0abbf432 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/0abbf432 Branch: refs/heads/nanomsg Commit: 0abbf4323838b5823b6f275ab418438727dfe289 Parents: c19a5bd Author: Erjan Altena <[email protected]> Authored: Wed Nov 21 20:34:12 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Wed Nov 21 21:10:41 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_common.h | 4 +- .../src/pubsub_nanomsg_topic_receiver.cc | 90 ++++++-------------- .../src/pubsub_nanomsg_topic_receiver.h | 42 +++++---- 3 files changed, 47 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h index 28293a8..3d5d48d 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h @@ -37,14 +37,14 @@ */ -struct pubsub_zmq_msg_header { +struct pubsub_nanomsg_msg_header { //header unsigned int type; unsigned char major; unsigned char minor; }; -typedef struct pubsub_zmq_msg_header pubsub_nanmosg_msg_header_t; +typedef struct pubsub_nanomsg_msg_header pubsub_nanmosg_msg_header_t; int psa_nanoMsg_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId); http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 889d79d..88886c6 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -17,6 +17,7 @@ *under the License. */ +#include <iostream> #include <mutex> #include <memory.h> #include <vector> @@ -61,34 +62,6 @@ #define L_WARN printf #define L_ERROR printf -struct pubsub_nanomsg_topic_receiver { - celix_bundle_context_t *ctx; - log_helper_t *logHelper; - long serializerSvcId; - pubsub_serializer_service_t *serializer; - char *scope; - char *topic; - char scopeAndTopicFilter[5]; - - int nanoMsgSocket; - - struct { - celix_thread_t thread; - std::mutex mutex; - bool running; - } recvThread; - - struct { - std::mutex mutex; - hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t* - } requestedConnections; - - long subscriberTrackerId; - struct { - std::mutex mutex; - hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t - } subscribers; -}; typedef struct psa_zmq_requested_connection_entry { char *url; @@ -96,23 +69,14 @@ typedef struct psa_zmq_requested_connection_entry { int id; } psa_nanomsg_requested_connection_entry_t; -typedef struct psa_zmq_subscriber_entry { - int usageCount; - hash_map_t *msgTypes; //map from serializer svc - pubsub_subscriber_t *svc; -} psa_nanomsg_subscriber_entry_t; -static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, + const celix_bundle_t *owner); static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); -static void* psa_nanomsg_recvThread(void *data); 
-//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx, -// log_helper_t *logHelper, const char *scope, -// const char *topic, long serializerSvcId, -// pubsub_serializer_service_t *serializer) { pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, log_helper_t *_logHelper, const char *_scope, @@ -149,6 +113,7 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, m_topic = strndup(m_topic, 1024 * 1024); subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); + std::cout << "#### Creating subscirbers.map!! " << subscribers.map << "\n"; requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); @@ -159,15 +124,13 @@ pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; opts.filter.filter = buf; opts.callbackHandle = this; - opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber; + opts.addWithOwner = pubsub_nanomsgTopicReceiver_addSubscriber; opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber; subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); recvThread.running = true; - celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this); - std::stringstream namestream; - namestream << "NANOMSG TR " << m_scope << "/" << m_topic; - celixThread_setName(&recvThread.thread, namestream.str().c_str()); + + recvThread.thread = std::thread([this]() {this->recvThread_exec();}); } } @@ -177,7 +140,7 @@ pubsub::nanomsg::topic_receiver::~topic_receiver() { std::lock_guard<std::mutex> _lock(recvThread.mutex); recvThread.running = false; } - celixThread_join(recvThread.thread, NULL); + recvThread.thread.join(); celix_bundleContext_stopTracker(ctx, subscriberTrackerId); @@ -285,12 +248,13 @@ void 
pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { } } -static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle); +static void pubsub_nanomsgTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, + const celix_bundle_t *bnd) { + auto *receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle); long bndId = celix_bundle_getId(bnd); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { + if (strncmp(subScope, receiver->m_scope, strlen(receiver->m_scope)) != 0) { //not the same scope. ignore return; } @@ -309,7 +273,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const if (rc == 0) { hashMap_put(receiver->subscribers.map, (void*)bndId, entry); } else { - L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + L_ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic); free(entry); } } @@ -317,7 +281,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd) { - pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle); + auto receiver = static_cast<pubsub::nanomsg::topic_receiver*>(handle); long bndId = celix_bundle_getId(bnd); @@ -331,13 +295,13 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s hashMap_remove(receiver->subscribers.map, (void*)bndId); int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes); if (rc != 0) { - L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + L_ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->m_scope, receiver->m_topic); } free(entry); } } -static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) { +void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) { pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type))); pubsub_subscriber_t *svc = entry->svc; @@ -353,7 +317,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r msgSer->freeMsg(msgSer->handle, deserializedMsg); } } else { - L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic); + //L_WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, scope, topic); } } } else { @@ -361,13 +325,13 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *r } } -static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) { - std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); +void pubsub::nanomsg::topic_receiver::processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) { + std::lock_guard<std::mutex> _lock(subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(subscribers.map); while 
(hashMapIterator_hasNext(&iter)) { psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter)); if (entry != NULL) { - processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize); + processMsgForSubscriberEntry(entry, hdr, payload, payloadSize); } } } @@ -377,12 +341,11 @@ struct Message { char payload[]; }; -static void* psa_nanomsg_recvThread(void *data) { - pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data); +void pubsub::nanomsg::topic_receiver::recvThread_exec() { bool running{}; { - std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex); - running = receiver->recvThread.running; + std::lock_guard<std::mutex> _lock(recvThread.mutex); + running = recvThread.running; } while (running) { Message *msg = nullptr; @@ -400,9 +363,9 @@ static void* psa_nanomsg_recvThread(void *data) { msgHdr.msg_controllen = 0; errno = 0; - int recvBytes = nn_recvmsg(receiver->nanoMsgSocket, &msgHdr, 0); + int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0); if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(pubsub_nanmosg_msg_header_t)) { - processMsg(receiver, &msg->header, msg->payload, recvBytes-sizeof(msg->header)); + processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header)); nn_freemsg(msg); } else if (recvBytes >= 0) { L_ERROR("[PSA_ZMQ_TR] Error receiving nanmosg msg, size (%d) smaller than header\n", recvBytes); @@ -415,5 +378,4 @@ static void* psa_nanomsg_recvThread(void *data) { } } // while - return NULL; } http://git-wip-us.apache.org/repos/asf/celix/blob/0abbf432/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index 6cd216b..f977917 100644 --- 
a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -16,15 +16,24 @@ *specific language governing permissions and limitations *under the License. */ -#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H -#define CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H +#pragma once #include <string> #include <vector> +#include <thread> +#include <mutex> #include "pubsub_serializer.h" #include "log_helper.h" #include "celix_bundle_context.h" +#include "pubsub_nanomsg_common.h" +#include "pubsub/subscriber.h" + +typedef struct psa_zmq_subscriber_entry { + int usageCount; + hash_map_t *msgTypes; //map from serializer svc + pubsub_subscriber_t *svc; +} psa_nanomsg_subscriber_entry_t; + -//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t; namespace pubsub { namespace nanomsg { class topic_receiver { @@ -46,7 +55,10 @@ namespace pubsub { void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls); void connectTo(const char *url); void disconnectFrom(const char *url); - private: + void recvThread_exec(); + void processMsg(const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize); + void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize); + //private: celix_bundle_context_t *ctx{nullptr}; log_helper_t *logHelper{nullptr}; long m_serializerSvcId{0}; @@ -58,7 +70,7 @@ namespace pubsub { int m_nanoMsgSocket{0}; struct { - celix_thread_t thread; + std::thread thread; std::mutex mutex; bool running; } recvThread{}; @@ -74,22 +86,6 @@ namespace pubsub { hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t } subscribers{}; }; - }} -//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx, -// log_helper_t *logHelper, const char *scope, -// const char *topic, 
long serializerSvcId, -// pubsub_serializer_service_t *serializer); -//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver); - -//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver); -//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver); - -//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver); -//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver, -// std::vector<std::string> &connectedUrls, -// std::vector<std::string> &unconnectedUrls); - -//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url); -//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url); + } +} -#endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
