This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/509-remove-pubsub in repository https://gitbox.apache.org/repos/asf/celix.git
commit efad168a5d2a501cf72e3734b0242fb95bdf406c Author: Pepijn Noltes <[email protected]> AuthorDate: Sun Dec 17 17:14:33 2023 +0100 Remove psa nanomsg from experimental --- .github/workflows/ubuntu.yml | 1 - documents/building/README.md | 4 +- misc/experimental/bundles/CMakeLists.txt | 1 - .../bundles/pubsub_admin_nanomsg/CMakeLists.txt | 46 -- .../bundles/pubsub_admin_nanomsg/src/LogHelper.h | 103 ---- .../src/psa_nanomsg_activator.cc | 87 --- .../src/pubsub_nanomsg_admin.cc | 625 --------------------- .../src/pubsub_nanomsg_admin.h | 154 ----- .../src/pubsub_nanomsg_common.cc | 56 -- .../src/pubsub_nanomsg_common.h | 56 -- .../src/pubsub_nanomsg_topic_receiver.cc | 319 ----------- .../src/pubsub_nanomsg_topic_receiver.h | 127 ----- .../src/pubsub_nanomsg_topic_sender.cc | 265 --------- .../src/pubsub_nanomsg_topic_sender.h | 114 ---- .../src/pubsub_psa_nanomsg_constants.h | 39 -- 15 files changed, 1 insertion(+), 1996 deletions(-) diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index e1bd8ef0..df0d9f40 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -119,7 +119,6 @@ jobs: cmake \ libffi-dev \ libxml2-dev \ - libczmq-dev \ libcpputest-dev \ rapidjson-dev \ libavahi-compat-libdnssd-dev \ diff --git a/documents/building/README.md b/documents/building/README.md index 420150cb..c6435100 100644 --- a/documents/building/README.md +++ b/documents/building/README.md @@ -137,7 +137,6 @@ The following packages (libraries + headers) should be installed on your system: * libffi (for libdfi) * libxml2 (for remote services and bonjour shell) * rapidjson (for C++ remote service discovery) - * libczmq (for PubSubAdmin ZMQ) For Ubuntu 22.04, use the following commands: @@ -155,7 +154,6 @@ sudo apt-get install -yq --no-install-recommends \ libffi-dev \ libzip-dev \ libxml2-dev \ - libczmq-dev \ libcpputest-dev \ rapidjson-dev ``` @@ -163,7 +161,7 @@ sudo apt-get install -yq --no-install-recommends \ For OSX systems with brew installed, use the following commands: ```bash brew update && \ -brew install lcov libffi libzip czmq rapidjson libxml2 cmake jansson && \ +brew install lcov libffi libzip rapidjson libxml2 cmake jansson && \ brew link --force libffi ``` diff --git a/misc/experimental/bundles/CMakeLists.txt b/misc/experimental/bundles/CMakeLists.txt index b96038d7..fdd442fb 100644 --- a/misc/experimental/bundles/CMakeLists.txt +++ b/misc/experimental/bundles/CMakeLists.txt @@ -19,5 +19,4 @@ if (NOT APPLE) #Note note sure if these bundles build on OSX add_subdirectory(config_admin) add_subdirectory(event_admin) - add_subdirectory(pubsub_admin_nanomsg) endif () diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/CMakeLists.txt b/misc/experimental/bundles/pubsub_admin_nanomsg/CMakeLists.txt deleted file mode 100644 index 27e80830..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/CMakeLists.txt +++ /dev/null @@ -1,46 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -if (BUILD_PUBSUB_PSA_NANOMSG) - - find_package(NanoMsg REQUIRED) - - add_celix_bundle(celix_pubsub_admin_nanomsg - BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_nanomsg" - VERSION "1.0.0" - GROUP "Celix/PubSub" - SOURCES - src/psa_nanomsg_activator.cc - src/pubsub_nanomsg_admin.cc - src/pubsub_nanomsg_topic_sender.cc - src/pubsub_nanomsg_topic_receiver.cc - src/pubsub_nanomsg_common.cc - ) - - target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE - Celix::pubsub_spi - Celix::framework Celix::dfi Celix::log_helper - NANOMSG::lib - ) - target_include_directories(celix_pubsub_admin_nanomsg PRIVATE - src - ) - - install_celix_bundle(celix_pubsub_admin_nanomsg EXPORT celix COMPONENT pubsub) - target_link_libraries(celix_pubsub_admin_nanomsg PRIVATE Celix::shell_api) - add_library(Celix::pubsub_admin_nanomsg ALIAS celix_pubsub_admin_nanomsg) -endif() diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/LogHelper.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/LogHelper.h deleted file mode 100644 index d5d2f0b8..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/LogHelper.h +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once -#include <sstream> -#include "log_helper.h" -#include <mutex> -namespace celix { - namespace pubsub { - namespace nanomsg { - /* - * Not that the loghelper is created in the firs log-call. This is because when a log-helper is started - * during registration of a service with a service-factory a dead-lock can occur - * This prevents it. - */ - class LogHelper { - public: - LogHelper(bundle_context_t *_ctx, const std::string& _componentName ) : ctx{_ctx}, helperCreated{true}, componentName{_componentName}{ - } - - LogHelper(const LogHelper& ) = delete; - LogHelper& operator=(const LogHelper&) = delete; - - - ~LogHelper() { - if (helperCreated && _logHelper) { - logHelper_stop(_logHelper); - logHelper_destroy(&_logHelper); - } - std::cerr << "Destroyed loghelper for " << componentName << std::endl; - } - template<typename... Args> - void ERROR(Args... args) { - auto ss = LOG_STREAM(args...); - log_string(OSGI_LOGSERVICE_ERROR, ss.str()); - } - - template<typename... Args> - void WARN(Args... args) { - auto ss = LOG_STREAM(args...); - log_string(OSGI_LOGSERVICE_WARNING, ss.str()); - } - - template<typename... Args> - void INFO(Args... args) { - auto ss = LOG_STREAM(args...); - log_string(OSGI_LOGSERVICE_INFO, ss.str()); - } - - template<typename... Args> - void DBG(Args... args) { - auto ss = LOG_STREAM(args...); - log_string(OSGI_LOGSERVICE_DEBUG, ss.str()); - } - - private: - bundle_context_t *ctx; - bool helperCreated{false}; - log_helper_t *_logHelper{}; - std::string componentName{}; - template<typename T> - std::stringstream LOG_STREAM(T first) const { - std::stringstream ss; - ss << first; - return ss; - } - - template<typename T, typename... Args> - std::stringstream LOG_STREAM(T first, Args... args) const { - std::stringstream ss; - ss << "[" << componentName << "] " << first << LOG_STREAM(args...).str(); - return ss; - } - - void log_string(log_level_t level, const std::string& msg) { - if (_logHelper == nullptr) { - helperCreated = true; - logHelper_create(ctx, &_logHelper); - logHelper_start(_logHelper); - } - logHelper_log(_logHelper, level, msg.c_str()); - } - }; - - } - } -} \ No newline at end of file diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc deleted file mode 100644 index 20c23920..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/psa_nanomsg_activator.cc +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <stdlib.h> -#include <new> -#include <iostream> -#include "celix_api.h" -#include "pubsub_serializer.h" -#include "LogHelper.h" -#include "pubsub_admin.h" -#include "pubsub_nanomsg_admin.h" - -namespace celix { namespace pubsub { namespace nanomsg { - class Activator { - public: - Activator(celix_bundle_context_t *ctx) : - context{ctx}, - L{context, std::string("PSA_NANOMSG_ACTIVATOR")}, - admin(context) - { - } - Activator(const Activator&) = delete; - Activator& operator=(const Activator&) = delete; - - ~Activator() = default; - - celix_status_t start() { - admin.start(); - return CELIX_SUCCESS; - } - - celix_status_t stop() { - admin.stop(); - return CELIX_SUCCESS; - }; - - private: - celix_bundle_context_t *context{}; - celix::pubsub::nanomsg::LogHelper L; - pubsub_nanomsg_admin admin; - - }; -}}} - -celix_status_t celix_bundleActivator_create(celix_bundle_context_t *ctx , void **userData) { - celix_status_t status = CELIX_SUCCESS; - auto data = new (std::nothrow) celix::pubsub::nanomsg::Activator{ctx}; - if (data != NULL) { - *userData = data; - } else { - status = CELIX_ENOMEM; - } - return status; -} - -celix_status_t celix_bundleActivator_start(void *userData, celix_bundle_context_t *) { - auto act = static_cast<celix::pubsub::nanomsg::Activator*>(userData); - return act->start(); -} - -celix_status_t celix_bundleActivator_stop(void *userData, celix_bundle_context_t *) { - auto act = static_cast<celix::pubsub::nanomsg::Activator*>(userData); - return act->stop(); -} - - -celix_status_t celix_bundleActivator_destroy(void *userData, celix_bundle_context_t *) { - auto act = static_cast<celix::pubsub::nanomsg::Activator*>(userData); - delete act; - return CELIX_SUCCESS; -} diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc deleted file mode 100644 index 064f3e73..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ /dev/null @@ -1,625 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <string> -#include <vector> -#include <functional> -#include <memory.h> -#include <iostream> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netdb.h> -#include <ifaddrs.h> -#include <pubsub_endpoint.h> -#include <algorithm> - -#include "pubsub_utils.h" -#include "pubsub_nanomsg_admin.h" -#include "pubsub_psa_nanomsg_constants.h" - -#include "celix_compiler.h" - - -static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip); - -pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx): - ctx{_ctx}, - L{ctx, "pubsub_nanomsg_admin"} { - verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT); - fwUUID = celix_bundleContext_getProperty(ctx, CELIX_FRAMEWORK_UUID, nullptr); - - char *ip = nullptr; - const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr); - if (confIp != NULL) { - if (strchr(confIp, '/') != NULL) { - // IP with subnet prefix specified - ip = ipUtils_findIpBySubnet(confIp); - if (ip == NULL) { - L_WARN("[PSA_NANOMSG] Could not find interface for requested subnet %s", confIp); - } - } else { - // IP address specified - ip = strndup(confIp, 1024); - } - } - - if (ip == nullptr) { - //TODO try to get ip from subnet (CIDR) - } - - if (ip == nullptr) { - //try to get ip from itf - const char *interface = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr); - nanoMsg_getIpAddress(interface, &ip); - } - - if (ip == nullptr) { - L.WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using default ip (", PUBSUB_NANOMSG_DEFAULT_IP, ")"); - ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024); - } - - ipAddress = ip; - if (verbose) { - L.INFO("[PSA_NANOMSG] Using ", ip, " for service annunciation."); - } - - - long _basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT); - long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT); - basePort = (unsigned int)_basePort; - maxPort = (unsigned int)_maxPort; - if (verbose) { - L.INFO("[PSA_NANOMSG] Using base till max port: ", _basePort, " till ", _maxPort); - } - - - defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE); - qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE); - qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE); -} - -pubsub_nanomsg_admin::~pubsub_nanomsg_admin() { - //note assuming al psa register services and service tracker are removed. - { -// std::lock_guard<std::mutex> lock(topicSenders.mutex); -// for (auto &kv : topicSenders.map) { -// auto &sender = kv.second; -// delete (sender); -// } - } - - { - std::lock_guard<std::mutex> lock(topicReceivers.mutex); - for (auto &kv: topicReceivers.map) { - delete kv.second; - } - } - - { - std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex); - for (auto &entry : discoveredEndpoints.map) { - auto *ep = entry.second; - celix_properties_destroy(ep); - } - } - - { - std::lock_guard<std::mutex> lock(serializers.mutex); - serializers.map.clear(); - } - - free(ipAddress); - -} - -void pubsub_nanomsg_admin::start() { - adminService.handle = this; - adminService.matchPublisher = [](void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->matchPublisher(svcRequesterBndId, svcFilter, outTopicProperties, score, serializerSvcId); - }; - adminService.matchSubscriber = [](void *handle, long svcProviderBndId, const celix_properties_t *svcProperties, celix_properties_t **outTopicProperties, double *score, long *serializerSvcId) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->matchSubscriber(svcProviderBndId, svcProperties, outTopicProperties, score, serializerSvcId); - }; - adminService.matchDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint, bool *match) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->matchEndpoint(endpoint, match); - }; - adminService.setupTopicSender = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **publisherEndpoint) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->setupTopicSender(scope, topic, topicProperties, serializerSvcId, publisherEndpoint); - }; - adminService.teardownTopicSender = [](void *handle, const char *scope, const char *topic) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->teardownTopicSender(scope, topic); - }; - adminService.setupTopicReceiver = [](void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, celix_properties_t **subscriberEndpoint) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->setupTopicReceiver(std::string(scope), std::string(topic), topicProperties, serializerSvcId, subscriberEndpoint); - }; - - adminService.teardownTopicReceiver = [] (void *handle, const char *scope, const char *topic) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->teardownTopicReceiver(scope, topic); - }; - adminService.addDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->addEndpoint(endpoint); - }; - adminService.removeDiscoveredEndpoint = [](void *handle, const celix_properties_t *endpoint) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->removeEndpoint(endpoint); - }; - - celix_properties_t *props = celix_properties_create(); - celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_NANOMSG_ADMIN_TYPE); - - adminSvcId = celix_bundleContext_registerService(ctx, static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props); - - - celix_service_tracking_options_t opts{}; - opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME; - opts.filter.ignoreServiceLanguage = true; - opts.callbackHandle = this; - opts.addWithProperties = [](void *handle, void *svc, const celix_properties_t *props) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - me->addSerializerSvc(svc, props); - }; - opts.removeWithProperties = [](void *handle, void *svc, const celix_properties_t *props) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - me->removeSerializerSvc(svc, props); - }; - serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - - //register shell command service - cmdSvc.handle = this; - cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { - auto me = static_cast<pubsub_nanomsg_admin*>(handle); - return me->executeCommand(commandLine, outStream, errorStream); - }; - - celix_properties_t* shellProps = celix_properties_create(); - celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg"); - celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg"); - celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the nanomsg PSA"); - cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps); - -} - -void pubsub_nanomsg_admin::stop() { - celix_bundleContext_unregisterService(ctx, adminSvcId); - celix_bundleContext_unregisterService(ctx, cmdSvcId); - celix_bundleContext_stopTracker(ctx, serializersTrackerId); -} - -void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const celix_properties_t *props) { - const char *serType = celix_properties_get(props, PUBSUB_SERIALIZER_TYPE_KEY, nullptr); - long svcId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); - - if (serType == nullptr) { - L.INFO("[PSA_NANOMSG] Ignoring serializer service without ", PUBSUB_SERIALIZER_TYPE_KEY, " property"); - return; - } - - { - std::lock_guard<std::mutex> lock(serializers.mutex); - auto it = serializers.map.find(svcId); - if (it == serializers.map.end()) { - serializers.map.emplace(std::piecewise_construct, - std::forward_as_tuple(svcId), - std::forward_as_tuple(serType, svcId, static_cast<pubsub_serializer_service_t*>(svc))); - } - } -} - - -void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const celix_properties_t *props) { - long svcId = celix_properties_getAsLong(props, CELIX_FRAMEWORK_SERVICE_ID, -1L); - - //remove serializer - // 1) First find entry and - // 2) loop and destroy all topic sender using the serializer and - // 3) loop and destroy all topic receivers using the serializer - // Note that it is the responsibility of the topology manager to create new topic senders/receivers - - std::lock_guard<std::mutex> lock(serializers.mutex); - - auto kvsm = serializers.map.find(svcId); - if (kvsm != serializers.map.end()) { - auto &entry = kvsm->second; - { - std::lock_guard<std::mutex> senderLock(topicSenders.mutex); - for (auto it = topicSenders.map.begin(); it != topicSenders.map.end(); /*nothing*/) { - auto &sender = it->second; - if (entry.svcId == sender.getSerializerSvcId()) { - it = topicSenders.map.erase(it); - } else { - ++it; - } - } - } - - { - std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex); - for (auto iter = topicReceivers.map.begin(); iter != topicReceivers.map.end();) { - auto *receiver = iter->second; - if (receiver != nullptr && entry.svcId == receiver->serializerSvcId()) { - auto key = iter->first; - topicReceivers.map.erase(iter++); - delete receiver; - } else { - ++iter; - } - } - } - - } -} - -celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, - double *outScore, long *outSerializerSvcId) { - L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher"); - celix_status_t status = CELIX_SUCCESS; - double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE, - qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId); - *outScore = score; - - return status; -} - -celix_status_t pubsub_nanomsg_admin::matchSubscriber( - long svcProviderBndId, - const celix_properties_t *svcProperties, - celix_properties_t **outTopicProperties, - double *outScore, - long *outSerializerSvcId) { - L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber"); - celix_status_t status = CELIX_SUCCESS; - double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE, - qosSampleScore, qosControlScore, defaultScore, outTopicProperties, outSerializerSvcId); - if (outScore != nullptr) { - *outScore = score; - } - return status; -} - -celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t *endpoint, bool *outMatch) { - L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint"); - celix_status_t status = CELIX_SUCCESS; - bool match = pubsub_utils_matchEndpoint(ctx, endpoint, PUBSUB_NANOMSG_ADMIN_TYPE, nullptr); - if (outMatch != nullptr) { - *outMatch = match; - } - return status; -} - -celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, const char *topic, - const celix_properties_t */*topicProperties*/, - long serializerSvcId, celix_properties_t **outPublisherEndpoint) { - celix_status_t status = CELIX_SUCCESS; - - //1) Create TopicSender - //2) Store TopicSender - //3) Connect existing endpoints - //4) set outPublisherEndpoint - - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - std::lock_guard<std::mutex> serializerLock(serializers.mutex); - std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - auto sender = topicSenders.map.find(key); - if (sender == topicSenders.map.end()) { - //psa_nanomsg_serializer_entry *serEntry = nullptr; - auto kv = serializers.map.find(serializerSvcId); - if (kv != serializers.map.end()) { - auto &serEntry = kv->second; - auto e = topicSenders.map.emplace(std::piecewise_construct, - std::forward_as_tuple(key), - std::forward_as_tuple(ctx, scope, topic, serializerSvcId, serEntry.svc, ipAddress, - basePort, maxPort)); - celix_properties_t *newEndpoint = pubsubEndpoint_create(fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, - PUBSUB_NANOMSG_ADMIN_TYPE, serEntry.serType, nullptr); - celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, e.first->second.getUrl().c_str()); - //if available also set container name - const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); - if (cn != nullptr) { - celix_properties_set(newEndpoint, "container_name", cn); - } - if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) { - *outPublisherEndpoint = newEndpoint; - } - } else { - L.ERROR("[PSA NANOMSG] Error creating a TopicSender"); - } - } else { - L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for scope/topic ", scope,"/", topic); - } - free(key); - - return status; -} - -celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, const char *topic) { - celix_status_t status = CELIX_SUCCESS; - - //1) Find and remove TopicSender from map - //2) destroy topic sender - - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - if (topicSenders.map.erase(key) == 0) { - L.ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic ", scope, "/", topic, " Does not exists"); - } - free(key); - - return status; -} - -celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string &scope, const std::string &topic, - const celix_properties_t */*topicProperties*/, - long serializerSvcId, celix_properties_t **outSubscriberEndpoint) { - - celix_properties_t *newEndpoint = nullptr; - - auto key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), topic.c_str()); - pubsub::nanomsg::topic_receiver * receiver = nullptr; - { - std::lock_guard<std::mutex> serializerLock(serializers.mutex); - std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - auto trkv = topicReceivers.map.find(key); - if (trkv != topicReceivers.map.end()) { - receiver = trkv->second; - } - if (receiver == nullptr) { - auto kvs = serializers.map.find(serializerSvcId); - if (kvs != serializers.map.end()) { - auto serEntry = kvs->second; - receiver = new pubsub::nanomsg::topic_receiver(ctx, scope, topic, serializerSvcId, serEntry.svc); - } else { - L.ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender ", scope, "/", topic); - } - if (receiver != nullptr) { - const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE; - const char *serType = kvs->second.serType; - newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, - serType, nullptr); - //if available also set container name - const char *cn = celix_bundleContext_getProperty(ctx, "CELIX_CONTAINER_NAME", nullptr); - if (cn != nullptr) { - celix_properties_set(newEndpoint, "container_name", cn); - } - topicReceivers.map[key] = receiver; - } else { - L.ERROR("[PSA NANOMSG] Error creating a TopicReceiver."); - } - } else { - L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicReceiver for scope/topic ", scope, "/", topic); - } - } - if (receiver != nullptr && newEndpoint != nullptr) { - std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); - for (auto entry : discoveredEndpoints.map) { - auto *endpoint = entry.second; - const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); - if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - connectEndpointToReceiver(receiver, endpoint); - } - } - } - - if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) { - *outSubscriberEndpoint = newEndpoint; - } - free(key); - celix_status_t status = CELIX_SUCCESS; - return status; -} - -celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, const char *topic) { - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - auto entry = topicReceivers.map.find(key); - free(key); - if (entry != topicReceivers.map.end()) { - auto receiverKey = entry->first; - pubsub::nanomsg::topic_receiver *receiver = entry->second; - topicReceivers.map.erase(receiverKey); - - delete receiver; - } - - celix_status_t status = CELIX_SUCCESS; - return status; -} - -celix_status_t pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver, - const celix_properties_t *endpoint) { - //note can be called with discoveredEndpoint.mutex lock - celix_status_t status = CELIX_SUCCESS; - - auto scope = receiver->scope(); - auto topic = receiver->topic(); - - std::string eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, ""); - std::string eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, ""); - const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr); - - if (url == nullptr) { -// L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url (admin: %s, type: %s)", admin , type); - status = CELIX_BUNDLE_EXCEPTION; - } else { - if ((eScope == scope) && (eTopic == topic)) { - receiver->connectTo(url); - } - } - - return status; -} - -celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t *endpoint) { - const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); - - if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - std::lock_guard<std::mutex> threadLock(topicReceivers.mutex); - for (auto &entry: topicReceivers.map) { - pubsub::nanomsg::topic_receiver *receiver = entry.second; - connectEndpointToReceiver(receiver, endpoint); - } - } - - std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); - celix_properties_t *cpy = celix_properties_copy(endpoint); - //TODO : check if properties are never deleted before map. - const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, nullptr); - discoveredEndpoints.map[uuid] = cpy; - - celix_status_t status = CELIX_SUCCESS; - return status; -} - - -celix_status_t pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver, - const celix_properties_t *endpoint) { - //note can be called with discoveredEndpoint.mutex lock - celix_status_t status = CELIX_SUCCESS; - - auto scope = receiver->scope(); - auto topic = receiver->topic(); - - auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, ""); - auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, ""); - const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, nullptr); - - if (url == nullptr) { - L.WARN("[PSA NANOMSG] Error got endpoint without nanomsg url"); - status = CELIX_BUNDLE_EXCEPTION; - } else { - if ((eScope == scope) && (eTopic == topic)) { - receiver->disconnectFrom(url); - } - } - - return status; -} - -celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t *endpoint) { - const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, nullptr); - - if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - for (auto &entry : topicReceivers.map) { - pubsub::nanomsg::topic_receiver *receiver = entry.second; - disconnectEndpointFromReceiver(receiver, endpoint); - } - } - { - std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex); - const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, nullptr); - discoveredEndpoints.map.erase(uuid); - } - return CELIX_SUCCESS;; -} - -celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine CELIX_UNUSED, FILE *out, - FILE *errStream CELIX_UNUSED) { - celix_status_t status = CELIX_SUCCESS; - fprintf(out, "\n"); - fprintf(out, "Topic Senders:\n"); - { - std::lock_guard<std::mutex> serializerLock(serializers.mutex); - std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex); - for (auto &senderEntry: topicSenders.map) { - auto &sender = senderEntry.second; - long serSvcId = sender.getSerializerSvcId(); - auto kvs = serializers.map.find(serSvcId); - const char* serType = ( kvs == serializers.map.end() ? "!Error" : kvs->second.serType); - const auto scope = sender.getScope(); - const auto topic = sender.getTopic(); - const auto url = sender.getUrl(); - fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), topic.c_str()); - fprintf(out, " |- serializer type = %s\n", serType); - fprintf(out, " |- url = %s\n", url.c_str()); - } - } - - { - fprintf(out, "\n"); - fprintf(out, "\nTopic Receivers:\n"); - std::lock_guard<std::mutex> serializerLock(serializers.mutex); - std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex); - for (auto &entry : topicReceivers.map) { - pubsub::nanomsg::topic_receiver *receiver = entry.second; - long serSvcId = receiver->serializerSvcId(); - auto kv = serializers.map.find(serSvcId); - const char *serType = (kv == serializers.map.end() ? "!Error!" : kv->second.serType); - auto scope = receiver->scope(); - auto topic = receiver->topic(); - - std::vector<std::string> connected{}; - std::vector<std::string> unconnected{}; - receiver->listConnections(connected, unconnected); - - fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), topic.c_str()); - fprintf(out, " |- serializer type = %s\n", serType); - for (auto &url : connected) { - fprintf(out, " |- connected url = %s\n", url.c_str()); - } - for (auto &url : unconnected) { - fprintf(out, " |- unconnected url = %s\n", url.c_str()); - } - } - } - fprintf(out, "\n"); - - return status; -} - -#ifndef ANDROID -static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) { - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - - struct ifaddrs *ifaddr, *ifa; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) != -1) - { - for (ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == nullptr) - continue; - - if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { - if (interface == nullptr) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - else if (strcmp(ifa->ifa_name, interface) == 0) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - } - } - - freeifaddrs(ifaddr); - } - - return status; -} -#endif \ No newline at end of file diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h deleted file mode 100644 index 3785ce5e..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef CELIX_PUBSUB_NANOMSG_ADMIN_H -#define CELIX_PUBSUB_NANOMSG_ADMIN_H - -#include <mutex> -#include <map> -#include <pubsub_admin.h> -#include "celix_api.h" -#include "celix_compiler.h" -#include "pubsub_nanomsg_topic_receiver.h" -#include <pubsub_serializer.h> -#include "LogHelper.h" -#include "command.h" -#include "pubsub_nanomsg_topic_sender.h" -#include "pubsub_nanomsg_topic_receiver.h" - -#define PUBSUB_NANOMSG_ADMIN_TYPE "nanomsg" -#define PUBSUB_NANOMSG_URL_KEY "nanomsg.url" - -#define PUBSUB_NANOMSG_VERBOSE_KEY "PSA_NANOMSG_VERBOSE" -#define PUBSUB_NANOMSG_VERBOSE_DEFAULT true - -#define PUBSUB_NANOMSG_PSA_IP_KEY "PSA_IP" -#define PUBSUB_NANOMSG_PSA_ITF_KEY "PSA_INTERFACE" - -#define PUBSUB_NANOMSG_DEFAULT_IP "127.0.0.1" - -template <typename key, typename value> -struct ProtectedMap { - std::mutex mutex{}; - std::map<key, value> map{}; -}; - -class pubsub_nanomsg_admin { -public: - pubsub_nanomsg_admin(celix_bundle_context_t *ctx); - pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete; - pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete; - ~pubsub_nanomsg_admin(); - void start(); - void stop(); - -private: - void addSerializerSvc(void *svc, const celix_properties_t *props); - void removeSerializerSvc(void */*svc*/, const celix_properties_t *props); - celix_status_t matchPublisher( - long svcRequesterBndId, - const celix_filter_t *svcFilter, - celix_properties_t **outTopicProperties, - double *outScore, - long *outsSerializerSvcId); - celix_status_t matchSubscriber( - long svcProviderBndId, - const celix_properties_t *svcProperties, - celix_properties_t **outTopicProperties, - double *outScope, - long *outSerializerSvcId); - celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool *match); - - celix_status_t setupTopicSender( - const char *scope, - const char *topic, - const celix_properties_t *topicProperties, - long serializerSvcId, - celix_properties_t **outPublisherEndpoint); - - celix_status_t teardownTopicSender(const char *scope, const char *topic); - - celix_status_t setupTopicReceiver( - const std::string &scope, - const std::string &topic, - const celix_properties_t *topicProperties, - long serializerSvcId, - celix_properties_t **outSubscriberEndpoint); - - celix_status_t teardownTopicReceiver(const char *scope, const char *topic); - - celix_status_t addEndpoint(const celix_properties_t *endpoint); - celix_status_t removeEndpoint(const celix_properties_t *endpoint); - - celix_status_t executeCommand(char *commandLine CELIX_UNUSED, FILE *out, - FILE *errStream CELIX_UNUSED); - - celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver *receiver, - const celix_properties_t *endpoint); - - celix_status_t disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver, - const celix_properties_t *endpoint); - celix_bundle_context_t *ctx; - celix::pubsub::nanomsg::LogHelper L; - pubsub_admin_service_t adminService{}; - long adminSvcId = -1L; - long cmdSvcId = -1L; - command_service_t cmdSvc{}; - long serializersTrackerId = -1L; - - const char *fwUUID{}; - - char* ipAddress{}; - - unsigned int basePort{}; - unsigned int maxPort{}; - - double qosSampleScore{}; - double qosControlScore{}; - double defaultScore{}; - - bool verbose{}; - - class psa_nanomsg_serializer_entry { - public: - psa_nanomsg_serializer_entry(const char*_serType, long _svcId, pubsub_serializer_service_t *_svc) : - serType{_serType}, svcId{_svcId}, svc{_svc} { - - } - - const char *serType; - long svcId; - pubsub_serializer_service_t *svc; - }; - ProtectedMap<long, psa_nanomsg_serializer_entry> serializers{}; - ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> topicSenders{}; - ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> topicReceivers{}; - ProtectedMap<const std::string, celix_properties_t *> discoveredEndpoints{}; -}; - -#ifdef __cplusplus -extern "C" { -#endif - -#ifdef __cplusplus -} -#endif - - -#endif //CELIX_PUBSUB_NANOMSG_ADMIN_H diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc deleted file mode 100644 index ccafd436..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.cc +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <memory.h> -#include "pubsub_nanomsg_common.h" -#include "celix_compiler.h" - -int celix::pubsub::nanomsg::localMsgTypeIdForMsgType(void *handle CELIX_UNUSED, const char *msgType, - unsigned int *msgTypeId) { - *msgTypeId = utils_stringHash(msgType); - return 0; -} - -bool celix::pubsub::nanomsg::checkVersion(version_pt msgVersion, const celix::pubsub::nanomsg::msg_header *hdr) { - bool check=false; - int major=0,minor=0; - - if (msgVersion!=NULL) { - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */ - check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ - } - } - - return check; -} - -std::string celix::pubsub::nanomsg::setScopeAndTopicFilter(const std::string &scope, const std::string &topic) { - std::string result(""); - if (scope.size() >= 2) { //3 ?? - result += scope[0]; - result += scope[1]; - } - if (topic.size() >= 2) { //3 ?? - result += topic[0]; - result += topic[1]; - } - return result; -} \ No newline at end of file diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h deleted file mode 100644 index 465669bf..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_common.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef CELIX_PUBSUB_NANOMSG_COMMON_H -#define CELIX_PUBSUB_NANOMSG_COMMON_H - -#include <string> -#include <sstream> -#include <utils.h> - -#include "version.h" -#include "log_helper.h" - -/* - * NOTE zmq is used by first sending three frames: - * 1) A subscription filter. - * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'. - * - * 2) The pubsub_zmq_msg_header_t is send containing the type id and major/minor version - * - * 3) The actual payload - */ - -namespace celix { namespace pubsub { namespace nanomsg { - struct msg_header { - //header - unsigned int type; - unsigned char major; - unsigned char minor; - }; - int localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId); - std::string setScopeAndTopicFilter(const std::string &scope, const std::string &topic); - - bool checkVersion(version_pt msgVersion, const celix::pubsub::nanomsg::msg_header *hdr); - -}}} - - - -#endif //CELIX_PUBSUB_NANOMSG_COMMON_H diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc deleted file mode 100644 index 443f2cee..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <iostream> -#include <mutex> -#include <memory.h> -#include <vector> -#include <string> -#include <sstream> - -#include <stdlib.h> -#include <assert.h> - -#include <sys/epoll.h> -#include <arpa/inet.h> - -#include <nanomsg/nn.h> -#include <nanomsg/bus.h> - -#include <pubsub_serializer.h> -#include <pubsub/subscriber.h> -#include <pubsub_constants.h> -#include <pubsub_endpoint.h> -#include <LogHelper.h> - -#include "pubsub_nanomsg_topic_receiver.h" -#include "pubsub_psa_nanomsg_constants.h" -#include "pubsub_nanomsg_common.h" -#include "pubsub_topology_manager.h" - -//TODO see if block and wakeup (reset) also works -#define PSA_NANOMSG_RECV_TIMEOUT 100 //100 msec timeout - -/* -#define L_DEBUG(...) \ - logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__) -#define L_INFO(...) \ - logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__) -#define L_WARN(...) \ - logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__) -#define L_ERROR(...) \ - logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) -*/ -//#define L_DEBUG printf -//#define L_INFO printf -//#define L_WARN printf -//#define L_ERROR printf - - -pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx, - const std::string &_scope, - const std::string &_topic, - long _serializerSvcId, - pubsub_serializer_service_t *_serializer) : L{_ctx, "NANOMSG_topic_receiver"}, m_serializerSvcId{_serializerSvcId}, m_scope{_scope}, m_topic{_topic} { - ctx = _ctx; - serializer = _serializer; - - m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS); - if (m_nanoMsgSocket < 0) { - L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for scope/topic: ", m_scope.c_str(), "/", m_topic.c_str()); - std::bad_alloc{}; - } else { - int timeout = PSA_NANOMSG_RECV_TIMEOUT; - if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, - sizeof (timeout)) < 0) { - L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for ",m_scope, "/",m_topic, ", set sockopt RECV_TIMEO failed"); - std::bad_alloc{}; - } - - auto subscriberFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(m_scope, m_topic); - - auto opts = createOptions(); - - subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - recvThread.running = true; - free ((void*)opts.filter.filter); - recvThread.thread = std::thread([this]() {this->recvThread_exec();}); - } -} - -celix_service_tracking_options_t pubsub::nanomsg::topic_receiver::createOptions() { - std::stringstream filter_str; - - filter_str << "(" << PUBSUB_SUBSCRIBER_TOPIC << "=" << m_topic << ")"; - celix_service_tracking_options_t opts{}; - opts.filter.ignoreServiceLanguage = true; - opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; - opts.filter.filter = strdup(filter_str.str().c_str()); // TODO : memory leak ?? - opts.callbackHandle = this; - opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { - static_cast<pubsub::nanomsg::topic_receiver*>(handle)->addSubscriber(svc, props, svcOwner); - }; - opts.removeWithOwner = [](void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *svcOwner) { - static_cast<pubsub::nanomsg::topic_receiver*>(handle)->removeSubscriber(svc, props, svcOwner); - }; - return opts; -} - -pubsub::nanomsg::topic_receiver::~topic_receiver() { - - { - std::lock_guard<std::mutex> _lock(recvThread.mutex); - recvThread.running = false; - } - recvThread.thread.join(); - - celix_bundleContext_stopTracker(ctx, subscriberTrackerId); - - { - std::lock_guard<std::mutex> _lock(subscribers.mutex); - for (auto elem : subscribers.map) { - serializer->destroySerializerMap(serializer->handle, elem.second.msgTypes); - } - subscribers.map.clear(); - } - nn_close(m_nanoMsgSocket); - -} - -std::string pubsub::nanomsg::topic_receiver::scope() const { - return m_scope; -} - -std::string pubsub::nanomsg::topic_receiver::topic() const { - return m_topic; -} - -long pubsub::nanomsg::topic_receiver::serializerSvcId() const { - return m_serializerSvcId; -} - -void pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> &connectedUrls, - std::vector<std::string> &unconnectedUrls) { - std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - for (auto entry : requestedConnections.map) { - if (entry.second.isConnected()) { - connectedUrls.emplace_back(entry.second.getUrl()); - } else { - unconnectedUrls.emplace_back(entry.second.getUrl()); - } - } -} - - -void pubsub::nanomsg::topic_receiver::connectTo(const char *url) { - L.DBG("[PSA_NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " connecting to nanomsg url ", url); - - std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - auto entry = requestedConnections.map.find(url); - if (entry == requestedConnections.map.end()) { - requestedConnections.map.emplace( - std::piecewise_construct, - std::forward_as_tuple(std::string(url)), - std::forward_as_tuple(url, -1)); - entry = requestedConnections.map.find(url); - } - if (!entry->second.isConnected()) { - int connection_id = nn_connect(m_nanoMsgSocket, url); - if (connection_id >= 0) { - entry->second.setConnected(true); - entry->second.setId(connection_id); - } else { - L.WARN("[PSA_NANOMSG] Error connecting to NANOMSG url ", url, " (",strerror(errno), ")"); - } - } -} - -void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) { - L.DBG("[PSA NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " disconnect from nanomsg url ", url); - - std::lock_guard<std::mutex> _lock(requestedConnections.mutex); - auto entry = requestedConnections.map.find(url); - if (entry != requestedConnections.map.end()) { - if (entry->second.isConnected()) { - if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) { - entry->second.setConnected(false); - } else { - L.WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url ", url, ", id: ", entry->second.getId(), " (",strerror(errno),")"); - } - } - requestedConnections.map.erase(url); - std::cerr << "REMOVING connection " << url << std::endl; - } else { - std::cerr << "Disconnecting from unknown URL " << url << std::endl; - } -} - -void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const celix_properties_t *props, - const celix_bundle_t *bnd) { - long bndId = celix_bundle_getId(bnd); - std::string subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE); - if (subScope != m_scope) { - //not the same scope. ignore - return; - } - - std::lock_guard<std::mutex> _lock(subscribers.mutex); - auto entry = subscribers.map.find(bndId); - if (entry != subscribers.map.end()) { - entry->second.usageCount += 1; - } else { - //new create entry - subscribers.map.emplace(std::piecewise_construct, - std::forward_as_tuple(bndId), - std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 1)); - entry = subscribers.map.find(bndId); - - int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)bnd, &entry->second.msgTypes); - if (rc != 0) { - L.ERROR("[PSA_NANOMSG] Cannot create msg serializer map for TopicReceiver ", m_scope.c_str(), "/", m_topic.c_str()); - subscribers.map.erase(bndId); - } - } -} - -void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/, - const celix_properties_t */*props*/, const celix_bundle_t *bnd) { - long bndId = celix_bundle_getId(bnd); - - std::lock_guard<std::mutex> _lock(subscribers.mutex); - auto entry = subscribers.map.find(bndId); - if (entry != subscribers.map.end()) { - entry->second.usageCount -= 1; - if (entry->second.usageCount <= 0) { - //remove entry - int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); - if (rc != 0) { - L.ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for TopicReceiver ", m_scope.c_str(), "/",m_topic.c_str(),"\n"); - } - subscribers.map.erase(bndId); - } - } -} - -void pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, size_t payloadSize) { - pubsub_msg_serializer_t* msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, (void*)(uintptr_t)(hdr->type))); - pubsub_subscriber_t *svc = entry->svc; - - if (msgSer!= NULL) { - void *deserializedMsg = NULL; - bool validVersion = celix::pubsub::nanomsg::checkVersion(msgSer->msgVersion, hdr); - if (validVersion) { - celix_status_t status = msgSer->deserialize(msgSer, payload, payloadSize, &deserializedMsg); - if (status == CELIX_SUCCESS) { - bool release = false; - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release); - if (release) { - msgSer->freeMsg(msgSer->handle, deserializedMsg); - } - } else { - L.WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type ", msgSer->msgName , "for scope/topic ", scope(), "/", topic()); - } - } - } else { - L.WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id ", hdr->type); - } -} - -void pubsub::nanomsg::topic_receiver::processMsg(const celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t payloadSize) { - std::lock_guard<std::mutex> _lock(subscribers.mutex); - for (auto entry : subscribers.map) { - processMsgForSubscriberEntry(&entry.second, hdr, payload, payloadSize); - } -} - -struct Message { - celix::pubsub::nanomsg::msg_header header; - char payload[]; -}; - -void pubsub::nanomsg::topic_receiver::recvThread_exec() { - while (recvThread.running) { - Message *msg = nullptr; - nn_iovec iov[2]; - iov[0].iov_base = &msg; - iov[0].iov_len = NN_MSG; - - nn_msghdr msgHdr; - memset(&msgHdr, 0, sizeof(msgHdr)); - - msgHdr.msg_iov = iov; - msgHdr.msg_iovlen = 1; - - msgHdr.msg_control = nullptr; - msgHdr.msg_controllen = 0; - - errno = 0; - int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0); - if (msg && static_cast<unsigned long>(recvBytes) >= sizeof(celix::pubsub::nanomsg::msg_header)) { - processMsg(&msg->header, msg->payload, recvBytes-sizeof(msg->header)); - nn_freemsg(msg); - } else if (recvBytes >= 0) { - L.ERROR("[PSA_NANOMSG_TR] Error receiving nanomsg msg, size (", recvBytes,") smaller than header\n"); - } else if (errno == EAGAIN || errno == ETIMEDOUT) { - // no data: go to next cycle - } else if (errno == EINTR) { - L.DBG("[PSA_NANOMSG_TR] nn_recvmsg interrupted"); - } else { - L.WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno ", errno, " : ", strerror(errno), "\n"); - } - } // while - -} diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h deleted file mode 100644 index e93ea30f..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.h +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -#include <string> -#include <vector> -#include <thread> -#include <mutex> -#include <map> -#include "pubsub_serializer.h" -#include "LogHelper.h" -#include "celix_bundle_context.h" -#include "pubsub_nanomsg_common.h" -#include "pubsub/subscriber.h" - -struct psa_nanomsg_subscriber_entry { - psa_nanomsg_subscriber_entry(pubsub_subscriber_t *_svc, int _usageCount) : - svc{_svc}, usageCount{_usageCount} { - } - pubsub_subscriber_t *svc{}; - int usageCount; - hash_map_t *msgTypes{nullptr}; //map from serializer svc -}; - -typedef struct psa_nanomsg_requested_connection_entry { -public: - psa_nanomsg_requested_connection_entry(std::string _url, int _id, bool _connected=false): - url{_url}, id{_id}, connected{_connected} { - } - bool isConnected() const { - return connected; - } - - int getId() const { - return id; - } - - void setId(int _id) { - id = _id; - } - void setConnected(bool c) { - connected = c; - } - - const std::string &getUrl() const { - return url; - } -private: - std::string url; - int id; - bool connected; -} psa_nanomsg_requested_connection_entry_t; - -namespace pubsub { - namespace nanomsg { - class topic_receiver { - public: - topic_receiver(celix_bundle_context_t - *ctx, - const std::string &scope, - const std::string &topic, - long serializerSvcId, pubsub_serializer_service_t - *serializer); - topic_receiver(const topic_receiver &) = delete; - topic_receiver & operator=(const topic_receiver &) = delete; - ~topic_receiver(); - - std::string scope() const; - std::string topic() const; - long serializerSvcId() const; - void listConnections(std::vector<std::string> &connectedUrls, std::vector<std::string> &unconnectedUrls); - void connectTo(const char *url); - void disconnectFrom(const char *url); - void recvThread_exec(); - void processMsg(const celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t payloadSize); - void processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry* entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, size_t payloadSize); - void addSubscriber(void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); - void removeSubscriber(void */*svc*/, const celix_properties_t */*props*/, const celix_bundle_t *bnd); - celix_service_tracking_options_t createOptions(); - - private: - celix_bundle_context_t *ctx{nullptr}; - celix::pubsub::nanomsg::LogHelper L; - long m_serializerSvcId{0}; - pubsub_serializer_service_t *serializer{nullptr}; - const std::string m_scope{}; - const std::string m_topic{}; - - int m_nanoMsgSocket{0}; - - struct { - std::thread thread; - std::mutex mutex; - bool running; - } recvThread{}; - - struct { - std::mutex mutex; - std::map<std::string, psa_nanomsg_requested_connection_entry_t> map; - } requestedConnections{}; - - long subscriberTrackerId{0}; - struct { - std::mutex mutex; - std::map<long, psa_nanomsg_subscriber_entry> map; - } subscribers{}; - }; - } -} - diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc deleted file mode 100644 index 452ea785..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <memory.h> -#include <iostream> -#include <sstream> -#include <stdlib.h> -#include <utils.h> -#include <arpa/inet.h> -#include <LogHelper.h> -#include <nanomsg/nn.h> -#include <nanomsg/bus.h> - - -#include <pubsub_constants.h> -#include "pubsub_nanomsg_topic_sender.h" -#include "pubsub_psa_nanomsg_constants.h" -#include "pubsub_nanomsg_common.h" -#include "celix_compiler.h" - -#define FIRST_SEND_DELAY_IN_SECONDS 2 -#define NANOMSG_BIND_MAX_RETRY 10 - -static unsigned int rand_range(unsigned int min, unsigned int max); -static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper); - -pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, - const char *_scope, - const char *_topic, - long _serializerSvcId, - pubsub_serializer_service_t *_ser, - const char *_bindIp, - unsigned int _basePort, - unsigned int _maxPort) : - ctx{_ctx}, - L{ctx, "PSA_NANOMSG_TS"}, - serializerSvcId {_serializerSvcId}, - serializer{_ser}, - scope{_scope}, - topic{_topic}{ - - scopeAndTopicFilter = celix::pubsub::nanomsg::setScopeAndTopicFilter(_scope, _topic); - - //setting up nanomsg socket for nanomsg TopicSender - int nnSock = nn_socket(AF_SP, NN_BUS); - if (nnSock == -1) { - perror("Error for nanomsg_socket"); - } - - int rv = -1, retry=0; - while (rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) { - /* Randomized part due to same bundle publishing on different topics */ - unsigned int port = rand_range(_basePort,_maxPort); - std::stringstream _url; - _url << "tcp://" << _bindIp << ":" << port; - - std::stringstream bindUrl; - bindUrl << "tcp://0.0.0.0:" << port; - - rv = nn_bind (nnSock, bindUrl.str().c_str()); - if (rv == -1) { - perror("Error for nn_bind"); - } else { - this->url = _url.str(); - nanomsg.socket = nnSock; - } - retry++; - } - - if (!url.empty()) { - - //register publisher services using a service factory - publisher.factory.handle = this; - publisher.factory.getService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { - return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService( - requestingBundle, - svcProperties); - }; - publisher.factory.ungetService = [](void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) { - return static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService( - requestingBundle, - svcProperties); - }; - - celix_properties_t *props = celix_properties_create(); - celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str()); - celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str()); - - celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; - opts.factory = &publisher.factory; - opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; - opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION; - opts.properties = props; - - publisher.svcId = celix_bundleContext_registerServiceWithOptions(_ctx, &opts); - } - -} - -pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() { - celix_bundleContext_unregisterService(ctx, publisher.svcId); - - nn_close(nanomsg.socket); - std::lock_guard<std::mutex> lock(boundedServices.mutex); - for (auto &it: boundedServices.map) { - serializer->destroySerializerMap(serializer->handle, it.second.msgTypes); - } - boundedServices.map.clear(); - -} - -long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const { - return serializerSvcId; -} - -const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const { - return scope; -} - -const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const { - return topic; -} - -const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const { - return url; -} - - -void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties CELIX_UNUSED) { - long bndId = celix_bundle_getId(requestingBundle); - void *service{nullptr}; - std::lock_guard<std::mutex> lock(boundedServices.mutex); - auto existingEntry = boundedServices.map.find(bndId); - if (existingEntry != boundedServices.map.end()) { - existingEntry->second.getCount += 1; - service = &existingEntry->second.service; - } else { - auto entry = boundedServices.map.emplace(std::piecewise_construct, - std::forward_as_tuple(bndId), - std::forward_as_tuple(scope, topic, bndId, nanomsg.socket, ctx)); - int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes); - - if (rc == 0) { - entry.first->second.service.handle = &entry.first->second; - entry.first->second.service.localMsgTypeIdForMsgType = celix::pubsub::nanomsg::localMsgTypeIdForMsgType; - entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) { - return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg); - }; - service = &entry.first->second.service; - } else { - boundedServices.map.erase(bndId); - L.ERROR("Error creating serializer map for NanoMsg TopicSender. Scope: ", scope, ", Topic: ", topic); - } - } - - return service; -} - -void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t */*svcProperties*/) { - long bndId = celix_bundle_getId(requestingBundle); - - std::lock_guard<std::mutex> lock(boundedServices.mutex); - auto entry = boundedServices.map.find(bndId); - if (entry != boundedServices.map.end()) { - entry->second.getCount -= 1; - if (entry->second.getCount == 0) { - int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes); - if (rc != 0) { - L.ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); - } - boundedServices.map.erase(bndId); - } - } -} - -int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int msgTypeId, const void *inMsg) { - int status; - auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, (void*)(uintptr_t)msgTypeId)); - - if (msgSer != nullptr) { - delay_first_send_for_late_joiners(L); - - int major = 0, minor = 0; - - celix::pubsub::nanomsg::msg_header msg_hdr{}; - msg_hdr.type = msgTypeId; - - if (msgSer->msgVersion != nullptr) { - version_getMajor(msgSer->msgVersion, &major); - version_getMinor(msgSer->msgVersion, &minor); - msg_hdr.major = (unsigned char) major; - msg_hdr.minor = (unsigned char) minor; - } - - void *serializedOutput = nullptr; - size_t serializedOutputLen = 0; - status = msgSer->serialize(msgSer, inMsg, &serializedOutput, &serializedOutputLen); - if (status == CELIX_SUCCESS) { - nn_iovec data[2]; - - nn_msghdr msg{}; - msg.msg_iov = data; - msg.msg_iovlen = 2; - msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr); - msg.msg_iov[0].iov_len = sizeof(msg_hdr); - msg.msg_iov[1].iov_base = serializedOutput; - msg.msg_iov[1].iov_len = serializedOutputLen; - msg.msg_control = nullptr; - msg.msg_controllen = 0; - errno = 0; - int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 ); - free(serializedOutput); - if (rc < 0) { - L.WARN("[PSA_NANOMSG_TS] Error sending zmsg, rc: ", rc, ", error: ", strerror(errno)); - } else { - L.INFO("[PSA_NANOMSG_TS] Send message with size ", rc, "\n"); - L.INFO("[PSA_NANOMSG_TS] Send message ID ", msg_hdr.type, - " major: ", (int)msg_hdr.major, - " minor: ", (int)msg_hdr.minor,"\n"); - } - } else { - L.WARN("[PSA_NANOMSG_TS] Error serialize message of type ", msgSer->msgName, - " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n"); - } - } else { - status = CELIX_SERVICE_EXCEPTION; - L.WARN("[PSA_NANOMSG_TS] Error cannot serialize message with msg type id ", msgTypeId, - " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n"); - } - return status; -} - -static void delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper) { - - static bool firstSend = true; - - if (firstSend) { - logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); - sleep(FIRST_SEND_DELAY_IN_SECONDS); - firstSend = false; - } -} - -static unsigned int rand_range(unsigned int min, unsigned int max) { - double scaled = ((double)random())/((double)RAND_MAX); - return (unsigned int)((max-min+1)*scaled + min); -} diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h deleted file mode 100644 index 3a29ad4e..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.h +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef CELIX_PUBSUB_NANOMSG_TOPIC_SENDER_H -#define CELIX_PUBSUB_NANOMSG_TOPIC_SENDER_H - -#include <mutex> -#include <map> -#include "celix_bundle_context.h" -#include "celix_compiler.h" -#include <log_helper.h> -#include <pubsub_serializer.h> -#include <pubsub/publisher.h> - -namespace pubsub { - namespace nanomsg { - - class bounded_service_entry { - public: - bounded_service_entry( - std::string &_scope, - std::string &_topic, - long _bndId, - int _nanoMsgSocket, - celix_bundle_context_t *_context) : scope{_scope}, topic{_topic}, bndId{_bndId}, nanoMsgSocket{_nanoMsgSocket}, L{_context, "nanomsg_bounded_service_entry"} { - - } - bounded_service_entry(const bounded_service_entry&) = delete; - bounded_service_entry &operator=(const bounded_service_entry&) = delete; - int topicPublicationSend(unsigned int msgTypeId, const void *inMsg); - - pubsub_publisher_t service{}; - std::string scope; - std::string topic; - long bndId{}; - hash_map_t *msgTypes{}; - int getCount{1}; - int nanoMsgSocket{}; - celix::pubsub::nanomsg::LogHelper L; - } ; - - - class pubsub_nanomsg_topic_sender { - public: - pubsub_nanomsg_topic_sender(celix_bundle_context_t *_ctx, - const char *_scope, - const char *_topic, long _serializerSvcId, pubsub_serializer_service_t *_ser, - const char *_bindIp, unsigned int _basePort, unsigned int _maxPort); - - ~pubsub_nanomsg_topic_sender(); - - pubsub_nanomsg_topic_sender(const pubsub_nanomsg_topic_sender &) = delete; - - const pubsub_nanomsg_topic_sender &operator=(const pubsub_nanomsg_topic_sender &) = delete; - - long getSerializerSvcId() const ; - const std::string &getScope() const ; - const std::string &getTopic() const ; - const std::string &getUrl() const; - - void* getPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties CELIX_UNUSED); - void ungetPublisherService(const celix_bundle_t *requestingBundle, - const celix_properties_t *svcProperties CELIX_UNUSED); - int topicPublicationSend(unsigned int msgTypeId, const void *inMsg); - void delay_first_send_for_late_joiners() ; - - //private: - celix_bundle_context_t *ctx; - celix::pubsub::nanomsg::LogHelper L; - long serializerSvcId; - pubsub_serializer_service_t *serializer; - - std::string scope{}; - std::string topic{}; - std::string scopeAndTopicFilter{}; - std::string url{}; - - struct { - std::mutex mutex; - int socket; - } nanomsg{}; - - struct { - long svcId; - celix_service_factory_t factory; - } publisher{}; - - struct { - std::mutex mutex{}; - std::map<long, bounded_service_entry> map{}; - //hash_map_t *map{}; //key = bndId, value = psa_nanomsg_bounded_service_entry_t - } boundedServices{}; - }; - } -} - -#endif //CELIX_PUBSUB_NANOMSG_TOPIC_SENDER_H diff --git a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h b/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h deleted file mode 100644 index 1700ee42..00000000 --- a/misc/experimental/bundles/pubsub_admin_nanomsg/src/pubsub_psa_nanomsg_constants.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef PUBSUB_PSA_NANOMSG_CONSTANTS_H_ -#define PUBSUB_PSA_NANOMSG_CONSTANTS_H_ - - -#define PSA_NANOMSG_BASE_PORT "PSA_NANOMSG_BASE_PORT" -#define PSA_NANOMSG_MAX_PORT "PSA_NANOMSG_MAX_PORT" - -#define PSA_NANOMSG_DEFAULT_BASE_PORT 5501 -#define PSA_NANOMSG_DEFAULT_MAX_PORT 6000 - -#define PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE 30 -#define PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE 70 -#define PSA_NANOMSG_DEFAULT_SCORE 30 - -#define PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY "PSA_NANOMSG_QOS_SAMPLE_SCORE" -#define PSA_NANOMSG_QOS_CONTROL_SCORE_KEY "PSA_NANOMSG_QOS_CONTROL_SCORE" -#define PSA_NANOMSG_DEFAULT_SCORE_KEY "PSA_NANOMSG_DEFAULT_SCORE" - - -#endif /* PUBSUB_PSA_NANOMSG_CONSTANTS_H_ */
