This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 363d491f4ae2e977735c94d56c0ade5c799c6ce0
Merge: b3d28d1 d1d001e
Author: Pepijn Noltes <[email protected]>
AuthorDate: Mon Jan 7 11:47:33 2019 +0100

    Merge branch 'feature/CELIX-454-pubsub-disc' into develop

 .travis.yml                                        |    1 +
 bundles/log_service/src/log_helper.c               |   52 +-
 bundles/pubsub/CMakeLists.txt                      |    2 +-
 bundles/pubsub/examples/CMakeLists.txt             |   42 +-
 .../pubsub/examples/mp_pubsub/common/include/ew.h  |   53 -
 .../pubsub/examples/mp_pubsub/common/include/ide.h |   49 -
 .../examples/mp_pubsub/common/include/kinematics.h |   55 -
 .../mp_pubsub/msg_descriptors/msg_ew.descriptor    |    9 -
 .../mp_pubsub/msg_descriptors/msg_ide.descriptor   |    9 -
 .../msg_descriptors/msg_kinematics.descriptor      |   10 -
 .../examples/mp_pubsub/publisher/CMakeLists.txt    |   43 -
 .../private/include/mp_publisher_private.h         |   58 -
 .../publisher/private/src/mp_pub_activator.c       |  144 --
 .../mp_pubsub/publisher/private/src/mp_publisher.c |  160 ---
 .../examples/mp_pubsub/subscriber/CMakeLists.txt   |   43 -
 .../private/include/mp_subscriber_private.h        |   50 -
 .../subscriber/private/src/mp_sub_activator.c      |  108 --
 .../subscriber/private/src/mp_subscriber.c         |  119 --
 .../private/include/pubsub_subscriber_private.h    |    2 +-
 .../subscriber/private/src/pubsub_subscriber.c     |    2 +-
 bundles/pubsub/mock/src/publisher_mock.cc          |   14 -
 .../src/pubsub_nanomsg_admin.cc                    |   39 +-
 .../src/pubsub_nanomsg_admin.h                     |   35 +-
 .../src/pubsub_nanomsg_topic_receiver.cc           |    2 +-
 .../src/pubsub_nanomsg_topic_sender.cc             |    1 -
 .../pubsub/pubsub_admin_udp_mc/src/psa_activator.c |    6 +-
 .../src/pubsub_psa_udpmc_constants.h               |   29 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |   67 +-
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h   |   21 +-
 .../src/pubsub_udpmc_topic_receiver.c              |  242 +++-
 .../src/pubsub_udpmc_topic_receiver.h              |    8 +-
 .../src/pubsub_udpmc_topic_sender.c                |   36 +-
 .../src/pubsub_udpmc_topic_sender.h                |    5 +-
 .../pubsub/pubsub_admin_zmq/src/psa_activator.c    |    6 +-
 .../src/pubsub_psa_zmq_constants.h                 |   42 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |   50 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.h |   27 +-
 .../src/pubsub_zmq_topic_receiver.c                |  127 +-
 .../src/pubsub_zmq_topic_receiver.h                |    1 +
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c |   68 +-
 .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.h |    2 +
 .../sync.h => pubsub_api/include/pubsub/api.h}     |   13 +-
 .../pubsub/pubsub_api/include/pubsub/publisher.h   |   21 +-
 .../pubsub/pubsub_api/include/pubsub/subscriber.h  |   20 +-
 .../pubsub/pubsub_discovery/src/psd_activator.c    |    2 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.c   |   14 +-
 .../pubsub_discovery/src/pubsub_discovery_impl.h   |    2 +-
 .../src/ps_json_serializer_activator.c             |    8 +-
 .../src/pubsub_serializer_impl.c                   |   76 +-
 .../src/pubsub_serializer_impl.h                   |   18 +-
 bundles/pubsub/pubsub_spi/include/pubsub_admin.h   |   18 +-
 .../pubsub/pubsub_spi/include/pubsub_listeners.h   |    7 +-
 .../pubsub/pubsub_spi/include/pubsub_serializer.h  |    1 +
 bundles/pubsub/pubsub_spi/include/pubsub_utils.h   |   39 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils.c       |    4 +-
 bundles/pubsub/pubsub_spi/src/pubsub_utils_match.c |   12 +-
 .../src/pubsub_topology_manager.c                  | 1429 +++++++++++---------
 .../src/pubsub_topology_manager.h                  |    2 +-
 bundles/pubsub/test/CMakeLists.txt                 |  146 +-
 .../{msg_descriptors => meta_data}/msg.descriptor  |    2 +-
 .../meta_data/ping.properties}                     |   14 +-
 .../pubsub/test/msg_descriptors/sync.descriptor    |    9 -
 bundles/pubsub/test/test/sut_activator.c           |  123 +-
 bundles/pubsub/test/test/test_runner.cc            |   19 +
 bundles/pubsub/test/test/tst_activator.cc          |  120 ++
 bundles/pubsub/test/test/tst_activator.cpp         |  221 ---
 cmake/cmake_celix/ContainerPackaging.cmake         |    6 +-
 libs/framework/include/celix_bundle_activator.h    |    2 +-
 libs/framework/include/celix_log.h                 |    2 +
 libs/framework/private/test/bundle_cache_test.cpp  |    2 +
 libs/framework/private/test/bundle_test.cpp        |   36 +-
 libs/framework/src/bundle.c                        |   15 +-
 libs/framework/src/bundle_context.c                |    3 +-
 libs/framework/src/bundle_private.h                |    1 +
 libs/framework/src/celix_log.c                     |   31 +-
 libs/utils/CMakeLists.txt                          |    9 +-
 libs/utils/src/properties.c                        |    4 +-
 77 files changed, 1892 insertions(+), 2398 deletions(-)

diff --cc bundles/pubsub/examples/CMakeLists.txt
index 126db2d,2e43e6d..dd9ca54
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@@ -251,104 -223,6 +223,92 @@@ if (BUILD_PUBSUB_PSA_ZMQ
                  etcd
              USE_TERM
          )
