This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/celix.git
commit 363d491f4ae2e977735c94d56c0ade5c799c6ce0 Merge: b3d28d1 d1d001e Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jan 7 11:47:33 2019 +0100 Merge branch 'feature/CELIX-454-pubsub-disc' into develop .travis.yml | 1 + bundles/log_service/src/log_helper.c | 52 +- bundles/pubsub/CMakeLists.txt | 2 +- bundles/pubsub/examples/CMakeLists.txt | 42 +- .../pubsub/examples/mp_pubsub/common/include/ew.h | 53 - .../pubsub/examples/mp_pubsub/common/include/ide.h | 49 - .../examples/mp_pubsub/common/include/kinematics.h | 55 - .../mp_pubsub/msg_descriptors/msg_ew.descriptor | 9 - .../mp_pubsub/msg_descriptors/msg_ide.descriptor | 9 - .../msg_descriptors/msg_kinematics.descriptor | 10 - .../examples/mp_pubsub/publisher/CMakeLists.txt | 43 - .../private/include/mp_publisher_private.h | 58 - .../publisher/private/src/mp_pub_activator.c | 144 -- .../mp_pubsub/publisher/private/src/mp_publisher.c | 160 --- .../examples/mp_pubsub/subscriber/CMakeLists.txt | 43 - .../private/include/mp_subscriber_private.h | 50 - .../subscriber/private/src/mp_sub_activator.c | 108 -- .../subscriber/private/src/mp_subscriber.c | 119 -- .../private/include/pubsub_subscriber_private.h | 2 +- .../subscriber/private/src/pubsub_subscriber.c | 2 +- bundles/pubsub/mock/src/publisher_mock.cc | 14 - .../src/pubsub_nanomsg_admin.cc | 39 +- .../src/pubsub_nanomsg_admin.h | 35 +- .../src/pubsub_nanomsg_topic_receiver.cc | 2 +- .../src/pubsub_nanomsg_topic_sender.cc | 1 - .../pubsub/pubsub_admin_udp_mc/src/psa_activator.c | 6 +- .../src/pubsub_psa_udpmc_constants.h | 29 +- .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 67 +- .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h | 21 +- .../src/pubsub_udpmc_topic_receiver.c | 242 +++- .../src/pubsub_udpmc_topic_receiver.h | 8 +- .../src/pubsub_udpmc_topic_sender.c | 36 +- .../src/pubsub_udpmc_topic_sender.h | 5 +- .../pubsub/pubsub_admin_zmq/src/psa_activator.c | 6 +- .../src/pubsub_psa_zmq_constants.h | 42 +- .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 50 +- .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h | 27 +- .../src/pubsub_zmq_topic_receiver.c | 127 +- .../src/pubsub_zmq_topic_receiver.h | 1 + .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 68 +- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h | 2 + .../sync.h => pubsub_api/include/pubsub/api.h} | 13 +- .../pubsub/pubsub_api/include/pubsub/publisher.h | 21 +- .../pubsub/pubsub_api/include/pubsub/subscriber.h | 20 +- .../pubsub/pubsub_discovery/src/psd_activator.c | 2 +- .../pubsub_discovery/src/pubsub_discovery_impl.c | 14 +- .../pubsub_discovery/src/pubsub_discovery_impl.h | 2 +- .../src/ps_json_serializer_activator.c | 8 +- .../src/pubsub_serializer_impl.c | 76 +- .../src/pubsub_serializer_impl.h | 18 +- bundles/pubsub/pubsub_spi/include/pubsub_admin.h | 18 +- .../pubsub/pubsub_spi/include/pubsub_listeners.h | 7 +- .../pubsub/pubsub_spi/include/pubsub_serializer.h | 1 + bundles/pubsub/pubsub_spi/include/pubsub_utils.h | 39 +- bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 4 +- bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c | 12 +- .../src/pubsub_topology_manager.c | 1429 +++++++++++--------- .../src/pubsub_topology_manager.h | 2 +- bundles/pubsub/test/CMakeLists.txt | 146 +- .../{msg_descriptors => meta_data}/msg.descriptor | 2 +- .../meta_data/ping.properties} | 14 +- .../pubsub/test/msg_descriptors/sync.descriptor | 9 - bundles/pubsub/test/test/sut_activator.c | 123 +- bundles/pubsub/test/test/test_runner.cc | 19 + bundles/pubsub/test/test/tst_activator.cc | 120 ++ bundles/pubsub/test/test/tst_activator.cpp | 221 --- cmake/cmake_celix/ContainerPackaging.cmake | 6 +- libs/framework/include/celix_bundle_activator.h | 2 +- libs/framework/include/celix_log.h | 2 + libs/framework/private/test/bundle_cache_test.cpp | 2 + libs/framework/private/test/bundle_test.cpp | 36 +- libs/framework/src/bundle.c | 15 +- libs/framework/src/bundle_context.c | 3 +- libs/framework/src/bundle_private.h | 1 + libs/framework/src/celix_log.c | 31 +- libs/utils/CMakeLists.txt | 9 +- libs/utils/src/properties.c | 4 +- 77 files changed, 1892 insertions(+), 2398 deletions(-) diff --cc bundles/pubsub/examples/CMakeLists.txt index 126db2d,2e43e6d..dd9ca54 --- a/bundles/pubsub/examples/CMakeLists.txt +++ b/bundles/pubsub/examples/CMakeLists.txt @@@ -251,104 -223,6 +223,92 @@@ if (BUILD_PUBSUB_PSA_ZMQ etcd USE_TERM ) - - #Runtime starting a multipart (multiple message in one send) publish and subscriber for zmq - add_celix_runtime(pubsub_rt_multipart_zmq - NAME zmq_multipart - GROUP pubsub - CONTAINERS - pubsub_mp_subscriber_zmq - pubsub_mp_publisher_zmq - COMMANDS - etcd - USE_TERM - ) endif () +endif() +if (BUILD_PUBSUB_PSA_NANOMSG) + add_celix_container("pubsub_publisher1_nanomsg" + GROUP "pubsub" + BUNDLES + Celix::shell + Celix::shell_tui + Celix::pubsub_serializer_json + Celix::pubsub_discovery_etcd + Celix::pubsub_topology_manager + Celix::pubsub_admin_nanomsg + celix_pubsub_poi_publisher + PROPERTIES + PSA_NANOMSG_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true + ) + target_link_libraries(pubsub_publisher1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS}) + + add_celix_container("pubsub_publisher2_nanomsg" + GROUP "pubsub" + BUNDLES + Celix::shell + Celix::shell_tui + Celix::pubsub_serializer_json + Celix::pubsub_discovery_etcd + Celix::pubsub_topology_manager + Celix::pubsub_admin_nanomsg + celix_pubsub_poi_publisher + PROPERTIES + PSA_NANOMSG_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true + ) + target_link_libraries(pubsub_publisher2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS}) + + add_celix_container(pubsub_subscriber1_nanomsg + GROUP "pubsub" + BUNDLES + Celix::shell + Celix::shell_tui + Celix::pubsub_serializer_json + Celix::pubsub_discovery_etcd + Celix::pubsub_topology_manager + Celix::pubsub_admin_nanomsg + celix_pubsub_poi_subscriber + PROPERTIES + PSA_NANOMSG_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true + ) + target_link_libraries(pubsub_subscriber1_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS}) + + add_celix_container(pubsub_subscriber2_nanomsg + GROUP "pubsub" + BUNDLES + Celix::shell + Celix::shell_tui + Celix::pubsub_serializer_json + Celix::pubsub_discovery_etcd + Celix::pubsub_topology_manager + Celix::pubsub_admin_nanomsg + celix_pubsub_poi_subscriber + PROPERTIES + PSA_NANOMSG_VERBOSE=true + PUBSUB_ETCD_DISCOVERY_VERBOSE=true + PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true + ) + target_link_libraries(pubsub_subscriber2_nanomsg PRIVATE ${PUBSUB_CONTAINER_LIBS}) + + + if (ETCD_CMD AND XTERM_CMD) + #Runtime starting a publish and 2 subscribers for zmq + add_celix_runtime(pubsub_rt_nanomsg - NAME zmq ++ NAME nanomsg + GROUP pubsub + CONTAINERS + pubsub_publisher1_nanomsg + pubsub_publisher2_nanomsg + pubsub_subscriber1_nanomsg + pubsub_subscriber2_nanomsg + COMMANDS + etcd + USE_TERM + ) + endif () endif() diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 8f3c229,0000000..f44a067 mode 100644,000000..100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@@ -1,609 -1,0 +1,614 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <string> +#include <vector> +#include <functional> +#include <memory.h> +#include <iostream> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <ifaddrs.h> +#include <pubsub_endpoint.h> +#include <algorithm> + +#include "pubsub_utils.h" +#include "pubsub_nanomsg_admin.h" +#include "pubsub_psa_nanomsg_constants.h" + + +static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip); + +pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx): + ctx{_ctx}, + L{ctx, "pubsub_nanomsg_admin"} { + verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT); + fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, nullptr); + + char *ip = nullptr; + const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr); + if (confIp != nullptr) { + ip = strndup(confIp, 1024); + } + + if (ip == nullptr) { + //TODO try to get ip from subnet (CIDR) + } + + if (ip == nullptr) { + //try to get ip from itf + const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr); + nanoMsg_getIpAddress(interface, &ip); + } + + if (ip == nullptr) { + L.WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (", PUBSUB_NANOMSG_DEFAULT_IP, ")"); + ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024); + } + + ipAddress = ip; + if (verbose) { + L.INFO("[PSA_NANOMSG] Using ", ip, " for service annunciation."); + } + + + long _basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT); + long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT); + basePort = (unsigned int)_basePort; + maxPort = (unsigned int)_maxPort; + if (verbose) { + L.INFO("[PSA_NANOMSG] Using base till max port: ", _basePort, " till ", _maxPort); + } + + + defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE); + qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE); + qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE); +} + +pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { + //note assuming al psa register services and service tracker are removed. + { +// std::lock_guard<std::mutex> lock(topicSenders.mutex); +// for (auto &kv : topicSenders.map) { +// auto &sender = kv.second; +// delete (sender); +// } + } + + { + std::lock_guard<std::mutex> lock(topicReceivers.mutex); + for (auto &kv: topicReceivers.map) { + delete kv.second; + } + } + + { + std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex); + for (auto &entry : discoveredEndpoints.map) { + auto *ep = entry.second; + celix_properties_destroy(ep); + } + } + + { + std::lock_guard<std::mutex> lock(serializers.mutex); + serializers.map.clear(); + } + + free(ipAddress); + +} + +void pubsub_nanomsg_admin::start() { + adminService.handle = this; - adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long *serializerSvcId) { ++ adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->matchPublisher(svcRequesterBndId, svcFilter,score, serializerSvcId); ++ return me->matchPublisher(svcRequesterBndId, svcFilter, outTopicProperties, score, serializerSvcId); + }; - adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, double *score, long *serializerSvcId) { ++ adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->matchSubscriber(svcProviderBndId, svcProperties, score, serializerSvcId); ++ return me->matchSubscriber(svcProviderBndId, svcProperties, outTopicProperties, score, serializerSvcId); + }; - adminService.matchEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) { ++ adminService.matchDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + return me->matchEndpoint(endpoint, match); + }; - adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) { ++ adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->setupTopicSender(scope, topic, serializerSvcId, publisherEndpoint); ++ return me->setupTopicSender(scope, topic, topicProperties, serializerSvcId, publisherEndpoint); + }; + adminService.teardownTopicSender = [](void *handle, const char *scope, const char *topic) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + return me->teardownTopicSender(scope, topic); + }; - adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, long serializerSvcId, celix_properties_t **subscriberEndpoint) { ++ adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->setupTopicReceiver(std::string(scope), std::string(topic),serializerSvcId, subscriberEndpoint); ++ return me->setupTopicReceiver(std::string(scope), std::string(topic), topicProperties, serializerSvcId, subscriberEndpoint); + }; + + adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + return me->teardownTopicReceiver(scope, topic); + }; - adminService.addEndpoint = [](void *handle, const celix_properties_t *endpoint) { ++ adminService.addDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + return me->addEndpoint(endpoint); + }; - adminService.removeEndpoint = [](void *handle, const celix_properties_t *endpoint) { ++ adminService.removeDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + return me->removeEndpoint(endpoint); + }; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE); + + adminSvcId = celix_bundleContext_registerService(ctx, static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props); + + + celix_service_tracking_options_t opts{}; + opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = this; + opts.addWithProperties = [](void *handle, void *svc, const celix_properties_t *props) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + me->addSerializerSvc(svc, props); + }; + opts.removeWithProperties = [](void *handle, void *svc, const celix_properties_t *props) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + me->removeSerializerSvc(svc, props); + }; + serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + + //register shell command service + cmdSvc.handle = this; + cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { + auto me = static_cast<pubsub_nanomsg_admin*>(handle); + return me->executeCommand(commandLine, outStream, errorStream); + }; + + celix_properties_t* shellProps = celix_properties_create(); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg"); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg"); + celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA"); + cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps); + +} + +void pubsub_nanomsg_admin::stop() { + celix_bundleContext_unregisterService(ctx, adminSvcId); + celix_bundleContext_unregisterService(ctx, cmdSvcId); + celix_bundleContext_stopTracker(ctx, serializersTrackerId); +} + +void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t *props) { + const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, nullptr); + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); + + if (serType == nullptr) { + L.INFO("[PSA_NANOMSG] Ignoring serializer service without ", PUBSUB_SERIALIZER_TYPE_KEY, " property"); + return; + } + + { + std::lock_guard<std::mutex> lock(serializers.mutex); + auto it = serializers.map.find(svcId); + if (it == serializers.map.end()) { + serializers.map.emplace(std::piecewise_construct, + std::forward_as_tuple(svcId), + std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc))); + } + } +} + + +void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_properties_t *props) { + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); + + //remove serializer + // 1) First find entry and + // 2) loop and destroy all topic sender using the serializer and + // 3) loop and destroy all topic receivers using the serializer + // Note that it is the responsibility of the topology manager to create new topic senders/receivers + + std::lock_guard<std::mutex> lock(serializers.mutex); + + auto kvsm = serializers.map.find(svcId); + if (kvsm != serializers.map.end()) { + auto &entry = kvsm->second; + { + std::lock_guard<std::mutex> senderLock(topicSenders.mutex); + for(auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) { + auto &sender = it->second; + if (entry.svcId == sender.getSerializerSvcId()) { + it = topicSenders.map.erase(it); + } else { + ++it; + } + } + } + + { + std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); + for (auto iter = topicReceivers.map.begin(); iter != topicReceivers.map.end();) { + auto *receiver = iter->second; + if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) { + auto key = iter->first; + topicReceivers.map.erase(iter++); + delete receiver; + } else { + ++iter; + } + } + } + + } +} + - celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, ++celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, + double *outScore, long *outSerializerSvcId) { + L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE, - qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId); ++ qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId); + *outScore = score; + + return status; +} + - celix_status_t pubsub_nanomsg_admin::matchSubscriber(long svcProviderBndId, - const celix_properties_t *svcProperties, double *outScore, - long *outSerializerSvcId) { ++celix_status_t pubsub_nanomsg_admin::matchSubscriber( ++ long svcProviderBndId, ++ const celix_properties_t *svcProperties, ++ celix_properties_t **outTopicProperties, ++ double *outScore, ++ long *outSerializerSvcId) { + L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber"); + celix_status_t status = CELIX_SUCCESS; + double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE, - qosSampleScore, qosControlScore, defaultScore, outSerializerSvcId); ++ qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId); + if (outScore != nullptr) { + *outScore = score; + } + return status; +} + +celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *endpoint, bool *outMatch) { + L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint"); + celix_status_t status = CELIX_SUCCESS; + bool match = pubsub_utils_matchEndpoint(ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, nullptr); + if (outMatch != nullptr) { + *outMatch = match; + } + return status; +} + +celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic, ++ const celix_properties_t */*topicProperties*/, + long serializerSvcId, celix_properties_t **outPublisherEndpoint) { + celix_status_t status = CELIX_SUCCESS; + + //1) Create TopicSender + //2) Store TopicSender + //3) Connect existing endpoints + //4) set outPublisherEndpoint + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); + auto sender = topicSenders.map.find(key); + if (sender == topicSenders.map.end()) { + //psa_nanomsg_serializer_entry *serEntry = nullptr; + auto kv = serializers.map.find(serializerSvcId); + if (kv != serializers.map.end()) { + auto &serEntry = kv->second; + auto e = topicSenders.map.emplace(std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(ctx, scope, topic, serializerSvcId, serEntry.svc, ipAddress, + basePort, maxPort)); + celix_properties_t *newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, + PUBSUB_NANOMSG_ADMIN_TYPE, serEntry.serType, nullptr); + celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, e.first->second.getUrl().c_str()); + //if available also set container name + const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); + if (cn != nullptr) { + celix_properties_set(newEndpoint, "container_name", cn); + } + if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) { + *outPublisherEndpoint = newEndpoint; + } + } else { + L.ERROR("[PSA NANOMSG] Error creating a TopicSender"); + } + } else { + L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic ", scope,"/", topic); + } + free(key); + + return status; +} + +celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, const char *topic) { + celix_status_t status = CELIX_SUCCESS; + + //1) Find and remove TopicSender from map + //2) destroy topic sender + + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); + if (topicSenders.map.erase(key) == 0) { + L.ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic ", scope, "/", topic, " Does not exists"); + } + free(key); + + return status; +} + +celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic, ++ const celix_properties_t */*topicProperties*/, + long serializerSvcId, celix_properties_t **outSubscriberEndpoint) { + + celix_properties_t *newEndpoint = nullptr; + + auto key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str()); + pubsub::nanomsg::topic_receiver * receiver = nullptr; + { + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); + auto trkv = topicReceivers.map.find(key); + if (trkv != topicReceivers.map.end()) { + receiver = trkv->second; + } + if (receiver == nullptr) { + auto kvs = serializers.map.find(serializerSvcId); + if (kvs != serializers.map.end()) { + auto serEntry = kvs->second; + receiver = new pubsub::nanomsg::topic_receiver(ctx, scope, topic, serializerSvcId, serEntry.svc); + } else { + L.ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender ", scope, "/", topic); + } + if (receiver != nullptr) { + const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; + const char *serType = kvs->second.serType; + newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, + serType, nullptr); + //if available also set container name + const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); + if (cn != nullptr) { + celix_properties_set(newEndpoint, "container_name", cn); + } + topicReceivers.map[key] = receiver; + } else { + L.ERROR("[PSA NANOMSG] Error creating a TopicReceiver."); + } + } else { + L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic ", scope, "/", topic); + } + } + if (receiver != nullptr && newEndpoint != nullptr) { + std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); + for (auto entry : discoveredEndpoints.map) { + auto *endpoint = entry.second; + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); + if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + connectEndpointToReceiver(receiver, endpoint); + } + } + } + + if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) { + *outSubscriberEndpoint = newEndpoint; + } + free(key); + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) { + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); + auto entry = topicReceivers.map.find(key); + free(key); + if (entry != topicReceivers.map.end()) { + auto receiverKey = entry->first; + pubsub::nanomsg::topic_receiver *receiver = entry->second; + topicReceivers.map.erase(receiverKey); + + delete receiver; + } + + celix_status_t status = CELIX_SUCCESS; + return status; +} + +celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver, + const celix_properties_t *endpoint) { + //note can be called with discoveredEndpoint.mutex lock + celix_status_t status = CELIX_SUCCESS; + + auto scope = receiver->scope(); + auto topic = receiver->topic(); + + std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, ""); + std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, ""); + const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr); + + if (url == nullptr) { +// L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type); + status = CELIX_BUNDLE_EXCEPTION; + } else { + if ((eScope == scope) && (eTopic == topic)) { + receiver->connectTo(url); + } + } + + return status; +} + +celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpoint) { + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); + + if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + std::lock_guard<std::mutex> threadLock(topicReceivers.mutex); + for (auto &entry: topicReceivers.map) { + pubsub::nanomsg::topic_receiver *receiver = entry.second; + connectEndpointToReceiver(receiver, endpoint); + } + } + + std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); + celix_properties_t *cpy = celix_properties_copy(endpoint); + //TODO : check if properties are never deleted before map. + const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr); + discoveredEndpoints.map[uuid] = cpy; + + celix_status_t status = CELIX_SUCCESS; + return status; +} + + +celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver, + const celix_properties_t *endpoint) { + //note can be called with discoveredEndpoint.mutex lock + celix_status_t status = CELIX_SUCCESS; + + auto scope = receiver->scope(); + auto topic = receiver->topic(); + + auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, ""); + auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, ""); + const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr); + + if (url == nullptr) { + L.WARN("[PSA NANOMSG] Error got endpoint without nanomsg url"); + status = CELIX_BUNDLE_EXCEPTION; + } else { + if ((eScope == scope) && (eTopic == topic)) { + receiver->disconnectFrom(url); + } + } + + return status; +} + +celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *endpoint) { + const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); + + if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); + for (auto &entry : topicReceivers.map) { + pubsub::nanomsg::topic_receiver *receiver = entry.second; + disconnectEndpointFromReceiver(receiver, endpoint); + } + } + { + std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr); + discoveredEndpoints.map.erase(uuid); + } + return CELIX_SUCCESS;; +} + +celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine __attribute__((unused)), FILE *out, + FILE *errStream __attribute__((unused))) { + celix_status_t status = CELIX_SUCCESS; + fprintf(out, "\n"); + fprintf(out, "Topic Senders:\n"); + { + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); + for (auto &senderEntry: topicSenders.map) { + auto &sender = senderEntry.second; + long serSvcId = sender.getSerializerSvcId(); + auto kvs = serializers.map.find(serSvcId); + const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType); + const auto scope = sender.getScope(); + const auto topic = sender.getTopic(); + const auto url = sender.getUrl(); + fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), topic.c_str()); + fprintf(out, " |- serializer type = %s\n", serType); + fprintf(out, " |- url = %s\n", url.c_str()); + } + } + + { + fprintf(out, "\n"); + fprintf(out, "\nTopic Receivers:\n"); + std::lock_guard<std::mutex> serializerLock(serializers.mutex); + std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); + for (auto &entry : topicReceivers.map) { + pubsub::nanomsg::topic_receiver *receiver = entry.second; + long serSvcId = receiver->serializerSvcId(); + auto kv = serializers.map.find(serSvcId); + const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType); + auto scope = receiver->scope(); + auto topic = receiver->topic(); + + std::vector<std::string> connected{}; + std::vector<std::string> unconnected{}; + receiver->listConnections(connected, unconnected); + + fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str()); + fprintf(out, " |- serializer type = %s\n", serType); + for (auto &url : connected) { + fprintf(out, " |- connected url = %s\n", url.c_str()); + } + for (auto &url : unconnected) { + fprintf(out, " |- unconnected url = %s\n", url.c_str()); + } + } + } + fprintf(out, "\n"); + + return status; +} + +#ifndef ANDROID +static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + struct ifaddrs *ifaddr, *ifa; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) != -1) + { + for (ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == nullptr) + continue; + + if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { + if (interface == nullptr) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + else if (strcmp(ifa->ifa_name, interface) == 0) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + } + } + + freeifaddrs(ifaddr); + } + + return status; +} +#endif diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h index 768accc,0000000..dccf280 mode 100644,000000..100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h @@@ -1,136 -1,0 +1,153 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef CELIX_PUBSUB_ZMQ_ADMIN_H +#define CELIX_PUBSUB_ZMQ_ADMIN_H + +#include <mutex> +#include <map> +#include <pubsub_admin.h> +#include "celix_api.h" +#include "pubsub_nanomsg_topic_receiver.h" +#include <pubsub_serializer.h> +#include "LogHelper.h" +#include "command.h" +#include "pubsub_nanomsg_topic_sender.h" +#include "pubsub_nanomsg_topic_receiver.h" + +#define PUBSUB_NANOMSG_ADMIN_TYPE "zmq" +#define PUBSUB_NANOMSG_URL_KEY "zmq.url" + +#define PUBSUB_NANOMSG_VERBOSE_KEY "PSA_ZMQ_VERBOSE" +#define PUBSUB_NANOMSG_VERBOSE_DEFAULT true + +#define PUBSUB_NANOMSG_PSA_IP_KEY "PSA_IP" +#define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE" + +#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1" + +template <typename key, typename value> +struct ProtectedMap { + std::mutex mutex{}; + std::map<key, value> map{}; +}; + +class pubsub_nanomsg_admin { +public: + pubsub_nanomsg_admin(celix_bundle_context_t *ctx); + pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete; + pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete; + ~pubsub_nanomsg_admin(); + void start(); + void stop(); + +private: + void addSerializerSvc(void *svc, const celix_properties_t *props); + void removeSerializerSvc(void */*svc*/, const celix_properties_t *props); - celix_status_t matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, - double *score, long *serializerSvcId); - celix_status_t matchSubscriber(long svcProviderBndId, - const celix_properties_t *svcProperties, double *score, - long *serializerSvcId); ++ celix_status_t matchPublisher( ++ long svcRequesterBndId, ++ const celix_filter_t *svcFilter, ++ celix_properties_t **outTopicProperties, ++ double *outScore, ++ long *outsSerializerSvcId); ++ celix_status_t matchSubscriber( ++ long svcProviderBndId, ++ const celix_properties_t *svcProperties, ++ celix_properties_t **outTopicProperties, ++ double *outScope, ++ long *outSerializerSvcId); + celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool *match); + - celix_status_t setupTopicSender(const char *scope, const char *topic, - long serializerSvcId, celix_properties_t **publisherEndpoint); ++ celix_status_t setupTopicSender( ++ const char *scope, ++ const char *topic, ++ const celix_properties_t *topicProperties, ++ long serializerSvcId, ++ celix_properties_t **outPublisherEndpoint); ++ + celix_status_t teardownTopicSender(const char *scope, const char *topic); + - celix_status_t setupTopicReceiver(const std::string &scope, const std::string &topic, - long serializerSvcId, celix_properties_t **subscriberEndpoint); ++ celix_status_t setupTopicReceiver( ++ const std::string &scope, ++ const std::string &topic, ++ const celix_properties_t *topicProperties, ++ long serializerSvcId, ++ celix_properties_t **outSubscriberEndpoint); ++ + celix_status_t teardownTopicReceiver(const char *scope, const char *topic); + + celix_status_t addEndpoint(const celix_properties_t *endpoint); + celix_status_t removeEndpoint(const celix_properties_t *endpoint); + + celix_status_t executeCommand(char *commandLine __attribute__((unused)), FILE *out, + FILE *errStream __attribute__((unused))); + + celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver, + const celix_properties_t *endpoint); + + celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver, + const celix_properties_t *endpoint); + celix_bundle_context_t *ctx; + celix::pubsub::nanomsg::LogHelper L; + pubsub_admin_service_t adminService{}; + long adminSvcId = -1L; + long cmdSvcId = -1L; + command_service_t cmdSvc{}; + long serializersTrackerId = -1L; + + const char *fwUUID{}; + + char* ipAddress{}; + + unsigned int basePort{}; + unsigned int maxPort{}; + + double qosSampleScore{}; + double qosControlScore{}; + double defaultScore{}; + + bool verbose{}; + + class psa_nanomsg_serializer_entry { + public: + psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) : + serType{_serType}, svcId{_svcId}, svc{_svc} { + + } + + const char *serType; + long svcId; + pubsub_serializer_service_t *svc; + }; + ProtectedMap<long, psa_nanomsg_serializer_entry> serializers{}; + ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> topicSenders{}; + ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; + ProtectedMap<const std::string, celix_properties_t *> discoveredEndpoints{}; +}; + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + + +#endif //CELIX_PUBSUB_ZMQ_ADMIN_H diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc index d35de89,0000000..b27eb76 mode 100644,000000..100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc @@@ -1,319 -1,0 +1,319 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <iostream> +#include <mutex> +#include <memory.h> +#include <vector> +#include <string> +#include <sstream> + +#include <stdlib.h> +#include <assert.h> + +#include <sys/epoll.h> +#include <arpa/inet.h> + +#include <nanomsg/nn.h> +#include <nanomsg/bus.h> + +#include <pubsub_serializer.h> +#include <pubsub/subscriber.h> +#include <pubsub_constants.h> +#include <pubsub_endpoint.h> +#include <LogHelper.h> + +#include "pubsub_nanomsg_topic_receiver.h" +#include "pubsub_psa_nanomsg_constants.h" +#include "pubsub_nanomsg_common.h" +#include "pubsub_topology_manager.h" + +//TODO see if block and wakeup (reset) also works +#define PSA_NANOMSG_RECV_TIMEOUT 100 //100 msec timeout + +/* +#define L_DEBUG(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) +#define L_INFO(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) +#define L_WARN(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) +#define L_ERROR(...) \ + logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) +*/ +//#define L_DEBUG printf +//#define L_INFO printf +//#define L_WARN printf +//#define L_ERROR printf + + +pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, + const std::string &_scope, + const std::string &_topic, + long _serializerSvcId, + pubsub_serializer_service_t *_serializer) : L{_ctx, "NANOMSG_topic_receiver"}, m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} { + ctx = _ctx; + serializer = _serializer; + + m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS); + if (m_nanoMsgSocket < 0) { + L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for scope/topic: ", m_scope.c_str(), "/", m_topic.c_str()); + std::bad_alloc{}; + } else { + int timeout = PSA_NANOMSG_RECV_TIMEOUT; + if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, + sizeof (timeout)) < 0) { + L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for ",m_scope, "/",m_topic, ", set sockopt RECV_TIMEO failed"); + std::bad_alloc{}; + } + + auto subscriberFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(m_scope, m_topic); + + auto opts = createOptions(); + + subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + recvThread.running = true; + free ((void*)opts.filter.filter); + recvThread.thread = std::thread([this]() {this->recvThread_exec();}); + } +} + +celix_service_tracking_options_t pubsub::nanomsg::topic_receiver::createOptions() { + std::stringstream filter_str; + + filter_str << "(" << PUBSUB_SUBSCRIBER_TOPIC << "=" << m_topic << ")"; + celix_service_tracking_options_t opts{}; + opts.filter.ignoreServiceLanguage = true; + opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; + opts.filter.filter = strdup(filter_str.str().c_str()); // TODO : memory leak ?? + opts.callbackHandle = this; + opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { + static_cast<pubsub::nanomsg::topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner); + }; + opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { + static_cast<pubsub::nanomsg::topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner); + }; + return opts; +} + +pubsub::nanomsg::topic_receiver::~topic_receiver() { + + { + std::lock_guard<std::mutex> _lock(recvThread.mutex); + recvThread.running = false; + } + recvThread.thread.join(); + + celix_bundleContext_stopTracker(ctx, subscriberTrackerId); + + { + std::lock_guard<std::mutex> _lock(subscribers.mutex); + for(auto elem : subscribers.map) { + serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes); + } + subscribers.map.clear(); + } + nn_close(m_nanoMsgSocket); + +} + +std::string pubsub::nanomsg::topic_receiver::scope() const { + return m_scope; +} + +std::string pubsub::nanomsg::topic_receiver::topic() const { + return m_topic; +} + +long pubsub::nanomsg::topic_receiver::serializerSvcId() const { + return m_serializerSvcId; +} + +void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls, + std::vector<std::string> &unconnectedUrls) { + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + for (auto entry : requestedConnections.map) { + if (entry.second.isConnected()) { + connectedUrls.emplace_back(entry.second.getUrl()); + } else { + unconnectedUrls.emplace_back(entry.second.getUrl()); + } + } +} + + +void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { + L.DBG("[PSA_NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " connecting to nanomsg url ", url); + + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + auto entry = requestedConnections.map.find(url); + if (entry == requestedConnections.map.end()) { + requestedConnections.map.emplace( + std::piecewise_construct, + std::forward_as_tuple(std::string(url)), + std::forward_as_tuple(url, -1)); + entry = requestedConnections.map.find(url); + } + if (!entry->second.isConnected()) { + int connection_id = nn_connect(m_nanoMsgSocket, url); + if (connection_id >= 0) { + entry->second.setConnected(true); + entry->second.setId(connection_id); + } else { + L.WARN("[PSA_NANOMSG] Error connecting to NANOMSG url ", url, " (",strerror(errno), ")"); + } + } +} + +void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { + L.DBG("[PSA NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " disconnect from nanomsg url ", url); + + std::lock_guard<std::mutex> _lock(requestedConnections.mutex); + auto entry = requestedConnections.map.find(url); + if (entry != requestedConnections.map.end()) { + if (entry->second.isConnected()) { + if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) { + entry->second.setConnected(false); + } else { + L.WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url ", url, ", id: ", entry->second.getId(), " (",strerror(errno),")"); + } + } + requestedConnections.map.erase(url); + std::cerr << "REMOVING connection " << url << std::endl; + } else { + std::cerr << "Disconnecting from unknown URL " << url << std::endl; + } +} + +void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props, + const celix_bundle_t *bnd) { + long bndId = celix_bundle_getId(bnd); + std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (subScope != m_scope) { + //not the same scope. ignore + return; + } + + std::lock_guard<std::mutex> _lock(subscribers.mutex); + auto entry = subscribers.map.find(bndId); + if (entry != subscribers.map.end()) { + entry->second.usageCount += 1; + } else { + //new create entry + subscribers.map.emplace(std::piecewise_construct, + std::forward_as_tuple(bndId), + std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1)); + entry = subscribers.map.find(bndId); + + int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes); + if (rc != 0) { + L.ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver ", m_scope.c_str(), "/", m_topic.c_str()); + subscribers.map.erase(bndId); + } + } +} + +void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/, + const celix_properties_t */*props*/, const celix_bundle_t *bnd) { + long bndId = celix_bundle_getId(bnd); + + std::lock_guard<std::mutex> _lock(subscribers.mutex); + auto entry = subscribers.map.find(bndId); + if (entry != subscribers.map.end()) { + entry->second.usageCount -= 1; + if (entry->second.usageCount <= 0) { + //remove entry + int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); + if (rc != 0) { + L.ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver ", m_scope.c_str(), "/",m_topic.c_str(),"\n"); + } + subscribers.map.erase(bndId); + } + } +} + +void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, size_t payloadSize) { + pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type))); + pubsub_subscriber_t *svc = entry->svc; + + if (msgSer!= NULL) { + void *deserializedMsg = NULL; + bool validVersion = celix::pubsub::nanomsg::checkVersion(msgSer->msgVersion, hdr); + if (validVersion) { + celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg); + if(status == CELIX_SUCCESS) { + bool release = false; - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, NULL, &release); ++ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release); + if (release) { + msgSer->freeMsg(msgSer->handle, deserializedMsg); + } + } else { + L.WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type ", msgSer->msgName , "for scope/topic ", scope(), "/", topic()); + } + } + } else { + L.WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id ", hdr->type); + } +} + +void pubsub::nanomsg::topic_receiver::processMsg(const celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t payloadSize) { + std::lock_guard<std::mutex> _lock(subscribers.mutex); + for (auto entry : subscribers.map) { + processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize); + } +} + +struct Message { + celix::pubsub::nanomsg::msg_header header; + char payload[]; +}; + +void pubsub::nanomsg::topic_receiver::recvThread_exec() { + while (recvThread.running) { + Message *msg = nullptr; + nn_iovec iov[2]; + iov[0].iov_base = &msg; + iov[0].iov_len = NN_MSG; + + nn_msghdr msgHdr; + memset(&msgHdr, 0, sizeof(msgHdr)); + + msgHdr.msg_iov = iov; + msgHdr.msg_iovlen = 1; + + msgHdr.msg_control = nullptr; + msgHdr.msg_controllen = 0; + + errno = 0; + int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0); + if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(celix::pubsub::nanomsg::msg_header)) { + processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header)); + nn_freemsg(msg); + } else if (recvBytes >= 0) { + L.ERROR("[PSA_NANOMSG_TR] Error receiving nanomsg msg, size (", recvBytes,") smaller than header\n"); + } else if (errno == EAGAIN || errno == ETIMEDOUT) { + // no data: go to next cycle + } else if (errno == EINTR) { + L.DBG("[PSA_NANOMSG_TR] nn_recvmsg interrupted"); + } else { + L.WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno ", errno, " : ", strerror(errno), "\n"); + } + } // while + +} diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc index 154bd11,0000000..4190729 mode 100644,000000..100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc @@@ -1,267 -1,0 +1,266 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <memory.h> +#include <iostream> +#include <sstream> +#include <stdlib.h> +#include <utils.h> +#include <arpa/inet.h> +#include <zconf.h> +#include <LogHelper.h> +#include <nanomsg/nn.h> +#include <nanomsg/bus.h> + + +#include <pubsub_constants.h> +#include <pubsub_common.h> +#include "pubsub_nanomsg_topic_sender.h" +#include "pubsub_psa_nanomsg_constants.h" +#include "pubsub_nanomsg_common.h" + +#define FIRST_SEND_DELAY_IN_SECONDS 2 +#define NANOMSG_BIND_MAX_RETRY 10 + +static unsigned int rand_range(unsigned int min, unsigned int max); +static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper); + +pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, + const char *_scope, + const char *_topic, + long _serializerSvcId, + pubsub_serializer_service_t *_ser, + const char *_bindIp, + unsigned int _basePort, + unsigned int _maxPort) : + ctx{_ctx}, + L{ctx, "PSA_ZMQ_TS"}, + serializerSvcId {_serializerSvcId}, + serializer{_ser}, + scope{_scope}, + topic{_topic}{ + + scopeAndTopicFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(_scope, _topic); + + //setting up nanomsg socket for nanomsg TopicSender + int nnSock = nn_socket(AF_SP, NN_BUS); + if (nnSock == -1) { + perror("Error for nanomsg_socket"); + } + + int rv = -1, retry=0; + while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) { + /* Randomized part due to same bundle publishing on different topics */ + unsigned int port = rand_range(_basePort,_maxPort); + std::stringstream _url; + _url << "tcp://" << _bindIp << ":" << port; + + std::stringstream bindUrl; + bindUrl << "tcp://0.0.0.0:" << port; + + rv = nn_bind (nnSock, bindUrl.str().c_str()); + if (rv == -1) { + perror("Error for nn_bind"); + } else { + this->url = _url.str(); + nanomsg.socket = nnSock; + } + retry++; + } + + if (!url.empty()) { + + //register publisher services using a service factory + publisher.factory.handle = this; + publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { + return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService( + requestingBundle, + svcProperties); + }; + publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { + return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService( + requestingBundle, + svcProperties); + }; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str()); + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str()); + + celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + opts.factory = &publisher.factory; + opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; + opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION; + opts.properties = props; + + publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts); + } + +} + +pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() { + celix_bundleContext_unregisterService(ctx, publisher.svcId); + + nn_close(nanomsg.socket); + std::lock_guard<std::mutex> lock(boundedServices.mutex); + for (auto &it: boundedServices.map) { + serializer->destroySerializerMap(serializer->handle, it.second.msgTypes); + } + boundedServices.map.clear(); + +} + +long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const { + return serializerSvcId; +} + +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const { + return scope; +} + +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const { + return topic; +} + +const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const { + return url; +} + + +void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle, + const celix_properties_t *svcProperties __attribute__((unused))) { + long bndId = celix_bundle_getId(requestingBundle); + void *service{nullptr}; + std::lock_guard<std::mutex> lock(boundedServices.mutex); + auto existingEntry = boundedServices.map.find(bndId); + if (existingEntry != boundedServices.map.end()) { + existingEntry->second.getCount += 1; + service = &existingEntry->second.service; + } else { + auto entry = boundedServices.map.emplace(std::piecewise_construct, + std::forward_as_tuple(bndId), + std::forward_as_tuple(scope, topic, bndId, nanomsg.socket, ctx)); + int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes); + + if (rc == 0) { + entry.first->second.service.handle = &entry.first->second; + entry.first->second.service.localMsgTypeIdForMsgType = celix::pubsub::nanomsg::localMsgTypeIdForMsgType; + entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) { + return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg); + }; - entry.first->second.service.sendMultipart = nullptr; //not supported TODO remove + service = &entry.first->second.service; + } else { + boundedServices.map.erase(bndId); + L.ERROR("Error creating serializer map for NanoMsg TopicSender. Scope: ", scope, ", Topic: ", topic); + } + } + + return service; +} + +void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle, + const celix_properties_t */*svcProperties*/) { + long bndId = celix_bundle_getId(requestingBundle); + + std::lock_guard<std::mutex> lock(boundedServices.mutex); + auto entry = boundedServices.map.find(bndId); + if (entry != boundedServices.map.end()) { + entry->second.getCount -= 1; + if (entry->second.getCount == 0) { + int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); + if (rc != 0) { + L.ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + } + boundedServices.map.erase(bndId); + } + } +} + +int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int msgTypeId, const void *inMsg) { + int status; + auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId)); + + if (msgSer != nullptr) { + delay_first_send_for_late_joiners(L); + + int major = 0, minor = 0; + + celix::pubsub::nanomsg::msg_header msg_hdr{}; + msg_hdr.type = msgTypeId; + + if (msgSer->msgVersion != nullptr) { + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); + msg_hdr.major = (unsigned char) major; + msg_hdr.minor = (unsigned char) minor; + } + + void *serializedOutput = nullptr; + size_t serializedOutputLen = 0; + status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen); + if (status == CELIX_SUCCESS) { + nn_iovec data[2]; + + nn_msghdr msg{}; + msg.msg_iov = data; + msg.msg_iovlen = 2; + msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr); + msg.msg_iov[0].iov_len = sizeof(msg_hdr); + msg.msg_iov[1].iov_base = serializedOutput; + msg.msg_iov[1].iov_len = serializedOutputLen; + msg.msg_control = nullptr; + msg.msg_controllen = 0; + errno = 0; + int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 ); + free(serializedOutput); + if (rc < 0) { + L.WARN("[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc, ", error: ", strerror(errno)); + } else { + L.INFO("[PSA_ZMQ_TS] Send message with size ", rc, "\n"); + L.INFO("[PSA_ZMQ_TS] Send message ID ", msg_hdr.type, + " major: ", (int)msg_hdr.major, + " minor: ", (int)msg_hdr.minor,"\n"); + } + } else { + L.WARN("[PSA_ZMQ_TS] Error serialize message of type ", msgSer->msgName, + " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n"); + } + } else { + status = CELIX_SERVICE_EXCEPTION; + L.WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id ", msgTypeId, + " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n"); + } + return status; +} + +static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper) { + + static bool firstSend = true; + + if(firstSend){ + logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY_IN_SECONDS); + firstSend = false; + } +} + +static unsigned int rand_range(unsigned int min, unsigned int max){ + double scaled = ((double)random())/((double)RAND_MAX); + return (unsigned int)((max-min+1)*scaled + min); +} diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c index 2d36579,d7add8b..b7fc8bd --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c @@@ -576,8 -592,6 +592,7 @@@ celix_status_t pubsub_udpmcAdmin_execut celixThreadMutex_unlock(&psa->serializers.mutex); fprintf(out, "\n"); - //TODO topic receivers/senders connection count + return status; } @@@ -681,4 -695,4 +696,4 @@@ static celix_status_t udpmc_getIpAddres return status; } --#endif ++#endif diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h index cada021,469fa9c..c71341b --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h @@@ -22,19 -22,9 +22,8 @@@ #include "celix_api.h" #include "log_helper.h" - - #define PUBSUB_UDPMC_ADMIN_TYPE "udp_mc" - #define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY "udpmc.socket_address" - #define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY "udpmc.socket_port" - - #define PUBSUB_UDPMC_IP_KEY "PSA_IP" - #define PUBSUB_UDPMC_ITF_KEY "PSA_INTERFACE" - #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY "PSA_MC_PREFIX" - #define PUBSUB_UDPMC_VERBOSE_KEY "PSA_UDPMC_VERBOSE" - - #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT "224.100" - #define PUBSUB_UDPMC_VERBOSE_DEFAULT true + #include "pubsub_psa_udpmc_constants.h" - typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t; pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_helper_t *logHelper); diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c index 64d7a23,1b7c1db..b5ca4fd --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c @@@ -339,3 -345,7 +345,7 @@@ static unsigned int rand_range(unsigne long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t *sender) { return sender->serializerSvcId; } + + bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender) { + return sender->staticallyConfigured; -} ++} diff --cc bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c index a13cb26,8c9e163..ea7653e --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c @@@ -370,6 -373,22 +373,22 @@@ celix_status_t pubsub_zmqAdmin_setupTop newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, NULL); celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender)); + + //if configured use a static discover url + const char *staticDiscUrl = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL); + if (staticDiscUrl != NULL) { + celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, staticDiscUrl); + } + celix_properties_setBool(newEndpoint, PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL); + + //if url starts with ipc:// constrain discovery to host visibility, else use system visibility + const char *u = celix_properties_get(newEndpoint, PUBSUB_ZMQ_URL_KEY, ""); + if (strncmp("ipc://", u, strlen("ipc://")) == 0) { - celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_HOST_VISIBLITY); ++ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_HOST_VISIBILITY); + } else { - celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBLITY); ++ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY); + } + //if available also set container name const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL); if (cn != NULL) { diff --cc bundles/pubsub/pubsub_spi/include/pubsub_utils.h index e4983b1,9d707f3..396ae6f --- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h @@@ -38,26 -36,40 +38,43 @@@ extern "C" * present a allocated scope string. * The caller is owner of the topic and scope output string. */ - celix_status_t pubsub_getPubSubInfoFromFilter(const char *filterstr, char **topic, char **scope); + celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topic, char **scope); - char *pubsub_getKeysBundleDir(bundle_context_pt ctx); + char* pubsub_getKeysBundleDir(bundle_context_pt ctx); - double - pubsub_utils_matchPublisher(celix_bundle_context_t *ctx, long bundleId, const char *filter, const char *adminType, - double sampleScore, double controlScore, double defaultScore, long *outSerializerSvcId); + double pubsub_utils_matchPublisher( + celix_bundle_context_t *ctx, + long bundleId, + const char *filter, + const char *adminType, + double sampleScore, + double controlScore, + double defaultScore, + celix_properties_t **outTopicProperties, + long *outSerializerSvcId); - double pubsub_utils_matchSubscriber(celix_bundle_context_t *ctx, long svcProviderBundleId, - const celix_properties_t *svcProperties, const char *adminType, double sampleScore, - double controlScore, double defaultScore, long *outSerializerSvcId); + double pubsub_utils_matchSubscriber( + celix_bundle_context_t *ctx, + long svcProviderBundleId, + const celix_properties_t *svcProperties, + const char *adminType, + double sampleScore, + double controlScore, + double defaultScore, + celix_properties_t **outTopicProperties, + long *outSerializerSvcId); - bool pubsub_utils_matchEndpoint(celix_bundle_context_t *ctx, const celix_properties_t *endpoint, const char *adminType, - long *outSerializerSvcId); + bool pubsub_utils_matchEndpoint( + celix_bundle_context_t *ctx, + const celix_properties_t *endpoint, + const char *adminType, + long *outSerializerSvcId); - celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher); + celix_properties_t* pubsub_utils_getTopicProperties(const celix_bundle_t *bundle, const char *topic, bool isPublisher); +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_UTILS_H_ */
