nanomsg topicreceiver changed to class
Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c19a5bd8 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c19a5bd8 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c19a5bd8 Branch: refs/heads/nanomsg Commit: c19a5bd85b5c514794c6b38a1080b5382b8ca1ad Parents: 95633eb Author: Erjan Altena <[email protected]> Authored: Mon Nov 19 20:10:50 2018 +0100 Committer: Erjan Altena <[email protected]> Committed: Mon Nov 19 20:10:50 2018 +0100 ---------------------------------------------------------------------- .../src/pubsub_nanomsg_admin.cc | 44 ++--- .../src/pubsub_nanomsg_admin.h | 6 +- .../src/pubsub_nanomsg_topic_receiver.cc | 162 +++++++++---------- .../src/pubsub_nanomsg_topic_receiver.h | 77 +++++++-- 4 files changed, 169 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 6c15ec8..3e788ae 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -111,7 +111,7 @@ pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { { std::lock_guard<std::mutex> lock(topicReceivers.mutex); for (auto kv: topicReceivers.map) { - pubsub_nanoMsgTopicReceiver_destroy(kv.second); + delete kv.second; } } @@ -274,10 +274,10 @@ void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_proper std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); for (auto kv : topicReceivers.map){ auto *receiver = kv.second; - if (receiver != nullptr && entry->svcId == pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver)) { + if (receiver != nullptr && entry->svcId == receiver->serializerSvcId()) { char *key = kv.first; topicReceivers.map.erase(key); - pubsub_nanoMsgTopicReceiver_destroy(receiver); + delete receiver; free(key); } } @@ -409,7 +409,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const celix_properties_t *newEndpoint = nullptr; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - pubsub_nanomsg_topic_receiver_t * receiver = nullptr; + pubsub::nanomsg::topic_receiver * receiver = nullptr; { std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); @@ -421,7 +421,7 @@ celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const char *scope, const auto kvs = serializers.map.find(serializerSvcId); if (kvs != serializers.map.end()) { auto serEntry = kvs->second; - receiver = pubsub_nanoMsgTopicReceiver_create(ctx, log, scope, topic, serializerSvcId, serEntry->svc); + receiver = new pubsub::nanomsg::topic_receiver(ctx, log, scope, topic, serializerSvcId, serEntry->svc); } else { L_ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender %s/%s", scope, topic); } @@ -471,24 +471,24 @@ celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, co free(key); if (entry != topicReceivers.map.end()) { char *receiverKey = entry->first; - pubsub_nanomsg_topic_receiver_t *receiver = entry->second; + pubsub::nanomsg::topic_receiver *receiver = entry->second; topicReceivers.map.erase(receiverKey); free(receiverKey); - pubsub_nanoMsgTopicReceiver_destroy(receiver); + delete receiver; } celix_status_t status = CELIX_SUCCESS; return status; } -celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver, +celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver, const celix_properties_t *endpoint) { //note can be called with discoveredEndpoint.mutex lock celix_status_t status = CELIX_SUCCESS; - const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver); - const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver); + const char *scope = receiver->scope(); + const char *topic = receiver->topic(); const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr); const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr); @@ -503,7 +503,7 @@ celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub_nanomsg_to if (eScope != nullptr && eTopic != nullptr && strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 1024 * 1024) == 0) { - pubsub_nanoMsgTopicReceiver_connectTo(receiver, url); + receiver->connectTo(url); } } @@ -516,7 +516,7 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { std::lock_guard<std::mutex> threadLock(topicReceivers.mutex); for (auto entry: topicReceivers.map) { - pubsub_nanomsg_topic_receiver_t *receiver = entry.second; + pubsub::nanomsg::topic_receiver *receiver = entry.second; connectEndpointToReceiver(receiver, endpoint); } } @@ -532,13 +532,13 @@ celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpo } -celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver, +celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver, const celix_properties_t *endpoint) { //note can be called with discoveredEndpoint.mutex lock celix_status_t status = CELIX_SUCCESS; - const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver); - const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver); + const char *scope = receiver->scope(); + const char *topic = receiver->topic(); const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, nullptr); const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, nullptr); @@ -551,7 +551,7 @@ celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub_nanom if (eScope != nullptr && eTopic != nullptr && strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 1024 * 1024) == 0) { - pubsub_nanoMsgTopicReceiver_disconnectFrom(receiver, url); + receiver->disconnectFrom(url); } } @@ -564,7 +564,7 @@ celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *en if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); for (auto entry : topicReceivers.map) { - pubsub_nanomsg_topic_receiver_t *receiver = entry.second; + pubsub::nanomsg::topic_receiver *receiver = entry.second; disconnectEndpointFromReceiver(receiver, endpoint); } } @@ -605,16 +605,16 @@ celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribut std::lock_guard<std::mutex> serializerLock(serializers.mutex); std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); for (auto entry : topicReceivers.map) { - pubsub_nanomsg_topic_receiver_t *receiver = entry.second; - long serSvcId = pubsub_nanoMsgTopicReceiver_serializerSvcId(receiver); + pubsub::nanomsg::topic_receiver *receiver = entry.second; + long serSvcId = receiver->serializerSvcId(); auto kv = serializers.map.find(serSvcId); const char *serType = kv->second == nullptr ? "!Error!" : kv->second->serType; - const char *scope = pubsub_nanoMsgTopicReceiver_scope(receiver); - const char *topic = pubsub_nanoMsgTopicReceiver_topic(receiver); + const char *scope = receiver->scope(); + const char *topic = receiver->topic(); std::vector<std::string> connected{}; std::vector<std::string> unconnected{}; - pubsub_nanoMsgTopicReceiver_listConnections(receiver, connected, unconnected); + receiver->listConnections(connected, unconnected); fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic); fprintf(out, " |- serializer type = %s\n", serType); http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index c34a310..3e680b6 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@ -84,10 +84,10 @@ private: celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))); - celix_status_t connectEndpointToReceiver(pubsub_nanomsg_topic_receiver_t *receiver, + celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver, const celix_properties_t *endpoint); - celix_status_t disconnectEndpointFromReceiver(pubsub_nanomsg_topic_receiver_t *receiver, + celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver, const celix_properties_t *endpoint); celix_bundle_context_t *ctx; log_helper_t *log; @@ -117,7 +117,7 @@ private: } psa_nanomsg_serializer_entry_t; ProtectedMap<long, psa_nanomsg_serializer_entry_t*> serializers{}; ProtectedMap<char*, pubsub_nanomsg_topic_sender_t*> topicSenders{}; - ProtectedMap<char*, pubsub_nanomsg_topic_receiver_t*> topicReceivers{}; + ProtectedMap<char*, pubsub::nanomsg::topic_receiver*> topicReceivers{}; ProtectedMap<const char*, celix_properties_t *> discoveredEndpoints{}; }; http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index 42f6423..889d79d 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@ -109,92 +109,96 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void *svc static void* psa_nanomsg_recvThread(void *data); -pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx, - log_helper_t *logHelper, const char *scope, - const char *topic, long serializerSvcId, - pubsub_serializer_service_t *serializer) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver))); - receiver->ctx = ctx; - receiver->logHelper = logHelper; - receiver->serializerSvcId = serializerSvcId; - receiver->serializer = serializer; - psa_nanomsg_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter); - - - receiver->nanoMsgSocket = nn_socket(AF_SP, NN_BUS); - if (receiver->nanoMsgSocket < 0) { - free(receiver); - receiver = NULL; - L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", scope, topic); +//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx, +// log_helper_t *logHelper, const char *scope, +// const char *topic, long serializerSvcId, +// pubsub_serializer_service_t *serializer) { +pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, + log_helper_t *_logHelper, + const char *_scope, + const char *_topic, + long _serializerSvcId, + pubsub_serializer_service_t *_serializer) : m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} { + //pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(calloc(1, sizeof(*receiver))); + ctx = _ctx; + logHelper = _logHelper; + serializer = _serializer; + psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, m_scopeAndTopicFilter); + + m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS); + if (m_nanoMsgSocket < 0) { + // TODO throw error or something + //free(receiver); + //receiver = NULL; + L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s", m_scope, m_topic); } else { int timeout = PSA_NANOMSG_RECV_TIMEOUT; - if (nn_setsockopt(receiver->nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, + if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof (timeout)) < 0) { - free(receiver); - receiver = NULL; - L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", scope, topic); + // TODO throw error or something + //free(receiver); + //receiver = NULL; + L_ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for %s/%s, set sockopt RECV_TIMEO failed", m_scope, m_topic); } char subscribeFilter[5]; - psa_nanomsg_setScopeAndTopicFilter(scope, topic, subscribeFilter); + psa_nanomsg_setScopeAndTopicFilter(m_scope, m_topic, subscribeFilter); //zsock_set_subscribe(receiver->nanoMsgSocket, subscribeFilter); - receiver->scope = strndup(scope, 1024 * 1024); - receiver->topic = strndup(topic, 1024 * 1024); + m_scope = strndup(m_scope, 1024 * 1024); + m_topic = strndup(m_topic, 1024 * 1024); - receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); - receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); + requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); char buf[size + 1]; - snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, m_topic); celix_service_tracking_options_t opts{}; opts.filter.ignoreServiceLanguage = true; opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; opts.filter.filter = buf; - opts.callbackHandle = receiver; + opts.callbackHandle = this; opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber; opts.removeWithOwner = pubsub_nanoMsgTopicReceiver_removeSubscriber; - receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - receiver->recvThread.running = true; - celixThread_create(&receiver->recvThread.thread, NULL, psa_nanomsg_recvThread, receiver); + subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + recvThread.running = true; + celixThread_create(&recvThread.thread, NULL, psa_nanomsg_recvThread, this); std::stringstream namestream; - namestream << "NANOMSG TR " << scope << "/" << topic; - celixThread_setName(&receiver->recvThread.thread, namestream.str().c_str()); + namestream << "NANOMSG TR " << m_scope << "/" << m_topic; + celixThread_setName(&recvThread.thread, namestream.str().c_str()); } - return receiver; } -void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver) { - if (receiver != NULL) { +pubsub::nanomsg::topic_receiver::~topic_receiver() { { - std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex); - receiver->recvThread.running = false; + std::lock_guard<std::mutex> _lock(recvThread.mutex); + recvThread.running = false; } - celixThread_join(receiver->recvThread.thread, NULL); + celixThread_join(recvThread.thread, NULL); - celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); + celix_bundleContext_stopTracker(ctx, subscriberTrackerId); hash_map_iterator_t iter=hash_map_iterator_t(); { - std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex); - iter = hashMapIterator_construct(receiver->subscribers.map); + std::lock_guard<std::mutex> _lock(subscribers.mutex); + iter = hashMapIterator_construct(subscribers.map); while (hashMapIterator_hasNext(&iter)) { psa_nanomsg_subscriber_entry_t *entry = static_cast<psa_nanomsg_subscriber_entry_t*>(hashMapIterator_nextValue(&iter)); if (entry != NULL) { - receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); + serializer->destroySerializerMap(serializer->handle, entry->msgTypes); free(entry); } } - hashMap_destroy(receiver->subscribers.map, false, false); + hashMap_destroy(subscribers.map, false, false); } { - std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex); - iter = hashMapIterator_construct(receiver->requestedConnections.map); + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + iter = hashMapIterator_construct(requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMapIterator_nextValue(&iter)); if (entry != NULL) { @@ -202,37 +206,34 @@ void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiv free(entry); } } - hashMap_destroy(receiver->requestedConnections.map, false, false); + hashMap_destroy(requestedConnections.map, false, false); } //celixThreadMutex_destroy(&receiver->subscribers.mutex); //celixThreadMutex_destroy(&receiver->requestedConnections.mutex); //celixThreadMutex_destroy(&receiver->recvThread.mutex); - nn_close(receiver->nanoMsgSocket); + nn_close(m_nanoMsgSocket); - free(receiver->scope); - free(receiver->topic); - } - free(receiver); + free((void*)m_scope); + free((void*)m_topic); } -const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver) { - return receiver->scope; +const char* pubsub::nanomsg::topic_receiver::scope() const { + return m_scope; } -const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver) { - return receiver->topic; +const char* pubsub::nanomsg::topic_receiver::topic() const { + return m_topic; } -long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver) { - return receiver->serializerSvcId; +long pubsub::nanomsg::topic_receiver::serializerSvcId() const { + return m_serializerSvcId; } -void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver, - std::vector<std::string> &connectedUrls, +void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls) { - std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t *>(hashMapIterator_nextValue(&iter)); if (entry->connected) { @@ -244,21 +245,19 @@ void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t } -void pubsub_nanoMsgTopicReceiver_connectTo( - pubsub_nanomsg_topic_receiver_t *receiver, - const char *url) { - L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope, receiver->topic, url); +void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { + L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", m_scope, m_topic, url); - std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex); - psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(receiver->requestedConnections.map, url)); + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_get(requestedConnections.map, url)); if (entry == NULL) { entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(calloc(1, sizeof(*entry))); entry->url = strndup(url, 1024*1024); entry->connected = false; - hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry); + hashMap_put(requestedConnections.map, (void*)entry->url, entry); } if (!entry->connected) { - int connection_id = nn_connect(receiver->nanoMsgSocket, url); + int connection_id = nn_connect(m_nanoMsgSocket, url); if (connection_id >= 0) { entry->connected = true; entry->id = connection_id; @@ -268,13 +267,13 @@ void pubsub_nanoMsgTopicReceiver_connectTo( } } -void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url) { - L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope, receiver->topic, url); +void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { + L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", m_scope, m_topic, url); - std::lock_guard<std::mutex> _lock(receiver->requestedConnections.mutex); - psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(receiver->requestedConnections.map, url)); + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + psa_nanomsg_requested_connection_entry_t *entry = static_cast<psa_nanomsg_requested_connection_entry_t*>(hashMap_remove(requestedConnections.map, url)); if (entry != NULL && entry->connected) { - if (nn_shutdown(receiver->nanoMsgSocket, entry->id) == 0) { + if (nn_shutdown(m_nanoMsgSocket, entry->id) == 0) { entry->connected = false; } else { L_WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url %s, id %d. (%s)", url, entry->id, strerror(errno)); @@ -287,7 +286,7 @@ void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t } static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle); + pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle); long bndId = celix_bundle_getId(bnd); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); @@ -318,7 +317,7 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(handle); + pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(handle); long bndId = celix_bundle_getId(bnd); @@ -338,7 +337,7 @@ static void pubsub_nanoMsgTopicReceiver_removeSubscriber(void *handle, void */*s } } -static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) { +static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver *receiver, psa_nanomsg_subscriber_entry_t* entry, const pubsub_nanmosg_msg_header_t *hdr, const char* payload, size_t payloadSize) { pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type))); pubsub_subscriber_t *svc = entry->svc; @@ -362,7 +361,7 @@ static inline void processMsgForSubscriberEntry(pubsub_nanomsg_topic_receiver_t } } -static inline void processMsg(pubsub_nanomsg_topic_receiver_t *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) { +static inline void processMsg(pubsub_nanomsg_topic_receiver *receiver, const pubsub_nanmosg_msg_header_t *hdr, const char *payload, size_t payloadSize) { std::lock_guard<std::mutex> _lock(receiver->subscribers.mutex); hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); while (hashMapIterator_hasNext(&iter)) { @@ -377,8 +376,9 @@ struct Message { pubsub_nanmosg_msg_header_t header; char payload[]; }; + static void* psa_nanomsg_recvThread(void *data) { - pubsub_nanomsg_topic_receiver_t *receiver = static_cast<pubsub_nanomsg_topic_receiver_t*>(data); + pubsub_nanomsg_topic_receiver *receiver = static_cast<pubsub_nanomsg_topic_receiver*>(data); bool running{}; { std::lock_guard<std::mutex> _lock(receiver->recvThread.mutex); http://git-wip-us.apache.org/repos/asf/celix/blob/c19a5bd8/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h index d584db8..6cd216b 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h @@ -24,23 +24,72 @@ #include "log_helper.h" #include "celix_bundle_context.h" -typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t; +//typedef struct pubsub_nanomsg_topic_receiver pubsub_nanomsg_topic_receiver_t; +namespace pubsub { + namespace nanomsg { + class topic_receiver { + public: + topic_receiver(celix_bundle_context_t + *ctx, + log_helper_t *logHelper, + const char *scope, + const char *topic, + long serializerSvcId, pubsub_serializer_service_t + *serializer); + topic_receiver(const topic_receiver &) = delete; + topic_receiver & operator=(const topic_receiver &) = delete; + ~topic_receiver(); -pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx, - log_helper_t *logHelper, const char *scope, - const char *topic, long serializerSvcId, - pubsub_serializer_service_t *serializer); -void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver); + const char* scope() const; + const char* topic() const; + long serializerSvcId() const; + void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls); + void connectTo(const char *url); + void disconnectFrom(const char *url); + private: + celix_bundle_context_t *ctx{nullptr}; + log_helper_t *logHelper{nullptr}; + long m_serializerSvcId{0}; + pubsub_serializer_service_t *serializer{nullptr}; + const char *m_scope{nullptr}; + const char *m_topic{nullptr}; + char m_scopeAndTopicFilter[5]; -const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver); -const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver); + int m_nanoMsgSocket{0}; -long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver); -void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver, - std::vector<std::string> &connectedUrls, - std::vector<std::string> &unconnectedUrls); + struct { + celix_thread_t thread; + std::mutex mutex; + bool running; + } recvThread{}; -void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url); -void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url); + struct { + std::mutex mutex; + hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t* + } requestedConnections{}; + + long subscriberTrackerId{0}; + struct { + std::mutex mutex; + hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t + } subscribers{}; + }; + }} +//pubsub_nanomsg_topic_receiver_t* pubsub_nanoMsgTopicReceiver_create(celix_bundle_context_t *ctx, +// log_helper_t *logHelper, const char *scope, +// const char *topic, long serializerSvcId, +// pubsub_serializer_service_t *serializer); +//void pubsub_nanoMsgTopicReceiver_destroy(pubsub_nanomsg_topic_receiver_t *receiver); + +//const char* pubsub_nanoMsgTopicReceiver_scope(pubsub_nanomsg_topic_receiver_t *receiver); +//const char* pubsub_nanoMsgTopicReceiver_topic(pubsub_nanomsg_topic_receiver_t *receiver); + +//long pubsub_nanoMsgTopicReceiver_serializerSvcId(pubsub_nanomsg_topic_receiver_t *receiver); +//void pubsub_nanoMsgTopicReceiver_listConnections(pubsub_nanomsg_topic_receiver_t *receiver, +// std::vector<std::string> &connectedUrls, +// std::vector<std::string> &unconnectedUrls); + +//void pubsub_nanoMsgTopicReceiver_connectTo(pubsub_nanomsg_topic_receiver_t *receiver, const char *url); +//void pubsub_nanoMsgTopicReceiver_disconnectFrom(pubsub_nanomsg_topic_receiver_t *receiver, const char *url); #endif //CELIX_PUBSUB_NANOMSG_TOPIC_RECEIVER_H