- 
-         #Runtime starting a multipart (multiple message in one send) publish 
and subscriber for zmq
-         add_celix_runtime(pubsub_rt_multipart_zmq
-             NAME zmq_multipart
-             GROUP pubsub
-             CONTAINERS
-                 pubsub_mp_subscriber_zmq
-                 pubsub_mp_publisher_zmq
-             COMMANDS
-                 etcd
-             USE_TERM
-         )
      endif ()
 +endif()
 +if (BUILD_PUBSUB_PSA_NANOMSG)
 +    add_celix_container("pubsub_publisher1_nanomsg"
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_publisher
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_publisher1_nanomsg PRIVATE 
${PUBSUB_CONTAINER_LIBS})
 +
 +    add_celix_container("pubsub_publisher2_nanomsg"
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_publisher
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_publisher2_nanomsg PRIVATE 
${PUBSUB_CONTAINER_LIBS})
 +
 +    add_celix_container(pubsub_subscriber1_nanomsg
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_subscriber
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_subscriber1_nanomsg PRIVATE 
${PUBSUB_CONTAINER_LIBS})
 +
 +    add_celix_container(pubsub_subscriber2_nanomsg
 +            GROUP "pubsub"
 +            BUNDLES
 +            Celix::shell
 +            Celix::shell_tui
 +            Celix::pubsub_serializer_json
 +            Celix::pubsub_discovery_etcd
 +            Celix::pubsub_topology_manager
 +            Celix::pubsub_admin_nanomsg
 +            celix_pubsub_poi_subscriber
 +            PROPERTIES
 +            PSA_NANOMSG_VERBOSE=true
 +            PUBSUB_ETCD_DISCOVERY_VERBOSE=true
 +            PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
 +            )
 +    target_link_libraries(pubsub_subscriber2_nanomsg PRIVATE 
${PUBSUB_CONTAINER_LIBS})
 +
 +
 +    if (ETCD_CMD AND XTERM_CMD)
 +        #Runtime starting a publish and 2 subscribers for zmq
 +        add_celix_runtime(pubsub_rt_nanomsg
-                 NAME zmq
++                NAME nanomsg
 +                GROUP pubsub
 +                CONTAINERS
 +                pubsub_publisher1_nanomsg
 +                pubsub_publisher2_nanomsg
 +                pubsub_subscriber1_nanomsg
 +                pubsub_subscriber2_nanomsg
 +                COMMANDS
 +                etcd
 +                USE_TERM
 +                )
 +    endif ()
  
  endif()
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 8f3c229,0000000..f44a067
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@@ -1,609 -1,0 +1,614 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <string>
 +#include <vector>
 +#include <functional>
 +#include <memory.h>
 +#include <iostream>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +#include <netdb.h>
 +#include <ifaddrs.h>
 +#include <pubsub_endpoint.h>
 +#include <algorithm>
 +
 +#include "pubsub_utils.h"
 +#include "pubsub_nanomsg_admin.h"
 +#include "pubsub_psa_nanomsg_constants.h"
 +
 +
 +static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip);
 +
 +pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx):
 +    ctx{_ctx},
 +    L{ctx, "pubsub_nanomsg_admin"} {
 +    verbose = celix_bundleContext_getPropertyAsBool(ctx, 
PUBSUB_NANOMSG_VERBOSE_KEY, PUBSUB_NANOMSG_VERBOSE_DEFAULT);
 +    fwUUID = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, nullptr);
 +
 +    char *ip = nullptr;
 +    const char *confIp = celix_bundleContext_getProperty(ctx, 
PUBSUB_NANOMSG_PSA_IP_KEY , nullptr);
 +    if (confIp != nullptr) {
 +        ip = strndup(confIp, 1024);
 +    }
 +
 +    if (ip == nullptr) {
 +        //TODO try to get ip from subnet (CIDR)
 +    }
 +
 +    if (ip == nullptr) {
 +        //try to get ip from itf
 +        const char *interface = celix_bundleContext_getProperty(ctx, 
PUBSUB_NANOMSG_PSA_ITF_KEY, nullptr);
 +        nanoMsg_getIpAddress(interface, &ip);
 +    }
 +
 +    if (ip == nullptr) {
 +        L.WARN("[PSA_NANOMSG] Could not determine IP address for PSA, using 
default ip (", PUBSUB_NANOMSG_DEFAULT_IP, ")");
 +        ip = strndup(PUBSUB_NANOMSG_DEFAULT_IP, 1024);
 +    }
 +
 +    ipAddress = ip;
 +    if (verbose) {
 +        L.INFO("[PSA_NANOMSG] Using ", ip, " for service annunciation.");
 +    }
 +
 +
 +    long _basePort = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_NANOMSG_BASE_PORT, PSA_NANOMSG_DEFAULT_BASE_PORT);
 +    long _maxPort = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_NANOMSG_MAX_PORT, PSA_NANOMSG_DEFAULT_MAX_PORT);
 +    basePort = (unsigned int)_basePort;
 +    maxPort = (unsigned int)_maxPort;
 +    if (verbose) {
 +        L.INFO("[PSA_NANOMSG] Using base till max port: ", _basePort, " till 
", _maxPort);
 +    }
 +
 +
 +    defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_NANOMSG_DEFAULT_SCORE_KEY, PSA_NANOMSG_DEFAULT_SCORE);
 +    qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_NANOMSG_QOS_SAMPLE_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_SAMPLE_SCORE);
 +    qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_NANOMSG_QOS_CONTROL_SCORE_KEY, PSA_NANOMSG_DEFAULT_QOS_CONTROL_SCORE);
 +}
 +
 +pubsub_nanomsg_admin::~pubsub_nanomsg_admin() {
 +    //note assuming al psa register services and service tracker are removed.
 +    {
 +//        std::lock_guard<std::mutex> lock(topicSenders.mutex);
 +//        for (auto &kv : topicSenders.map) {
 +//            auto &sender = kv.second;
 +//            delete (sender);
 +//        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(topicReceivers.mutex);
 +        for (auto &kv: topicReceivers.map) {
 +            delete kv.second;
 +        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(discoveredEndpoints.mutex);
 +        for (auto &entry : discoveredEndpoints.map) {
 +            auto *ep = entry.second;
 +            celix_properties_destroy(ep);
 +        }
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(serializers.mutex);
 +        serializers.map.clear();
 +    }
 +
 +    free(ipAddress);
 +
 +}
 +
 +void pubsub_nanomsg_admin::start() {
 +    adminService.handle = this;
-     adminService.matchPublisher = [](void *handle, long svcRequesterBndId, 
const celix_filter_t *svcFilter, double *score, long *serializerSvcId) {
++    adminService.matchPublisher = [](void *handle, long svcRequesterBndId, 
const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties, 
double *score, long *serializerSvcId) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->matchPublisher(svcRequesterBndId, svcFilter,score, 
serializerSvcId);
++        return me->matchPublisher(svcRequesterBndId, svcFilter, 
outTopicProperties, score, serializerSvcId);
 +    };
-     adminService.matchSubscriber = [](void *handle, long svcProviderBndId, 
const celix_properties_t *svcProperties, double *score, long *serializerSvcId) {
++    adminService.matchSubscriber = [](void *handle, long svcProviderBndId, 
const celix_properties_t *svcProperties, celix_properties_t 
**outTopicProperties, double *score, long *serializerSvcId) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->matchSubscriber(svcProviderBndId, svcProperties, score, 
serializerSvcId);
++        return me->matchSubscriber(svcProviderBndId, svcProperties, 
outTopicProperties, score, serializerSvcId);
 +    };
-     adminService.matchEndpoint = [](void *handle, const celix_properties_t 
*endpoint, bool *match) {
++    adminService.matchDiscoveredEndpoint = [](void *handle, const 
celix_properties_t *endpoint, bool *match) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->matchEndpoint(endpoint, match);
 +    };
-     adminService.setupTopicSender = [](void *handle, const char *scope, const 
char *topic, long serializerSvcId, celix_properties_t **publisherEndpoint) {
++    adminService.setupTopicSender = [](void *handle, const char *scope, const 
char *topic, const celix_properties_t *topicProperties, long serializerSvcId, 
celix_properties_t **publisherEndpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->setupTopicSender(scope, topic, serializerSvcId, 
publisherEndpoint);
++        return me->setupTopicSender(scope, topic, topicProperties, 
serializerSvcId, publisherEndpoint);
 +    };
 +    adminService.teardownTopicSender = [](void *handle, const char *scope, 
const char *topic) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->teardownTopicSender(scope, topic);
 +    };
-     adminService.setupTopicReceiver = [](void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**subscriberEndpoint) {
++    adminService.setupTopicReceiver = [](void *handle, const char *scope, 
const char *topic, const celix_properties_t *topicProperties, long 
serializerSvcId, celix_properties_t **subscriberEndpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
-         return me->setupTopicReceiver(std::string(scope), 
std::string(topic),serializerSvcId, subscriberEndpoint);
++        return me->setupTopicReceiver(std::string(scope), std::string(topic), 
topicProperties, serializerSvcId, subscriberEndpoint);
 +    };
 +
 +    adminService.teardownTopicReceiver = [] (void *handle, const char *scope, 
const char *topic) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->teardownTopicReceiver(scope, topic);
 +    };
-     adminService.addEndpoint = [](void *handle, const celix_properties_t 
*endpoint) {
++    adminService.addDiscoveredEndpoint = [](void *handle, const 
celix_properties_t *endpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->addEndpoint(endpoint);
 +    };
-     adminService.removeEndpoint = [](void *handle, const celix_properties_t 
*endpoint) {
++    adminService.removeDiscoveredEndpoint = [](void *handle, const 
celix_properties_t *endpoint) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->removeEndpoint(endpoint);
 +    };
 +
 +    celix_properties_t *props = celix_properties_create();
 +    celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, 
PUBSUB_NANOMSG_ADMIN_TYPE);
 +
 +    adminSvcId = celix_bundleContext_registerService(ctx, 
static_cast<void*>(&adminService), PUBSUB_ADMIN_SERVICE_NAME, props);
 +
 +
 +    celix_service_tracking_options_t opts{};
 +    opts.filter.serviceName = PUBSUB_SERIALIZER_SERVICE_NAME;
 +    opts.filter.ignoreServiceLanguage = true;
 +    opts.callbackHandle = this;
 +    opts.addWithProperties = [](void *handle, void *svc, const 
celix_properties_t *props) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        me->addSerializerSvc(svc, props);
 +    };
 +    opts.removeWithProperties = [](void *handle, void *svc, const 
celix_properties_t *props) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        me->removeSerializerSvc(svc, props);
 +    };
 +    serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, 
&opts);
 +
 +    //register shell command service
 +    cmdSvc.handle = this;
 +    cmdSvc.executeCommand = [](void *handle, char * commandLine, FILE 
*outStream, FILE *errorStream) {
 +        auto me = static_cast<pubsub_nanomsg_admin*>(handle);
 +        return me->executeCommand(commandLine, outStream, errorStream);
 +    };
 +
 +    celix_properties_t* shellProps = celix_properties_create();
 +    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_nanomsg");
 +    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_nanomsg");
 +    celix_properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "Print 
the information about the TopicSender and TopicReceivers for the nanomsg PSA");
 +    cmdSvcId = celix_bundleContext_registerService(ctx, &cmdSvc, 
OSGI_SHELL_COMMAND_SERVICE_NAME, shellProps);
 +
 +}
 +
 +void pubsub_nanomsg_admin::stop() {
 +    celix_bundleContext_unregisterService(ctx, adminSvcId);
 +    celix_bundleContext_unregisterService(ctx, cmdSvcId);
 +    celix_bundleContext_stopTracker(ctx, serializersTrackerId);
 +}
 +
 +void pubsub_nanomsg_admin::addSerializerSvc(void *svc, const 
celix_properties_t *props) {
 +    const char *serType = celix_properties_get(props, 
PUBSUB_SERIALIZER_TYPE_KEY, nullptr);
 +    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
 +
 +    if (serType == nullptr) {
 +        L.INFO("[PSA_NANOMSG] Ignoring serializer service without ", 
PUBSUB_SERIALIZER_TYPE_KEY, " property");
 +        return;
 +    }
 +
 +    {
 +        std::lock_guard<std::mutex> lock(serializers.mutex);
 +        auto it = serializers.map.find(svcId);
 +        if (it == serializers.map.end()) {
 +            serializers.map.emplace(std::piecewise_construct,
 +                    std::forward_as_tuple(svcId),
 +                    std::forward_as_tuple(serType, svcId, 
static_cast<pubsub_serializer_service_t*>(svc)));
 +        }
 +    }
 +}
 +
 +
 +void pubsub_nanomsg_admin::removeSerializerSvc(void */*svc*/, const 
celix_properties_t *props) {
 +    long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, 
-1L);
 +
 +    //remove serializer
 +    // 1) First find entry and
 +    // 2) loop and destroy all topic sender using the serializer and
 +    // 3) loop and destroy all topic receivers using the serializer
 +    // Note that it is the responsibility of the topology manager to create 
new topic senders/receivers
 +
 +    std::lock_guard<std::mutex> lock(serializers.mutex);
 +
 +    auto kvsm = serializers.map.find(svcId);
 +    if (kvsm != serializers.map.end()) {
 +        auto &entry = kvsm->second;
 +        {
 +            std::lock_guard<std::mutex> senderLock(topicSenders.mutex);
 +            for(auto it = topicSenders.map.begin(); it != 
topicSenders.map.end(); /*nothing*/) {
 +                auto &sender = it->second;
 +                if (entry.svcId == sender.getSerializerSvcId()) {
 +                    it = topicSenders.map.erase(it);
 +                } else {
 +                    ++it;
 +                }
 +            }
 +        }
 +
 +        {
 +            std::lock_guard<std::mutex> receiverLock(topicReceivers.mutex);
 +            for (auto iter = topicReceivers.map.begin(); iter != 
topicReceivers.map.end();) {
 +                auto *receiver = iter->second;
 +                if (receiver != nullptr && entry.svcId == 
receiver->serializerSvcId()) {
 +                    auto key = iter->first;
 +                    topicReceivers.map.erase(iter++);
 +                    delete receiver;
 +                } else {
 +                    ++iter;
 +                }
 +            }
 +        }
 +
 +    }
 +}
 +
- celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, 
const celix_filter_t *svcFilter,
++celix_status_t pubsub_nanomsg_admin::matchPublisher(long svcRequesterBndId, 
const celix_filter_t *svcFilter, celix_properties_t **outTopicProperties,
 +                                                  double *outScore, long 
*outSerializerSvcId) {
 +    L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchPublisher");
 +    celix_status_t  status = CELIX_SUCCESS;
 +    double score = pubsub_utils_matchPublisher(ctx, svcRequesterBndId, 
svcFilter->filterStr, PUBSUB_NANOMSG_ADMIN_TYPE,
-             qosSampleScore, qosControlScore, defaultScore, 
outSerializerSvcId);
++            qosSampleScore, qosControlScore, defaultScore, 
outTopicProperties, outSerializerSvcId);
 +    *outScore = score;
 +
 +    return status;
 +}
 +
- celix_status_t pubsub_nanomsg_admin::matchSubscriber(long svcProviderBndId,
-                                                    const celix_properties_t 
*svcProperties, double *outScore,
-                                                    long *outSerializerSvcId) {
++celix_status_t pubsub_nanomsg_admin::matchSubscriber(
++        long svcProviderBndId,
++        const celix_properties_t *svcProperties,
++        celix_properties_t **outTopicProperties,
++        double *outScore,
++        long *outSerializerSvcId) {
 +    L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchSubscriber");
 +    celix_status_t  status = CELIX_SUCCESS;
 +    double score = pubsub_utils_matchSubscriber(ctx, svcProviderBndId, 
svcProperties, PUBSUB_NANOMSG_ADMIN_TYPE,
-             qosSampleScore, qosControlScore, defaultScore, 
outSerializerSvcId);
++            qosSampleScore, qosControlScore, defaultScore, 
outTopicProperties, outSerializerSvcId);
 +    if (outScore != nullptr) {
 +        *outScore = score;
 +    }
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::matchEndpoint(const celix_properties_t 
*endpoint, bool *outMatch) {
 +    L.DBG("[PSA_NANOMSG] pubsub_nanoMsgAdmin_matchEndpoint");
 +    celix_status_t  status = CELIX_SUCCESS;
 +    bool match = pubsub_utils_matchEndpoint(ctx, endpoint, 
PUBSUB_NANOMSG_ADMIN_TYPE, nullptr);
 +    if (outMatch != nullptr) {
 +        *outMatch = match;
 +    }
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::setupTopicSender(const char *scope, 
const char *topic,
++                                                    const celix_properties_t 
*/*topicProperties*/,
 +                                                    long serializerSvcId, 
celix_properties_t **outPublisherEndpoint) {
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    //1) Create TopicSender
 +    //2) Store TopicSender
 +    //3) Connect existing endpoints
 +    //4) set outPublisherEndpoint
 +
 +    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 +    std::lock_guard<std::mutex> serializerLock(serializers.mutex);
 +    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
 +    auto sender = topicSenders.map.find(key);
 +    if (sender == topicSenders.map.end()) {
 +        //psa_nanomsg_serializer_entry *serEntry = nullptr;
 +        auto kv = serializers.map.find(serializerSvcId);
 +        if (kv != serializers.map.end()) {
 +            auto &serEntry = kv->second;
 +            auto e = topicSenders.map.emplace(std::piecewise_construct,
 +                    std::forward_as_tuple(key),
 +                    std::forward_as_tuple(ctx, scope, topic, serializerSvcId, 
serEntry.svc, ipAddress,
 +                                          basePort, maxPort));
 +            celix_properties_t *newEndpoint = pubsubEndpoint_create(fwUUID, 
scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE,
 +                    PUBSUB_NANOMSG_ADMIN_TYPE, serEntry.serType, nullptr);
 +            celix_properties_set(newEndpoint, PUBSUB_NANOMSG_URL_KEY, 
e.first->second.getUrl().c_str());
 +            //if available also set container name
 +            const char *cn = celix_bundleContext_getProperty(ctx, 
"CELIX_CONTAINER_NAME", nullptr);
 +            if (cn != nullptr) {
 +                celix_properties_set(newEndpoint, "container_name", cn);
 +            }
 +            if (newEndpoint != nullptr && outPublisherEndpoint != nullptr) {
 +                *outPublisherEndpoint = newEndpoint;
 +            }
 +        } else {
 +            L.ERROR("[PSA NANOMSG] Error creating a TopicSender");
 +        }
 +    } else {
 +        L.ERROR("[PSA_NANOMSG] Cannot setup already existing TopicSender for 
scope/topic ", scope,"/", topic);
 +    }
 +    free(key);
 +
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::teardownTopicSender(const char *scope, 
const char *topic) {
 +    celix_status_t  status = CELIX_SUCCESS;
 +
 +    //1) Find and remove TopicSender from map
 +    //2) destroy topic sender
 +
 +    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 +    std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
 +    if (topicSenders.map.erase(key) == 0) {
 +        L.ERROR("[PSA NANOMSG] Cannot teardown TopicSender with scope/topic 
", scope, "/", topic, " Does not exists");
 +    }
 +    free(key);
 +
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::setupTopicReceiver(const std::string 
&scope, const std::string &topic,
++                                                      const 
celix_properties_t */*topicProperties*/,
 +                                                      long serializerSvcId, 
celix_properties_t **outSubscriberEndpoint) {
 +
 +    celix_properties_t *newEndpoint = nullptr;
 +
 +    auto key = pubsubEndpoint_createScopeTopicKey(scope.c_str(), 
topic.c_str());
 +    pubsub::nanomsg::topic_receiver * receiver = nullptr;
 +    {
 +        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
 +        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
 +         auto trkv = topicReceivers.map.find(key);
 +         if (trkv != topicReceivers.map.end()) {
 +             receiver = trkv->second;
 +         }
 +        if (receiver == nullptr) {
 +            auto kvs = serializers.map.find(serializerSvcId);
 +            if (kvs != serializers.map.end()) {
 +                auto serEntry = kvs->second;
 +                receiver = new pubsub::nanomsg::topic_receiver(ctx, scope, 
topic, serializerSvcId, serEntry.svc);
 +            } else {
 +                L.ERROR("[PSA_NANOMSG] Cannot find serializer for TopicSender 
", scope, "/", topic);
 +            }
 +            if (receiver != nullptr) {
 +                const char *psaType = PUBSUB_NANOMSG_ADMIN_TYPE;
 +                const char *serType = kvs->second.serType;
 +                newEndpoint = pubsubEndpoint_create(fwUUID, scope.c_str(), 
topic.c_str(), PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
 +                                                    serType, nullptr);
 +                //if available also set container name
 +                const char *cn = celix_bundleContext_getProperty(ctx, 
"CELIX_CONTAINER_NAME", nullptr);
 +                if (cn != nullptr) {
 +                    celix_properties_set(newEndpoint, "container_name", cn);
 +                }
 +                topicReceivers.map[key] = receiver;
 +            } else {
 +                L.ERROR("[PSA NANOMSG] Error creating a TopicReceiver.");
 +            }
 +        } else {
 +            L.ERROR("[PSA_NANOMSG] Cannot setup already existing 
TopicReceiver for scope/topic ", scope, "/", topic);
 +        }
 +    }
 +    if (receiver != nullptr && newEndpoint != nullptr) {
 +        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
 +        for (auto entry : discoveredEndpoints.map) {
 +            auto *endpoint = entry.second;
 +            const char *type = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TYPE, nullptr);
 +            if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, 
type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
 +                connectEndpointToReceiver(receiver, endpoint);
 +            }
 +        }
 +    }
 +
 +    if (newEndpoint != nullptr && outSubscriberEndpoint != nullptr) {
 +        *outSubscriberEndpoint = newEndpoint;
 +    }
 +    free(key);
 +    celix_status_t  status = CELIX_SUCCESS;
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::teardownTopicReceiver(const char *scope, 
const char *topic) {
 +    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 +    std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
 +    auto entry = topicReceivers.map.find(key);
 +    free(key);
 +    if (entry != topicReceivers.map.end()) {
 +        auto receiverKey = entry->first;
 +        pubsub::nanomsg::topic_receiver *receiver = entry->second;
 +        topicReceivers.map.erase(receiverKey);
 +
 +        delete receiver;
 +    }
 +
 +    celix_status_t  status = CELIX_SUCCESS;
 +    return status;
 +}
 +
 +celix_status_t 
pubsub_nanomsg_admin::connectEndpointToReceiver(pubsub::nanomsg::topic_receiver 
*receiver,
 +                                                                    const 
celix_properties_t *endpoint) {
 +    //note can be called with discoveredEndpoint.mutex lock
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    auto scope = receiver->scope();
 +    auto topic = receiver->topic();
 +
 +    std::string eScope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, "");
 +    std::string eTopic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, "");
 +    const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, 
nullptr);
 +
 +    if (url == nullptr) {
 +//        L_WARN("[PSA NANOMSG] Error got endpoint without a nanomsg url 
(admin: %s, type: %s)", admin , type);
 +        status = CELIX_BUNDLE_EXCEPTION;
 +    } else {
 +        if ((eScope == scope) && (eTopic == topic)) {
 +            receiver->connectTo(url);
 +        }
 +    }
 +
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::addEndpoint(const celix_properties_t 
*endpoint) {
 +    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, 
nullptr);
 +
 +    if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
 +        std::lock_guard<std::mutex> threadLock(topicReceivers.mutex);
 +        for (auto &entry: topicReceivers.map) {
 +            pubsub::nanomsg::topic_receiver *receiver = entry.second;
 +            connectEndpointToReceiver(receiver, endpoint);
 +        }
 +    }
 +
 +    std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
 +    celix_properties_t *cpy = celix_properties_copy(endpoint);
 +    //TODO : check if properties are never deleted before map.
 +    const char *uuid = celix_properties_get(cpy, PUBSUB_ENDPOINT_UUID, 
nullptr);
 +    discoveredEndpoints.map[uuid] = cpy;
 +
 +    celix_status_t  status = CELIX_SUCCESS;
 +    return status;
 +}
 +
 +
 +celix_status_t 
pubsub_nanomsg_admin::disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver
 *receiver,
 +                                                                            
const celix_properties_t *endpoint) {
 +    //note can be called with discoveredEndpoint.mutex lock
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    auto scope = receiver->scope();
 +    auto topic = receiver->topic();
 +
 +    auto eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, 
"");
 +    auto eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, 
"");
 +    const char *url = celix_properties_get(endpoint, PUBSUB_NANOMSG_URL_KEY, 
nullptr);
 +
 +    if (url == nullptr) {
 +        L.WARN("[PSA NANOMSG] Error got endpoint without nanomsg url");
 +        status = CELIX_BUNDLE_EXCEPTION;
 +    } else {
 +        if ((eScope == scope) && (eTopic == topic)) {
 +            receiver->disconnectFrom(url);
 +        }
 +    }
 +
 +    return status;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::removeEndpoint(const celix_properties_t 
*endpoint) {
 +    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, 
nullptr);
 +
 +    if (type != nullptr && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
 +        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
 +        for (auto &entry : topicReceivers.map) {
 +            pubsub::nanomsg::topic_receiver *receiver = entry.second;
 +            disconnectEndpointFromReceiver(receiver, endpoint);
 +        }
 +    }
 +    {
 +        std::lock_guard<std::mutex> discEpLock(discoveredEndpoints.mutex);
 +        const char *uuid = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_UUID, nullptr);
 +        discoveredEndpoints.map.erase(uuid);
 +    }
 +    return CELIX_SUCCESS;;
 +}
 +
 +celix_status_t pubsub_nanomsg_admin::executeCommand(char *commandLine 
__attribute__((unused)), FILE *out,
 +                                                  FILE *errStream 
__attribute__((unused))) {
 +    celix_status_t  status = CELIX_SUCCESS;
 +    fprintf(out, "\n");
 +    fprintf(out, "Topic Senders:\n");
 +    {
 +        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
 +        std::lock_guard<std::mutex> topicSenderLock(topicSenders.mutex);
 +        for (auto &senderEntry: topicSenders.map) {
 +            auto &sender = senderEntry.second;
 +            long serSvcId = sender.getSerializerSvcId();
 +            auto kvs = serializers.map.find(serSvcId);
 +            const char* serType = ( kvs == serializers.map.end() ? "!Error" : 
 kvs->second.serType);
 +            const auto scope = sender.getScope();
 +            const auto topic = sender.getTopic();
 +            const auto url = sender.getUrl();
 +            fprintf(out, "|- Topic Sender %s/%s\n", scope.c_str(), 
topic.c_str());
 +            fprintf(out, "   |- serializer type = %s\n", serType);
 +            fprintf(out, "   |- url             = %s\n", url.c_str());
 +        }
 +    }
 +
 +    {
 +        fprintf(out, "\n");
 +        fprintf(out, "\nTopic Receivers:\n");
 +        std::lock_guard<std::mutex> serializerLock(serializers.mutex);
 +        std::lock_guard<std::mutex> topicReceiverLock(topicReceivers.mutex);
 +        for (auto &entry : topicReceivers.map) {
 +            pubsub::nanomsg::topic_receiver *receiver = entry.second;
 +            long serSvcId = receiver->serializerSvcId();
 +            auto kv =  serializers.map.find(serSvcId);
 +            const char *serType = (kv == serializers.map.end() ? "!Error!" : 
kv->second.serType);
 +            auto scope = receiver->scope();
 +            auto topic = receiver->topic();
 +
 +            std::vector<std::string> connected{};
 +            std::vector<std::string> unconnected{};
 +            receiver->listConnections(connected, unconnected);
 +
 +            fprintf(out, "|- Topic Receiver %s/%s\n", scope.c_str(), 
topic.c_str());
 +            fprintf(out, "   |- serializer type = %s\n", serType);
 +            for (auto &url : connected) {
 +                fprintf(out, "   |- connected url   = %s\n", url.c_str());
 +            }
 +            for (auto &url : unconnected) {
 +                fprintf(out, "   |- unconnected url = %s\n", url.c_str());
 +            }
 +        }
 +    }
 +    fprintf(out, "\n");
 +
 +    return status;
 +}
 +
 +#ifndef ANDROID
 +static celix_status_t nanoMsg_getIpAddress(const char *interface, char **ip) {
 +    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +    struct ifaddrs *ifaddr, *ifa;
 +    char host[NI_MAXHOST];
 +
 +    if (getifaddrs(&ifaddr) != -1)
 +    {
 +        for (ifa = ifaddr; ifa != nullptr && status != CELIX_SUCCESS; ifa = 
ifa->ifa_next)
 +        {
 +            if (ifa->ifa_addr == nullptr)
 +                continue;
 +
 +            if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, 
NI_MAXHOST, nullptr, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == 
AF_INET)) {
 +                if (interface == nullptr) {
 +                    *ip = strdup(host);
 +                    status = CELIX_SUCCESS;
 +                }
 +                else if (strcmp(ifa->ifa_name, interface) == 0) {
 +                    *ip = strdup(host);
 +                    status = CELIX_SUCCESS;
 +                }
 +            }
 +        }
 +
 +        freeifaddrs(ifaddr);
 +    }
 +
 +    return status;
 +}
 +#endif
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
index 768accc,0000000..dccf280
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.h
@@@ -1,136 -1,0 +1,153 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#ifndef CELIX_PUBSUB_ZMQ_ADMIN_H
 +#define CELIX_PUBSUB_ZMQ_ADMIN_H
 +
 +#include <mutex>
 +#include <map>
 +#include <pubsub_admin.h>
 +#include "celix_api.h"
 +#include "pubsub_nanomsg_topic_receiver.h"
 +#include <pubsub_serializer.h>
 +#include "LogHelper.h"
 +#include "command.h"
 +#include "pubsub_nanomsg_topic_sender.h"
 +#include "pubsub_nanomsg_topic_receiver.h"
 +
 +#define PUBSUB_NANOMSG_ADMIN_TYPE       "zmq"
 +#define PUBSUB_NANOMSG_URL_KEY          "zmq.url"
 +
 +#define PUBSUB_NANOMSG_VERBOSE_KEY      "PSA_ZMQ_VERBOSE"
 +#define PUBSUB_NANOMSG_VERBOSE_DEFAULT  true
 +
 +#define PUBSUB_NANOMSG_PSA_IP_KEY       "PSA_IP"
 +#define PUBSUB_NANOMSG_PSA_ITF_KEY            "PSA_INTERFACE"
 +
 +#define PUBSUB_NANOMSG_DEFAULT_IP       "127.0.0.1"
 +
 +template <typename key, typename value>
 +struct ProtectedMap {
 +    std::mutex mutex{};
 +    std::map<key, value> map{};
 +};
 +
 +class pubsub_nanomsg_admin {
 +public:
 +    pubsub_nanomsg_admin(celix_bundle_context_t *ctx);
 +    pubsub_nanomsg_admin(const pubsub_nanomsg_admin&) = delete;
 +    pubsub_nanomsg_admin& operator=(const pubsub_nanomsg_admin&) = delete;
 +    ~pubsub_nanomsg_admin();
 +    void start();
 +    void stop();
 +
 +private:
 +    void addSerializerSvc(void *svc, const celix_properties_t *props);
 +    void removeSerializerSvc(void */*svc*/, const celix_properties_t *props);
-     celix_status_t matchPublisher(long svcRequesterBndId, const 
celix_filter_t *svcFilter,
-                                                       double *score, long 
*serializerSvcId);
-     celix_status_t matchSubscriber(long svcProviderBndId,
-                                                        const 
celix_properties_t *svcProperties, double *score,
-                                                        long *serializerSvcId);
++    celix_status_t matchPublisher(
++            long svcRequesterBndId,
++            const celix_filter_t *svcFilter,
++            celix_properties_t **outTopicProperties,
++            double *outScore,
++            long *outsSerializerSvcId);
++    celix_status_t matchSubscriber(
++            long svcProviderBndId,
++            const celix_properties_t *svcProperties,
++            celix_properties_t **outTopicProperties,
++            double *outScope,
++            long *outSerializerSvcId);
 +    celix_status_t matchEndpoint(const celix_properties_t *endpoint, bool 
*match);
 +
-     celix_status_t setupTopicSender(const char *scope, const char *topic,
-                                                         long serializerSvcId, 
celix_properties_t **publisherEndpoint);
++    celix_status_t setupTopicSender(
++            const char *scope,
++            const char *topic,
++            const celix_properties_t *topicProperties,
++            long serializerSvcId,
++            celix_properties_t **outPublisherEndpoint);
++
 +    celix_status_t teardownTopicSender(const char *scope, const char *topic);
 +
-     celix_status_t setupTopicReceiver(const std::string &scope, const 
std::string &topic,
-                                                           long 
serializerSvcId, celix_properties_t **subscriberEndpoint);
++    celix_status_t setupTopicReceiver(
++            const std::string &scope,
++            const std::string &topic,
++            const celix_properties_t *topicProperties,
++            long serializerSvcId,
++            celix_properties_t **outSubscriberEndpoint);
++
 +    celix_status_t teardownTopicReceiver(const char *scope, const char 
*topic);
 +
 +    celix_status_t addEndpoint(const celix_properties_t *endpoint);
 +    celix_status_t removeEndpoint(const celix_properties_t *endpoint);
 +
 +    celix_status_t executeCommand(char *commandLine __attribute__((unused)), 
FILE *out,
 +                                                        FILE *errStream 
__attribute__((unused)));
 +
 +    celix_status_t connectEndpointToReceiver(pubsub::nanomsg::topic_receiver 
*receiver,
 +                                                                   const 
celix_properties_t *endpoint);
 +
 +    celix_status_t 
disconnectEndpointFromReceiver(pubsub::nanomsg::topic_receiver *receiver,
 +                                                                        const 
celix_properties_t *endpoint);
 +    celix_bundle_context_t *ctx;
 +    celix::pubsub::nanomsg::LogHelper L;
 +    pubsub_admin_service_t adminService{};
 +    long adminSvcId = -1L;
 +    long cmdSvcId = -1L;
 +    command_service_t cmdSvc{};
 +    long serializersTrackerId = -1L;
 +
 +    const char *fwUUID{};
 +
 +    char* ipAddress{};
 +
 +    unsigned int basePort{};
 +    unsigned int maxPort{};
 +
 +    double qosSampleScore{};
 +    double qosControlScore{};
 +    double defaultScore{};
 +
 +    bool verbose{};
 +
 +    class psa_nanomsg_serializer_entry {
 +    public:
 +        psa_nanomsg_serializer_entry(const char*_serType, long _svcId, 
pubsub_serializer_service_t *_svc) :
 +            serType{_serType}, svcId{_svcId}, svc{_svc} {
 +
 +        }
 +
 +        const char *serType;
 +        long svcId;
 +        pubsub_serializer_service_t *svc;
 +    };
 +    ProtectedMap<long, psa_nanomsg_serializer_entry> serializers{};
 +    ProtectedMap<std::string, pubsub::nanomsg::pubsub_nanomsg_topic_sender> 
topicSenders{};
 +    ProtectedMap<std::string, pubsub::nanomsg::topic_receiver*> 
topicReceivers{};
 +    ProtectedMap<const std::string, celix_properties_t *> 
discoveredEndpoints{};
 +};
 +
 +#ifdef __cplusplus
 +extern "C" {
 +#endif
 +
 +#ifdef __cplusplus
 +}
 +#endif
 +
 +
 +#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --cc 
bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
index d35de89,0000000..b27eb76
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_receiver.cc
@@@ -1,319 -1,0 +1,319 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <iostream>
 +#include <mutex>
 +#include <memory.h>
 +#include <vector>
 +#include <string>
 +#include <sstream>
 +
 +#include <stdlib.h>
 +#include <assert.h>
 +
 +#include <sys/epoll.h>
 +#include <arpa/inet.h>
 +
 +#include <nanomsg/nn.h>
 +#include <nanomsg/bus.h>
 +
 +#include <pubsub_serializer.h>
 +#include <pubsub/subscriber.h>
 +#include <pubsub_constants.h>
 +#include <pubsub_endpoint.h>
 +#include <LogHelper.h>
 +
 +#include "pubsub_nanomsg_topic_receiver.h"
 +#include "pubsub_psa_nanomsg_constants.h"
 +#include "pubsub_nanomsg_common.h"
 +#include "pubsub_topology_manager.h"
 +
 +//TODO see if block and wakeup (reset) also works
 +#define PSA_NANOMSG_RECV_TIMEOUT 100 //100 msec timeout
 +
 +/*
 +#define L_DEBUG(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
 +#define L_INFO(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
 +#define L_WARN(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
 +#define L_ERROR(...) \
 +    logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 +*/
 +//#define L_DEBUG printf
 +//#define L_INFO printf
 +//#define L_WARN printf
 +//#define L_ERROR printf
 +
 +
 +pubsub::nanomsg::topic_receiver::topic_receiver(celix_bundle_context_t *_ctx,
 +        const std::string &_scope,
 +        const std::string &_topic,
 +        long _serializerSvcId,
 +        pubsub_serializer_service_t *_serializer) : L{_ctx, 
"NANOMSG_topic_receiver"}, m_serializerSvcId{_serializerSvcId}, 
m_scope{_scope}, m_topic{_topic} {
 +    ctx = _ctx;
 +    serializer = _serializer;
 +
 +    m_nanoMsgSocket = nn_socket(AF_SP, NN_BUS);
 +    if (m_nanoMsgSocket < 0) {
 +        L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for scope/topic: 
", m_scope.c_str(), "/", m_topic.c_str());
 +        std::bad_alloc{};
 +    } else {
 +        int timeout = PSA_NANOMSG_RECV_TIMEOUT;
 +        if (nn_setsockopt(m_nanoMsgSocket , NN_SOL_SOCKET, NN_RCVTIMEO, 
&timeout,
 +                          sizeof (timeout)) < 0) {
 +            L.ERROR("[PSA_NANOMSG] Cannot create TopicReceiver for ",m_scope, 
"/",m_topic, ", set sockopt RECV_TIMEO failed");
 +            std::bad_alloc{};
 +        }
 +
 +        auto subscriberFilter = 
celix::pubsub::nanomsg::setScopeAndTopicFilter(m_scope, m_topic);
 +
 +        auto opts = createOptions();
 +
 +        subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
 +        recvThread.running = true;
 +        free ((void*)opts.filter.filter);
 +        recvThread.thread = std::thread([this]() {this->recvThread_exec();});
 +    }
 +}
 +
 +celix_service_tracking_options_t 
pubsub::nanomsg::topic_receiver::createOptions() {
 +    std::stringstream filter_str;
 +
 +    filter_str << "(" << PUBSUB_SUBSCRIBER_TOPIC << "=" <<  m_topic << ")";
 +    celix_service_tracking_options_t opts{};
 +    opts.filter.ignoreServiceLanguage = true;
 +    opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
 +    opts.filter.filter = strdup(filter_str.str().c_str()); // TODO : memory 
leak ??
 +    opts.callbackHandle = this;
 +    opts.addWithOwner = [](void *handle, void *svc, const celix_properties_t 
*props, const celix_bundle_t *svcOwner) {
 +            
static_cast<pubsub::nanomsg::topic_receiver*>(handle)->addSubscriber(svc, 
props, svcOwner);
 +        };
 +    opts.removeWithOwner = [](void *handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *svcOwner) {
 +            
static_cast<pubsub::nanomsg::topic_receiver*>(handle)->removeSubscriber(svc, 
props, svcOwner);
 +        };
 +    return opts;
 +}
 +
 +pubsub::nanomsg::topic_receiver::~topic_receiver() {
 +
 +        {
 +            std::lock_guard<std::mutex> _lock(recvThread.mutex);
 +            recvThread.running = false;
 +        }
 +        recvThread.thread.join();
 +
 +        celix_bundleContext_stopTracker(ctx, subscriberTrackerId);
 +
 +        {
 +            std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +            for(auto elem : subscribers.map) {
 +                serializer->destroySerializerMap(serializer->handle, 
elem.second.msgTypes);
 +            }
 +            subscribers.map.clear();
 +        }
 +        nn_close(m_nanoMsgSocket);
 +
 +}
 +
 +std::string pubsub::nanomsg::topic_receiver::scope() const {
 +    return m_scope;
 +}
 +
 +std::string pubsub::nanomsg::topic_receiver::topic() const {
 +    return m_topic;
 +}
 +
 +long pubsub::nanomsg::topic_receiver::serializerSvcId() const {
 +    return m_serializerSvcId;
 +}
 +
 +void 
pubsub::nanomsg::topic_receiver::listConnections(std::vector<std::string> 
&connectedUrls,
 +                                                 std::vector<std::string> 
&unconnectedUrls) {
 +    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
 +    for (auto entry : requestedConnections.map) {
 +        if (entry.second.isConnected()) {
 +            connectedUrls.emplace_back(entry.second.getUrl());
 +        } else {
 +            unconnectedUrls.emplace_back(entry.second.getUrl());
 +        }
 +    }
 +}
 +
 +
 +void pubsub::nanomsg::topic_receiver::connectTo(const char *url) {
 +    L.DBG("[PSA_NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " connecting 
to nanomsg url ", url);
 +
 +    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
 +    auto entry  = requestedConnections.map.find(url);
 +    if (entry == requestedConnections.map.end()) {
 +        requestedConnections.map.emplace(
 +                std::piecewise_construct,
 +                std::forward_as_tuple(std::string(url)),
 +                std::forward_as_tuple(url, -1));
 +        entry = requestedConnections.map.find(url);
 +    }
 +    if (!entry->second.isConnected()) {
 +        int connection_id = nn_connect(m_nanoMsgSocket, url);
 +        if (connection_id >= 0) {
 +            entry->second.setConnected(true);
 +            entry->second.setId(connection_id);
 +        } else {
 +            L.WARN("[PSA_NANOMSG] Error connecting to NANOMSG url ", url, " 
(",strerror(errno), ")");
 +        }
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::disconnectFrom(const char *url) {
 +    L.DBG("[PSA NANOMSG] TopicReceiver ", m_scope, "/", m_topic, " disconnect 
from nanomsg url ", url);
 +
 +    std::lock_guard<std::mutex> _lock(requestedConnections.mutex);
 +    auto entry = requestedConnections.map.find(url);
 +    if (entry != requestedConnections.map.end()) {
 +        if (entry->second.isConnected()) {
 +            if (nn_shutdown(m_nanoMsgSocket, entry->second.getId()) == 0) {
 +                entry->second.setConnected(false);
 +            } else {
 +                L.WARN("[PSA_NANOMSG] Error disconnecting from nanomsg url ", 
url, ", id: ", entry->second.getId(), " (",strerror(errno),")");
 +            }
 +        }
 +        requestedConnections.map.erase(url);
 +        std::cerr << "REMOVING connection " << url << std::endl;
 +    } else {
 +        std::cerr << "Disconnecting from unknown URL " << url << std::endl;
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::addSubscriber(void *svc, const 
celix_properties_t *props,
 +                                                      const celix_bundle_t 
*bnd) {
 +    long bndId = celix_bundle_getId(bnd);
 +    std::string subScope = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_SCOPE, "default");
 +    if (subScope != m_scope) {
 +        //not the same scope. ignore
 +        return;
 +    }
 +
 +    std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +    auto entry = subscribers.map.find(bndId);
 +    if (entry != subscribers.map.end()) {
 +        entry->second.usageCount += 1;
 +    } else {
 +        //new create entry
 +        subscribers.map.emplace(std::piecewise_construct,
 +                std::forward_as_tuple(bndId),
 +                std::forward_as_tuple(static_cast<pubsub_subscriber_t*>(svc), 
1));
 +        entry = subscribers.map.find(bndId);
 +
 +        int rc = serializer->createSerializerMap(serializer->handle, 
(celix_bundle_t*)bnd, &entry->second.msgTypes);
 +        if (rc != 0) {
 +            L.ERROR("[PSA_NANOMSG] Cannot create msg serializer map for 
TopicReceiver ", m_scope.c_str(), "/", m_topic.c_str());
 +            subscribers.map.erase(bndId);
 +        }
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::removeSubscriber(void */*svc*/,
 +                                                         const 
celix_properties_t */*props*/, const celix_bundle_t *bnd) {
 +    long bndId = celix_bundle_getId(bnd);
 +
 +    std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +    auto entry = subscribers.map.find(bndId);
 +    if (entry != subscribers.map.end()) {
 +        entry->second.usageCount -= 1;
 +        if (entry->second.usageCount <= 0) {
 +            //remove entry
 +            int rc = serializer->destroySerializerMap(serializer->handle, 
entry->second.msgTypes);
 +            if (rc != 0) {
 +                L.ERROR("[PSA_NANOMSG] Cannot destroy msg serializers map for 
TopicReceiver ", m_scope.c_str(), "/",m_topic.c_str(),"\n");
 +            }
 +            subscribers.map.erase(bndId);
 +        }
 +    }
 +}
 +
 +void 
pubsub::nanomsg::topic_receiver::processMsgForSubscriberEntry(psa_nanomsg_subscriber_entry*
 entry, const celix::pubsub::nanomsg::msg_header *hdr, const char* payload, 
size_t payloadSize) {
 +    pubsub_msg_serializer_t* msgSer = 
static_cast<pubsub_msg_serializer_t*>(hashMap_get(entry->msgTypes, 
(void*)(uintptr_t)(hdr->type)));
 +    pubsub_subscriber_t *svc = entry->svc;
 +
 +    if (msgSer!= NULL) {
 +        void *deserializedMsg = NULL;
 +        bool validVersion = 
celix::pubsub::nanomsg::checkVersion(msgSer->msgVersion, hdr);
 +        if (validVersion) {
 +            celix_status_t status = msgSer->deserialize(msgSer, payload, 
payloadSize, &deserializedMsg);
 +            if(status == CELIX_SUCCESS) {
 +                bool release = false;
-                 svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deserializedMsg, NULL, &release);
++                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deserializedMsg, &release);
 +                if (release) {
 +                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
 +                }
 +            } else {
 +                L.WARN("[PSA_NANOMSG_TR] Cannot deserialize msg type ", 
msgSer->msgName , "for scope/topic ", scope(), "/", topic());
 +            }
 +        }
 +    } else {
 +        L.WARN("[PSA_NANOMSG_TR] Cannot find serializer for type id ", 
hdr->type);
 +    }
 +}
 +
 +void pubsub::nanomsg::topic_receiver::processMsg(const 
celix::pubsub::nanomsg::msg_header *hdr, const char *payload, size_t 
payloadSize) {
 +    std::lock_guard<std::mutex> _lock(subscribers.mutex);
 +    for (auto entry : subscribers.map) {
 +        processMsgForSubscriberEntry(&entry.second, hdr, payload, 
payloadSize);
 +    }
 +}
 +
 +struct Message {
 +    celix::pubsub::nanomsg::msg_header header;
 +    char payload[];
 +};
 +
 +void pubsub::nanomsg::topic_receiver::recvThread_exec() {
 +    while (recvThread.running) {
 +        Message *msg = nullptr;
 +        nn_iovec iov[2];
 +        iov[0].iov_base = &msg;
 +        iov[0].iov_len = NN_MSG;
 +
 +        nn_msghdr msgHdr;
 +        memset(&msgHdr, 0, sizeof(msgHdr));
 +
 +        msgHdr.msg_iov = iov;
 +        msgHdr.msg_iovlen = 1;
 +
 +        msgHdr.msg_control = nullptr;
 +        msgHdr.msg_controllen = 0;
 +
 +        errno = 0;
 +        int recvBytes = nn_recvmsg(m_nanoMsgSocket, &msgHdr, 0);
 +        if (msg && static_cast<unsigned long>(recvBytes) >= 
sizeof(celix::pubsub::nanomsg::msg_header)) {
 +            processMsg(&msg->header, msg->payload, 
recvBytes-sizeof(msg->header));
 +            nn_freemsg(msg);
 +        } else if (recvBytes >= 0) {
 +            L.ERROR("[PSA_NANOMSG_TR] Error receiving nanomsg msg, size (", 
recvBytes,") smaller than header\n");
 +        } else if (errno == EAGAIN || errno == ETIMEDOUT) {
 +            // no data: go to next cycle
 +        } else if (errno == EINTR) {
 +            L.DBG("[PSA_NANOMSG_TR] nn_recvmsg interrupted");
 +        } else {
 +            L.WARN("[PSA_NANOMSG_TR] Error receiving nanomessage: errno ", 
errno, " : ",  strerror(errno), "\n");
 +        }
 +    } // while
 +
 +}
diff --cc bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
index 154bd11,0000000..4190729
mode 100644,000000..100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_topic_sender.cc
@@@ -1,267 -1,0 +1,266 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <memory.h>
 +#include <iostream>
 +#include <sstream>
 +#include <stdlib.h>
 +#include <utils.h>
 +#include <arpa/inet.h>
 +#include <zconf.h>
 +#include <LogHelper.h>
 +#include <nanomsg/nn.h>
 +#include <nanomsg/bus.h>
 +
 +
 +#include <pubsub_constants.h>
 +#include <pubsub_common.h>
 +#include "pubsub_nanomsg_topic_sender.h"
 +#include "pubsub_psa_nanomsg_constants.h"
 +#include "pubsub_nanomsg_common.h"
 +
 +#define FIRST_SEND_DELAY_IN_SECONDS                 2
 +#define NANOMSG_BIND_MAX_RETRY                      10
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +static void 
delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper);
 +
 
+pubsub::nanomsg::pubsub_nanomsg_topic_sender::pubsub_nanomsg_topic_sender(celix_bundle_context_t
 *_ctx,
 +                                                         const char *_scope,
 +                                                         const char *_topic,
 +                                                         long 
_serializerSvcId,
 +                                                         
pubsub_serializer_service_t *_ser,
 +                                                         const char *_bindIp,
 +                                                         unsigned int 
_basePort,
 +                                                         unsigned int 
_maxPort) :
 +        ctx{_ctx},
 +        L{ctx, "PSA_ZMQ_TS"},
 +        serializerSvcId {_serializerSvcId},
 +        serializer{_ser},
 +        scope{_scope},
 +        topic{_topic}{
 +
 +    scopeAndTopicFilter = 
celix::pubsub::nanomsg::setScopeAndTopicFilter(_scope, _topic);
 +
 +    //setting up nanomsg socket for nanomsg TopicSender
 +    int nnSock = nn_socket(AF_SP, NN_BUS);
 +    if (nnSock == -1) {
 +        perror("Error for nanomsg_socket");
 +    }
 +
 +    int rv = -1, retry=0;
 +    while(rv == -1 && retry < NANOMSG_BIND_MAX_RETRY ) {
 +        /* Randomized part due to same bundle publishing on different topics 
*/
 +        unsigned int port = rand_range(_basePort,_maxPort);
 +        std::stringstream _url;
 +        _url << "tcp://" << _bindIp << ":" << port;
 +
 +        std::stringstream bindUrl;
 +        bindUrl << "tcp://0.0.0.0:" << port;
 +
 +        rv = nn_bind (nnSock, bindUrl.str().c_str());
 +        if (rv == -1) {
 +            perror("Error for nn_bind");
 +        } else {
 +            this->url = _url.str();
 +            nanomsg.socket = nnSock;
 +        }
 +        retry++;
 +    }
 +
 +    if (!url.empty()) {
 +
 +        //register publisher services using a service factory
 +        publisher.factory.handle = this;
 +        publisher.factory.getService = [](void *handle, const celix_bundle_t 
*requestingBundle, const celix_properties_t *svcProperties) {
 +            return 
static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->getPublisherService(
 +                    requestingBundle,
 +                    svcProperties);
 +        };
 +        publisher.factory.ungetService = [](void *handle, const 
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties) {
 +            return 
static_cast<pubsub::nanomsg::pubsub_nanomsg_topic_sender*>(handle)->ungetPublisherService(
 +                    requestingBundle,
 +                    svcProperties);
 +        };
 +
 +        celix_properties_t *props = celix_properties_create();
 +        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, topic.c_str());
 +        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, scope.c_str());
 +
 +        celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
 +        opts.factory = &publisher.factory;
 +        opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
 +        opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
 +        opts.properties = props;
 +
 +        publisher.svcId = 
celix_bundleContext_registerServiceWithOptions(_ctx, &opts);
 +    }
 +
 +}
 +
 +pubsub::nanomsg::pubsub_nanomsg_topic_sender::~pubsub_nanomsg_topic_sender() {
 +    celix_bundleContext_unregisterService(ctx, publisher.svcId);
 +
 +    nn_close(nanomsg.socket);
 +    std::lock_guard<std::mutex> lock(boundedServices.mutex);
 +    for  (auto &it: boundedServices.map) {
 +            serializer->destroySerializerMap(serializer->handle, 
it.second.msgTypes);
 +    }
 +    boundedServices.map.clear();
 +
 +}
 +
// Id of the serializer service this sender was created with (used by the
// admin to match senders with their serializer).
long pubsub::nanomsg::pubsub_nanomsg_topic_sender::getSerializerSvcId() const {
    return serializerSvcId;
}

// Scope of the topic this sender publishes on.
const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getScope() const {
    return scope;
}

// Topic name this sender publishes on.
const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getTopic() const {
    return topic;
}

// The nanomsg bind URL of this sender (advertised through discovery).
const std::string &pubsub::nanomsg::pubsub_nanomsg_topic_sender::getUrl() const  {
    return url;
}
 +
 +
// celix_service_factory getService callback: hands out a publisher service
// instance that is unique per requesting bundle. Entries are reference
// counted via getCount; the matching release happens in ungetPublisherService.
// Returns nullptr when a serializer map cannot be created for the bundle.
void* pubsub::nanomsg::pubsub_nanomsg_topic_sender::getPublisherService(const celix_bundle_t *requestingBundle,
                                             const celix_properties_t *svcProperties __attribute__((unused))) {
    long bndId = celix_bundle_getId(requestingBundle);
    void *service{nullptr};
    std::lock_guard<std::mutex> lock(boundedServices.mutex);
    auto existingEntry = boundedServices.map.find(bndId);
    if (existingEntry != boundedServices.map.end()) {
        // Bundle already has a service instance; bump the use count.
        existingEntry->second.getCount += 1;
        service = &existingEntry->second.service;
    } else {
        // First request from this bundle: construct a bounded_service_entry
        // in place (keyed by bundle id) and create its serializer map.
        auto entry = boundedServices.map.emplace(std::piecewise_construct,
                                    std::forward_as_tuple(bndId),
                                    std::forward_as_tuple(scope, topic, bndId, nanomsg.socket, ctx));
        int rc = serializer->createSerializerMap(serializer->handle, (celix_bundle_t*)requestingBundle, &entry.first->second.msgTypes);

        if (rc == 0) {
            // Wire up the publisher service struct; send() trampolines into
            // the entry's topicPublicationSend member function.
            entry.first->second.service.handle = &entry.first->second;
            entry.first->second.service.localMsgTypeIdForMsgType = celix::pubsub::nanomsg::localMsgTypeIdForMsgType;
            entry.first->second.service.send = [](void *handle, unsigned int msgTypeId, const void *msg) {
                return static_cast<pubsub::nanomsg::bounded_service_entry*>(handle)->topicPublicationSend(msgTypeId, msg);
            };
            service = &entry.first->second.service;
        } else {
            // Serializer map creation failed; drop the partially built entry.
            boundedServices.map.erase(bndId);
            L.ERROR("Error creating serializer map for NanoMsg TopicSender. Scope: ", scope, ", Topic: ", topic);
        }
    }

    return service;
}
 +
// celix_service_factory ungetService callback: decrements the per-bundle use
// count and, when it reaches zero, destroys the bundle's serializer map and
// removes its bounded_service_entry.
void pubsub::nanomsg::pubsub_nanomsg_topic_sender::ungetPublisherService(const celix_bundle_t *requestingBundle,
                                                                         const celix_properties_t */*svcProperties*/) {
    long bndId = celix_bundle_getId(requestingBundle);

    std::lock_guard<std::mutex> lock(boundedServices.mutex);
    auto entry = boundedServices.map.find(bndId);
    if (entry != boundedServices.map.end()) {
        entry->second.getCount -= 1;
        if (entry->second.getCount == 0) {
            // Last use by this bundle: release the serializer map before
            // erasing the entry.
            int rc = serializer->destroySerializerMap(serializer->handle, entry->second.msgTypes);
            if (rc != 0) {
                L.ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n");
            }
            boundedServices.map.erase(bndId);
        }
    }
}
 +
 +int pubsub::nanomsg::bounded_service_entry::topicPublicationSend(unsigned int 
msgTypeId, const void *inMsg) {
 +    int status;
 +    auto msgSer = static_cast<pubsub_msg_serializer_t*>(hashMap_get(msgTypes, 
(void*)(uintptr_t)msgTypeId));
 +
 +    if (msgSer != nullptr) {
 +        delay_first_send_for_late_joiners(L);
 +
 +        int major = 0, minor = 0;
 +
 +        celix::pubsub::nanomsg::msg_header msg_hdr{};
 +        msg_hdr.type = msgTypeId;
 +
 +        if (msgSer->msgVersion != nullptr) {
 +            version_getMajor(msgSer->msgVersion, &major);
 +            version_getMinor(msgSer->msgVersion, &minor);
 +            msg_hdr.major = (unsigned char) major;
 +            msg_hdr.minor = (unsigned char) minor;
 +        }
 +
 +        void *serializedOutput = nullptr;
 +        size_t serializedOutputLen = 0;
 +        status = msgSer->serialize(msgSer, inMsg, &serializedOutput, 
&serializedOutputLen);
 +        if (status == CELIX_SUCCESS) {
 +            nn_iovec data[2];
 +
 +            nn_msghdr msg{};
 +            msg.msg_iov = data;
 +            msg.msg_iovlen = 2;
 +            msg.msg_iov[0].iov_base = static_cast<void*>(&msg_hdr);
 +            msg.msg_iov[0].iov_len = sizeof(msg_hdr);
 +            msg.msg_iov[1].iov_base = serializedOutput;
 +            msg.msg_iov[1].iov_len = serializedOutputLen;
 +            msg.msg_control = nullptr;
 +            msg.msg_controllen = 0;
 +            errno = 0;
 +            int rc = nn_sendmsg(nanoMsgSocket, &msg, 0 );
 +            free(serializedOutput);
 +            if (rc < 0) {
 +                L.WARN("[PSA_ZMQ_TS] Error sending zmsg, rc: ", rc, ", error: 
",  strerror(errno));
 +            } else {
 +                L.INFO("[PSA_ZMQ_TS] Send message with size ",  rc, "\n");
 +                L.INFO("[PSA_ZMQ_TS] Send message ID ", msg_hdr.type,
 +                        " major: ", (int)msg_hdr.major,
 +                        " minor: ",  (int)msg_hdr.minor,"\n");
 +            }
 +        } else {
 +            L.WARN("[PSA_ZMQ_TS] Error serialize message of type ", 
msgSer->msgName,
 +                    " for scope/topic ", scope.c_str(), "/", 
topic.c_str(),"\n");
 +        }
 +    } else {
 +        status = CELIX_SERVICE_EXCEPTION;
 +        L.WARN("[PSA_ZMQ_TS] Error cannot serialize message with msg type id 
", msgTypeId,
 +                " for scope/topic ", scope.c_str(), "/", topic.c_str(),"\n");
 +    }
 +    return status;
 +}
 +
 +static void 
delay_first_send_for_late_joiners(celix::pubsub::nanomsg::LogHelper& logHelper) 
{
 +
 +    static bool firstSend = true;
 +
 +    if(firstSend){
 +        logHelper.INFO("PSA_UDP_MC_TP: Delaying first send for late 
joiners...\n");
 +        sleep(FIRST_SEND_DELAY_IN_SECONDS);
 +        firstSend = false;
 +    }
 +}
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +    double scaled = ((double)random())/((double)RAND_MAX);
 +    return (unsigned int)((max-min+1)*scaled + min);
 +}
diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index 2d36579,d7add8b..b7fc8bd
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@@ -576,8 -592,6 +592,7 @@@ celix_status_t pubsub_udpmcAdmin_execut
      celixThreadMutex_unlock(&psa->serializers.mutex);
      fprintf(out, "\n");
  
-     //TODO topic receivers/senders connection count
 +
      return status;
  }
  
@@@ -681,4 -695,4 +696,4 @@@ static celix_status_t udpmc_getIpAddres
  
      return status;
  }
--#endif
++#endif
diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
index cada021,469fa9c..c71341b
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.h
@@@ -22,19 -22,9 +22,8 @@@
  
  #include "celix_api.h"
  #include "log_helper.h"
- 
- #define PUBSUB_UDPMC_ADMIN_TYPE                     "udp_mc"
- #define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY                   
"udpmc.socket_address"
- #define PUBSUB_PSA_UDPMC_SOCKET_PORT_KEY            "udpmc.socket_port"
- 
- #define PUBSUB_UDPMC_IP_KEY                       "PSA_IP"
- #define PUBSUB_UDPMC_ITF_KEY                      "PSA_INTERFACE"
- #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_KEY        "PSA_MC_PREFIX"
- #define PUBSUB_UDPMC_VERBOSE_KEY                    "PSA_UDPMC_VERBOSE"
- 
- #define PUBSUB_UDPMC_MULTICAST_IP_PREFIX_DEFAULT    "224.100"
- #define PUBSUB_UDPMC_VERBOSE_DEFAULT                true
+ #include "pubsub_psa_udpmc_constants.h"
  
 -
  typedef struct pubsub_udpmc_admin pubsub_udpmc_admin_t;
  
  pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, 
log_helper_t *logHelper);
diff --cc bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
index 64d7a23,1b7c1db..b5ca4fd
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.c
@@@ -339,3 -345,7 +345,7 @@@ static unsigned int rand_range(unsigne
  long pubsub_udpmcTopicSender_serializerSvcId(pubsub_udpmc_topic_sender_t 
*sender) {
      return sender->serializerSvcId;
  }
+ 
+ bool pubsub_udpmcTopicSender_isStatic(pubsub_udpmc_topic_sender_t *sender) {
+     return sender->staticallyConfigured;
 -}
++}
diff --cc bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index a13cb26,8c9e163..ea7653e
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@@ -370,6 -373,22 +373,22 @@@ celix_status_t pubsub_zmqAdmin_setupTop
              newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, 
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
                                                  serType, NULL);
              celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, 
pubsub_zmqTopicSender_url(sender));
+ 
+             //if configured use a static discover url
+             const char *staticDiscUrl = celix_properties_get(topicProperties, 
PUBSUB_ZMQ_STATIC_DISCOVER_URL, NULL);
+             if (staticDiscUrl != NULL) {
+                 celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, 
staticDiscUrl);
+             }
+             celix_properties_setBool(newEndpoint, 
PUBSUB_ZMQ_STATIC_CONFIGURED, staticBindUrl != NULL || staticDiscUrl != NULL);
+ 
+             //if url starts with ipc:// constrain discovery to host 
visibility, else use system visibility
+             const char *u = celix_properties_get(newEndpoint, 
PUBSUB_ZMQ_URL_KEY, "");
+             if (strncmp("ipc://", u, strlen("ipc://")) == 0) {
 -                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, 
PUBSUB_ENDPOINT_HOST_VISIBLITY);
++                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, 
PUBSUB_ENDPOINT_HOST_VISIBILITY);
+             } else {
 -                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, 
PUBSUB_ENDPOINT_SYSTEM_VISIBLITY);
++                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, 
PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+             }
+ 
              //if available also set container name
              const char *cn = celix_bundleContext_getProperty(psa->ctx, 
"CELIX_CONTAINER_NAME", NULL);
              if (cn != NULL) {
diff --cc bundles/pubsub/pubsub_spi/include/pubsub_utils.h
index e4983b1,9d707f3..396ae6f
--- a/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_utils.h
@@@ -38,26 -36,40 +38,43 @@@ extern "C" 
  * present an allocated scope string.
   * The caller is owner of the topic and scope output string.
   */
- celix_status_t pubsub_getPubSubInfoFromFilter(const char *filterstr, char 
**topic, char **scope);
+ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char 
**topic, char **scope);
  
- char *pubsub_getKeysBundleDir(bundle_context_pt ctx);
+ char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
  
- double
- pubsub_utils_matchPublisher(celix_bundle_context_t *ctx, long bundleId, const 
char *filter, const char *adminType,
-                             double sampleScore, double controlScore, double 
defaultScore, long *outSerializerSvcId);
+ double pubsub_utils_matchPublisher(
+         celix_bundle_context_t *ctx,
+         long bundleId,
+         const char *filter,
+         const char *adminType,
+         double sampleScore,
+         double controlScore,
+         double defaultScore,
+         celix_properties_t **outTopicProperties,
+         long *outSerializerSvcId);
  
- double pubsub_utils_matchSubscriber(celix_bundle_context_t *ctx, long 
svcProviderBundleId,
-                                     const celix_properties_t *svcProperties, 
const char *adminType, double sampleScore,
-                                     double controlScore, double defaultScore, 
long *outSerializerSvcId);
+ double pubsub_utils_matchSubscriber(
+         celix_bundle_context_t *ctx,
+         long svcProviderBundleId,
+         const celix_properties_t *svcProperties,
+         const char *adminType,
+         double sampleScore,
+         double controlScore,
+         double defaultScore,
+         celix_properties_t **outTopicProperties,
+         long *outSerializerSvcId);
  
- bool pubsub_utils_matchEndpoint(celix_bundle_context_t *ctx, const 
celix_properties_t *endpoint, const char *adminType,
-                                 long *outSerializerSvcId);
+ bool pubsub_utils_matchEndpoint(
+         celix_bundle_context_t *ctx,
+         const celix_properties_t *endpoint,
+         const char *adminType,
+         long *outSerializerSvcId);
  
  
- celix_properties_t *pubsub_utils_getTopicProperties(const celix_bundle_t 
*bundle, const char *topic, bool isPublisher);
+ celix_properties_t* pubsub_utils_getTopicProperties(const celix_bundle_t 
*bundle, const char *topic, bool isPublisher);
  
 +#ifdef __cplusplus
 +}
 +#endif
  
  #endif /* PUBSUB_UTILS_H_ */

Reply via email to