This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 4b59c692bc60bee7b9e5cbc571ee678b6fd37530
Author: Roy Lenferink <[email protected]>
AuthorDate: Sun Jul 28 11:25:14 2019 +0200

    Added support for subnet mask to tcp and nanomsg admins
---
 .../src/pubsub_nanomsg_admin.cc                    |   20 +-
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c |   23 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_common.h       |    2 +-
 .../src/pubsub_tcp_topic_receiver.c                | 1096 ++++++++++----------
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  853 ++++++++-------
 .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c   |    4 +-
 .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c |    8 +-
 7 files changed, 1015 insertions(+), 991 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc 
b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
index 27b264d..5245d03 100644
--- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
+++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc
@@ -45,8 +45,24 @@ 
pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx):
 
     char *ip = nullptr;
     const char *confIp = celix_bundleContext_getProperty(ctx, 
PUBSUB_NANOMSG_PSA_IP_KEY , nullptr);
-    if (confIp != nullptr) {
-        ip = strndup(confIp, 1024);
+    if (confIp != NULL) {
+        if (strchr(confIp, '/') != NULL) {
+            // IP with subnet prefix specified
+            char *found_if_ip = calloc(16, sizeof(char));
+            celix_status_t ip_status = ipUtils_findIpBySubnet(confIp, 
&found_if_ip);
+            if (ip_status == CELIX_SUCCESS) {
+                if (found_if_ip != NULL)
+                    ip = strndup(found_if_ip, 16);
+                else
+                    L_WARN("[PSA_NANOMSG] Could not find interface for 
requested subnet %s", confIp);
+            } else {
+                L_ERROR("[PSA_NANOMSG] Error while searching for available 
network interface for subnet %s", confIp);
+            }
+            free(found_if_ip);
+        } else {
+            // IP address specified
+            ip = strndup(confIp, 1024);
+        }
     }
 
     if (ip == nullptr) {
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 3ac9555..473d051 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -25,6 +25,7 @@
 #include <ifaddrs.h>
 #include <pubsub_endpoint.h>
 #include <pubsub_serializer.h>
+#include <ip_utils.h>
 
 #include "pubsub_utils.h"
 #include "pubsub_tcp_admin.h"
@@ -102,11 +103,23 @@ pubsub_tcp_admin_t* 
pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, log_help
     char *ip = NULL;
     const char *confIp = celix_bundleContext_getProperty(ctx, 
PUBSUB_TCP_PSA_IP_KEY , NULL);
     if (confIp != NULL) {
-        ip = strndup(confIp, 1024);
-    }
-
-    if (ip == NULL) {
-        //TODO try to get ip from subnet (CIDR)
+        if (strchr(confIp, '/') != NULL) {
+            // IP with subnet prefix specified
+            char *found_if_ip = calloc(16, sizeof(char));
+            celix_status_t ip_status = ipUtils_findIpBySubnet(confIp, 
&found_if_ip);
+            if (ip_status == CELIX_SUCCESS) {
+                if (found_if_ip != NULL)
+                    ip = strndup(found_if_ip, 16);
+                else
+                    L_WARN("[PSA_TCP] Could not find interface for requested 
subnet %s", confIp);
+            } else {
+                L_ERROR("[PSA_TCP] Error while searching for available network 
interface for subnet %s", confIp);
+            }
+            free(found_if_ip);
+        } else {
+            // IP address specified
+            ip = strndup(confIp, 1024);
+        }
     }
 
     if (ip == NULL) {
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index 09b5842..b6f4d5a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -40,7 +40,7 @@ typedef struct pubsub_tcp_endPointStore{
  * 1) A subscription filter.
  * This is a 5 char string of the first two chars of scope and topic combined 
and terminated with a '\0'.
  *
- * 2) The pubsub_tcp_msg_header_t is send containg the type id and major/minor 
version
+ * 2) The pubsub_tcp_msg_header_t is send containing the type id and 
major/minor version
  *
  * 3) The actual payload
  */
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 5533331..6456f27 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -52,68 +52,68 @@
     logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
 struct pubsub_tcp_topic_receiver {
-  celix_bundle_context_t *ctx;
-  log_helper_t *logHelper;
-  long serializerSvcId;
-  pubsub_serializer_service_t *serializer;
-  char *scope;
-  char *topic;
-  char scopeAndTopicFilter[5];
-  bool metricsEnabled;
-  pubsub_tcpHandler_pt socketHandler;
-  pubsub_tcpHandler_pt sharedSocketHandler;
-
-  struct {
-    celix_thread_t thread;
-    celix_thread_mutex_t mutex;
-    bool running;
-  } thread;
-
-  struct {
-    celix_thread_mutex_t mutex;
-    hash_map_t *map; //key = tcp url, value = 
psa_tcp_requested_connection_entry_t*
-    bool allConnected; //true if all requestedConnectection are connected
-  } requestedConnections;
-
-  long subscriberTrackerId;
-  struct {
-    celix_thread_mutex_t mutex;
-    hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
-    bool allInitialized;
-  } subscribers;
+    celix_bundle_context_t *ctx;
+    log_helper_t *logHelper;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
+    char *scope;
+    char *topic;
+    char scopeAndTopicFilter[5];
+    bool metricsEnabled;
+    pubsub_tcpHandler_pt socketHandler;
+    pubsub_tcpHandler_pt sharedSocketHandler;
+
+    struct {
+        celix_thread_t thread;
+        celix_thread_mutex_t mutex;
+        bool running;
+    } thread;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = tcp url, value = 
psa_tcp_requested_connection_entry_t*
+        bool allConnected; //true if all requestedConnection are connected
+    } requestedConnections;
+
+    long subscriberTrackerId;
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
+        bool allInitialized;
+    } subscribers;
 };
 
 typedef struct psa_tcp_requested_connection_entry {
-  pubsub_tcp_topic_receiver_t *parent;
-  char *key;
-  char *url;
-  int  fd;
-  bool connected;
-  bool statically; //true if the connection is statically configured through 
the topic properties.
+    pubsub_tcp_topic_receiver_t *parent;
+    char *key;
+    char *url;
+    int fd;
+    bool connected;
+    bool statically; //true if the connection is statically configured through 
the topic properties.
 } psa_tcp_requested_connection_entry_t;
 
 typedef struct psa_tcp_subscriber_metrics_entry_t {
-  unsigned int msgTypeId;
-  uuid_t origin;
-
-  unsigned long nrOfMessagesReceived;
-  unsigned long nrOfSerializationErrors;
-  struct timespec lastMessageReceived;
-  double averageTimeBetweenMessagesInSeconds;
-  double averageSerializationTimeInSeconds;
-  double averageDelayInSeconds;
-  double maxDelayInSeconds;
-  double minDelayInSeconds;
-  unsigned int lastSeqNr;
-  unsigned long nrOfMissingSeqNumbers;
+    unsigned int msgTypeId;
+    uuid_t origin;
+
+    unsigned long nrOfMessagesReceived;
+    unsigned long nrOfSerializationErrors;
+    struct timespec lastMessageReceived;
+    double averageTimeBetweenMessagesInSeconds;
+    double averageSerializationTimeInSeconds;
+    double averageDelayInSeconds;
+    double maxDelayInSeconds;
+    double minDelayInSeconds;
+    unsigned int lastSeqNr;
+    unsigned long nrOfMissingSeqNumbers;
 } psa_tcp_subscriber_metrics_entry_t;
 
 typedef struct psa_tcp_subscriber_entry {
-  int usageCount;
-  hash_map_t *msgTypes; //map from serializer svc
-  hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin 
uuid, value = psa_tcp_subscriber_metrics_entry_t*
-  pubsub_subscriber_t *svc;
-  bool initialized; //true if the init function is called through the receive 
thread
+    int usageCount;
+    hash_map_t *msgTypes; //map from serializer svc
+    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin 
uuid, value = psa_tcp_subscriber_metrics_entry_t*
+    pubsub_subscriber_t *svc;
+    bool initialized; //true if the init function is called through the 
receive thread
 } psa_tcp_subscriber_entry_t;
 
 
@@ -129,619 +129,615 @@ static void 
psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
 
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t 
*receiver);
 
-static void processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const 
unsigned char *payload, size_t payloadSize, struct timespec *receiveTime);
+static void processMsg(void *handle, const pubsub_tcp_msg_header_t *hdr, const 
unsigned char *payload, size_t payloadSize, struct timespec *receiveTime);
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
 static void psa_tcp_disConnectHandler(void *handle, const char *url);
 
 
-
 pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
                                                             log_helper_t 
*logHelper,
                                                             const char *scope,
                                                             const char *topic,
                                                             const 
celix_properties_t *topicProperties,
-                                                            
pubsub_tcp_endPointStore_t* endPointStore,
+                                                            
pubsub_tcp_endPointStore_t *endPointStore,
                                                             long 
serializerSvcId,
                                                             
pubsub_serializer_service_t *serializer) {
-  pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
-  receiver->ctx = ctx;
-  receiver->logHelper = logHelper;
-  receiver->serializerSvcId = serializerSvcId;
-  receiver->serializer = serializer;
-  receiver->scope = strndup(scope, 1024 * 1024);
-  receiver->topic = strndup(topic, 1024 * 1024);
-
-  long sessions = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
-  long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
-  long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, 
PSA_TCP_DEFAULT_TIMEOUT);
-  const char *staticConnectUrls = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-
-  /* Check if it's a static endpoint */
-  bool isEndPointTypeClient = false;
-  bool isEndPointTypeServer = false;
-  const char *endPointType  = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-  if (endPointType != NULL) {
-    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) {
-      isEndPointTypeClient = true;
+    pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
+    receiver->ctx = ctx;
+    receiver->logHelper = logHelper;
+    receiver->serializerSvcId = serializerSvcId;
+    receiver->serializer = serializer;
+    receiver->scope = strndup(scope, 1024 * 1024);
+    receiver->topic = strndup(topic, 1024 * 1024);
+
+    long sessions = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
+    long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
+    long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, 
PSA_TCP_DEFAULT_TIMEOUT);
+    const char *staticConnectUrls = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
+
+    /* Check if it's a static endpoint */
+    bool isEndPointTypeClient = false;
+    bool isEndPointTypeServer = false;
+    const char *endPointType = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
+    if (endPointType != NULL) {
+        if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
+            isEndPointTypeClient = true;
+        }
+        if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
+            isEndPointTypeServer = true;
+        }
     }
-    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) ==0) {
-      isEndPointTypeServer = true;
+    // When endpoint is server, use the bind url as a key.
+    const char *staticBindUrl = ((topicProperties != NULL) && 
isEndPointTypeServer) ? celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL;
+    /* When it's an endpoint share the socket with the receiver */
+    if (staticBindUrl != NULL || (isEndPointTypeClient && staticConnectUrls != 
NULL)) {
+        celixThreadMutex_lock(&receiver->thread.mutex);
+        pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, 
(isEndPointTypeServer) ? staticBindUrl : staticConnectUrls);
+        if (entry != NULL) {
+            receiver->socketHandler = entry;
+            receiver->sharedSocketHandler = entry;
+        } else {
+            L_ERROR("[PSA_TCP] Cannot find static Endpoint URL for %s/%s", 
scope, topic);
+        }
+        celixThreadMutex_unlock(&receiver->thread.mutex);
     }
-  }
-  // When endpoint is server, use the bind url as a key.
-  const char *staticBindUrl = ((topicProperties != NULL) && 
isEndPointTypeServer) ? celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL;
-  /* When it's an endpoint share the socket with the receiver */
-  if (staticBindUrl != NULL  || (isEndPointTypeClient && staticConnectUrls != 
NULL)) {
-    celixThreadMutex_lock(&receiver->thread.mutex);
-    pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, 
(isEndPointTypeServer) ? staticBindUrl : staticConnectUrls);
-    if(entry != NULL) {
-      receiver->socketHandler = entry;
-      receiver->sharedSocketHandler = entry;
-    } else {
-      L_ERROR("[PSA_TCP] Cannot find static Endpoint URL for %s/%s", scope, 
topic);
+
+    if (receiver->socketHandler == NULL) {
+        receiver->socketHandler = 
pubsub_tcpHandler_create(receiver->logHelper);
     }
-    celixThreadMutex_unlock(&receiver->thread.mutex);
-  }
-
-  if (receiver->socketHandler == NULL) {
-    receiver->socketHandler = pubsub_tcpHandler_create(receiver->logHelper);
-  }
-
-  if (receiver->socketHandler != NULL) {
-    pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, 
(unsigned int) sessions, (unsigned int) buffer_size);
-    pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) 
timeout);
-    pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, 
processMsg);
-    pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, 
psa_tcp_connectHandler,psa_tcp_disConnectHandler);
-  }
-
-  psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter);
-  receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED,
-                                                                        
PSA_TCP_DEFAULT_METRICS_ENABLED);
-
-  celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
-  celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
-  celixThreadMutex_create(&receiver->thread.mutex, NULL);
-
-  receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
-  receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-  receiver->requestedConnections.allConnected = false;
-
-  if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && 
(staticBindUrl == NULL)) {
-    char *urlsCopy = strndup(staticConnectUrls, 1024*1024);
-    char* url;
-    char* save = urlsCopy;
-    while ((url = strtok_r(save, " ", &save))) {
-      psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
-      entry->statically = true;
-      entry->connected = false;
-      entry->url = strndup(url, 1024*1024);
-      entry->parent = receiver;
-      hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+
+    if (receiver->socketHandler != NULL) {
+        pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, 
(unsigned int) sessions, (unsigned int) buffer_size);
+        pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) 
timeout);
+        pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, 
processMsg);
+        pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, 
receiver, psa_tcp_connectHandler, psa_tcp_disConnectHandler);
     }
-    free(urlsCopy);
-
-    // Configure Receiver thread
-    receiver->thread.running = true;
-    celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, 
receiver);
-    char name[64];
-    snprintf(name, 64, "TCP TR %s/%s", scope, topic);
-    celixThread_setName(&receiver->thread.thread, name);
-    psa_tcp_setupTcpContext(receiver->logHelper, &receiver->thread.thread, 
topicProperties);
-  }
-
-  //track subscribers
-  if (receiver->socketHandler != NULL) {
-    int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
-    char buf[size+1];
-    snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
-    celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-    opts.filter.ignoreServiceLanguage = true;
-    opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
-    opts.filter.filter = buf;
-    opts.callbackHandle = receiver;
-    opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
-    opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
-    receiver->subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-  }
-
-  if (receiver->socketHandler == NULL) {
-    free(receiver->scope);
-    free(receiver->topic);
-    free(receiver);
-    receiver = NULL;
-    L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope, topic);
-  }
-  return receiver;
-}
 
-void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
-  if (receiver != NULL) {
+    psa_tcp_setScopeAndTopicFilter(scope, topic, 
receiver->scopeAndTopicFilter);
+    receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED,
+                                                                     
PSA_TCP_DEFAULT_METRICS_ENABLED);
 
+    celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
+    celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
+    celixThreadMutex_create(&receiver->thread.mutex, NULL);
 
-    celixThreadMutex_lock(&receiver->thread.mutex);
-    if (!receiver->thread.running) {
-      receiver->thread.running = false;
-      celixThreadMutex_unlock(&receiver->thread.mutex);
-      celixThread_join(receiver->thread.thread, NULL);
+    receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
+    receiver->requestedConnections.map = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
+    receiver->requestedConnections.allConnected = false;
+
+    if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && 
(staticBindUrl == NULL)) {
+        char *urlsCopy = strndup(staticConnectUrls, 1024 * 1024);
+        char *url;
+        char *save = urlsCopy;
+        while ((url = strtok_r(save, " ", &save))) {
+            psa_tcp_requested_connection_entry_t *entry = calloc(1, 
sizeof(*entry));
+            entry->statically = true;
+            entry->connected = false;
+            entry->url = strndup(url, 1024 * 1024);
+            entry->parent = receiver;
+            hashMap_put(receiver->requestedConnections.map, entry->url, entry);
+        }
+        free(urlsCopy);
+
+        // Configure Receiver thread
+        receiver->thread.running = true;
+        celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, 
receiver);
+        char name[64];
+        snprintf(name, 64, "TCP TR %s/%s", scope, topic);
+        celixThread_setName(&receiver->thread.thread, name);
+        psa_tcp_setupTcpContext(receiver->logHelper, &receiver->thread.thread, 
topicProperties);
     }
 
-    celix_bundleContext_stopTracker(receiver->ctx, 
receiver->subscriberTrackerId);
+    //track subscribers
+    if (receiver->socketHandler != NULL) {
+        int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
topic);
+        char buf[size + 1];
+        snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, 
topic);
+        celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.filter.ignoreServiceLanguage = true;
+        opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
+        opts.filter.filter = buf;
+        opts.callbackHandle = receiver;
+        opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
+        opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
+        receiver->subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+    }
 
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-      psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-      if (entry != NULL) {
-        
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
-        free(entry);
-      }
-
-      hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
-      while (hashMapIterator_hasNext(&iter2)) {
-        hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-        hashMap_destroy(origins, true, true);
-      }
-      hashMap_destroy(entry->metrics, false, false);
+    if (receiver->socketHandler == NULL) {
+        free(receiver->scope);
+        free(receiver->topic);
+        free(receiver);
+        receiver = NULL;
+        L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope, 
topic);
     }
-    hashMap_destroy(receiver->subscribers.map, false, false);
+    return receiver;
+}
 
+void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
+    if (receiver != NULL) {
 
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
 
-    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    iter = hashMapIterator_construct(receiver->requestedConnections.map);
-    while (hashMapIterator_hasNext(&iter)) {
-      psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-      if (entry != NULL) {
-        free(entry->url);
-        free(entry);
-      }
-    }
-    hashMap_destroy(receiver->requestedConnections.map, false, false);
-    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+        celixThreadMutex_lock(&receiver->thread.mutex);
+        if (!receiver->thread.running) {
+            receiver->thread.running = false;
+            celixThreadMutex_unlock(&receiver->thread.mutex);
+            celixThread_join(receiver->thread.thread, NULL);
+        }
 
-    celixThreadMutex_destroy(&receiver->subscribers.mutex);
-    celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
-    celixThreadMutex_destroy(&receiver->thread.mutex);
+        celix_bundleContext_stopTracker(receiver->ctx, 
receiver->subscriberTrackerId);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (entry != NULL) {
+                
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
+                free(entry);
+            }
+
+            hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->metrics);
+            while (hashMapIterator_hasNext(&iter2)) {
+                hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+                hashMap_destroy(origins, true, true);
+            }
+            hashMap_destroy(entry->metrics, false, false);
+        }
+        hashMap_destroy(receiver->subscribers.map, false, false);
 
-    pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
-    pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, NULL, 
NULL, NULL);
-    if ((receiver->socketHandler)&&(receiver->sharedSocketHandler == NULL)) {
-      pubsub_tcpHandler_destroy(receiver->socketHandler);
-      receiver->socketHandler = NULL;
-    }
 
-    free(receiver->scope);
-    free(receiver->topic);
-  }
-  free(receiver);
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (entry != NULL) {
+                free(entry->url);
+                free(entry);
+            }
+        }
+        hashMap_destroy(receiver->requestedConnections.map, false, false);
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_destroy(&receiver->subscribers.mutex);
+        celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
+        celixThreadMutex_destroy(&receiver->thread.mutex);
+
+        pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, 
NULL);
+        pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, NULL, 
NULL, NULL);
+        if ((receiver->socketHandler) && (receiver->sharedSocketHandler == 
NULL)) {
+            pubsub_tcpHandler_destroy(receiver->socketHandler);
+            receiver->socketHandler = NULL;
+        }
+
+        free(receiver->scope);
+        free(receiver->topic);
+    }
+    free(receiver);
 }
 
 const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t 
*receiver) {
-  return receiver->scope;
+    return receiver->scope;
 }
 
 const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t 
*receiver) {
-  return receiver->topic;
+    return receiver->topic;
 }
 
 long pubsub_tcpTopicReceiver_serializerSvcId(pubsub_tcp_topic_receiver_t 
*receiver) {
-  return receiver->serializerSvcId;
+    return receiver->serializerSvcId;
 }
 
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls,
                                              celix_array_list_t 
*unconnectedUrls) {
-  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
-  while (hashMapIterator_hasNext(&iter)) {
-    psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-    char *url = NULL;
-    asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
-    if (entry->connected) {
-      celix_arrayList_add(connectedUrls, url);
-    } else {
-      celix_arrayList_add(unconnectedUrls, url);
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        char *url = NULL;
+        asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : 
"");
+        if (entry->connected) {
+            celix_arrayList_add(connectedUrls, url);
+        } else {
+            celix_arrayList_add(unconnectedUrls, url);
+        }
     }
-  }
-  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
 
-
 void pubsub_tcpTopicReceiver_connectTo(
         pubsub_tcp_topic_receiver_t *receiver,
         const char *url) {
-  L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", 
receiver->scope, receiver->topic, url);
-
-  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
-  if (entry == NULL) {
-    entry = calloc(1, sizeof(*entry));
-    entry->url = strndup(url, 1024*1024);
-    entry->connected = false;
-    entry->statically = false;
-    entry->parent = receiver;
-    hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
-    receiver->requestedConnections.allConnected = false;
-  }
-  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", 
receiver->scope, receiver->topic, url);
 
-  psa_tcp_connectToAllRequestedConnections(receiver);
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->url = strndup(url, 1024 * 1024);
+        entry->connected = false;
+        entry->statically = false;
+        entry->parent = receiver;
+        hashMap_put(receiver->requestedConnections.map, (void *) entry->url, 
entry);
+        receiver->requestedConnections.allConnected = false;
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+    psa_tcp_connectToAllRequestedConnections(receiver);
 }
 
 void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t 
*receiver, const char *url) {
-  L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", 
receiver->scope, receiver->topic, url);
-
-  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  psa_tcp_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, url);
-  if (entry != NULL) {
-    int rc = pubsub_tcpHandler_closeConnection(receiver->socketHandler, 
entry->url);
-    if (rc < 0) L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", 
url, strerror(errno));
-  }
-  if (entry != NULL) {
-    free(entry->url);
-    free(entry);
-  }
-  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", 
receiver->scope, receiver->topic, url);
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_tcp_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, url);
+    if (entry != NULL) {
+        int rc = pubsub_tcpHandler_closeConnection(receiver->socketHandler, 
entry->url);
+        if (rc < 0) L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. 
(%s)", url, strerror(errno));
+    }
+    if (entry != NULL) {
+        free(entry->url);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
 
 static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, 
const celix_properties_t *props,
                                                   const celix_bundle_t *bnd) {
-  pubsub_tcp_topic_receiver_t *receiver = handle;
-
-  long bndId = celix_bundle_getId(bnd);
-  const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, 
"default");
-  if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
-    //not the same scope. ignore
-    return;
-  }
-
-  celixThreadMutex_lock(&receiver->subscribers.mutex);
-  psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, 
(void *) bndId);
-  if (entry != NULL) {
-    entry->usageCount += 1;
-  } else {
-    //new create entry
-    entry = calloc(1, sizeof(*entry));
-    entry->usageCount = 1;
-    entry->svc = svc;
-    entry->initialized = false;
-    receiver->subscribers.allInitialized = false;
-
-    int rc = 
receiver->serializer->createSerializerMap(receiver->serializer->handle, 
(celix_bundle_t *) bnd,
-                                                       &entry->msgTypes);
-
-    if (rc == 0) {
-      entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
-      hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
-      while (hashMapIterator_hasNext(&iter)) {
-        pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
-        hash_map_t *origins = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-        hashMap_put(entry->metrics, (void *) (uintptr_t) msgSer->msgId, 
origins);
-      }
+    pubsub_tcp_topic_receiver_t *receiver = handle;
+
+    long bndId = celix_bundle_getId(bnd);
+    const char *subScope = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_SCOPE, "default");
+    if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
+        //not the same scope. ignore
+        return;
     }
 
-    if (rc == 0) {
-      hashMap_put(receiver->subscribers.map, (void *) bndId, entry);
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, 
(void *) bndId);
+    if (entry != NULL) {
+        entry->usageCount += 1;
     } else {
-      L_ERROR("[PSA_TCP] Cannot create msg serializer map for TopicReceiver 
%s/%s", receiver->scope, receiver->topic);
-      free(entry);
+        //create new entry
+        entry = calloc(1, sizeof(*entry));
+        entry->usageCount = 1;
+        entry->svc = svc;
+        entry->initialized = false;
+        receiver->subscribers.allInitialized = false;
+
+        int rc = 
receiver->serializer->createSerializerMap(receiver->serializer->handle, 
(celix_bundle_t *) bnd,
+                                                           &entry->msgTypes);
+
+        if (rc == 0) {
+            entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
+            hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                pubsub_msg_serializer_t *msgSer = 
hashMapIterator_nextValue(&iter);
+                hash_map_t *origins = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+                hashMap_put(entry->metrics, (void *) (uintptr_t) 
msgSer->msgId, origins);
+            }
+        }
+
+        if (rc == 0) {
+            hashMap_put(receiver->subscribers.map, (void *) bndId, entry);
+        } else {
+            L_ERROR("[PSA_TCP] Cannot create msg serializer map for 
TopicReceiver %s/%s", receiver->scope, receiver->topic);
+            free(entry);
+        }
     }
-  }
-  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
 static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, 
const celix_properties_t *props,
                                                      const celix_bundle_t 
*bnd) {
-  pubsub_tcp_topic_receiver_t *receiver = handle;
-
-  long bndId = celix_bundle_getId(bnd);
-
-  celixThreadMutex_lock(&receiver->subscribers.mutex);
-  psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, 
(void *) bndId);
-  if (entry != NULL) {
-    entry->usageCount -= 1;
-  }
-  if (entry != NULL && entry->usageCount <= 0) {
-    //remove entry
-    hashMap_remove(receiver->subscribers.map, (void *) bndId);
-    int rc = 
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
-    if (rc != 0) {
-      L_ERROR("[PSA_TCP] Cannot destroy msg serializers map for TopicReceiver 
%s/%s", receiver->scope, receiver->topic);
+    pubsub_tcp_topic_receiver_t *receiver = handle;
+
+    long bndId = celix_bundle_getId(bnd);
+
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, 
(void *) bndId);
+    if (entry != NULL) {
+        entry->usageCount -= 1;
     }
-    hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
-    while (hashMapIterator_hasNext(&iter)) {
-      hash_map_t *origins = hashMapIterator_nextValue(&iter);
-      hashMap_destroy(origins, true, true);
+    if (entry != NULL && entry->usageCount <= 0) {
+        //remove entry
+        hashMap_remove(receiver->subscribers.map, (void *) bndId);
+        int rc = 
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
+        if (rc != 0) {
+            L_ERROR("[PSA_TCP] Cannot destroy msg serializers map for 
TopicReceiver %s/%s", receiver->scope, receiver->topic);
+        }
+        hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_t *origins = hashMapIterator_nextValue(&iter);
+            hashMap_destroy(origins, true, true);
+        }
+        hashMap_destroy(entry->metrics, false, false);
+        free(entry);
     }
-    hashMap_destroy(entry->metrics, false, false);
-    free(entry);
-  }
-  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
 static inline void
 processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, 
psa_tcp_subscriber_entry_t *entry,
                              const pubsub_tcp_msg_header_t *hdr, const 
unsigned char *payload, size_t payloadSize,
                              struct timespec *receiveTime) {
-  //NOTE receiver->subscribers.mutex locked
-  pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) 
(uintptr_t) (hdr->type));
-  pubsub_subscriber_t *svc = entry->svc;
-  bool monitor = receiver->metricsEnabled;
-
-  //monitoring
-  struct timespec beginSer;
-  struct timespec endSer;
-  int updateReceiveCount = 0;
-  int updateSerError = 0;
-
-  if (msgSer != NULL) {
-    void *deserializedMsg = NULL;
-    bool validVersion = psa_tcp_checkVersion(msgSer->msgVersion, hdr);
-    if (validVersion) {
-      if (monitor) {
-        clock_gettime(CLOCK_REALTIME, &beginSer);
-      }
-      celix_status_t status = msgSer->deserialize(msgSer->handle, payload, 
payloadSize, &deserializedMsg);
-      if (monitor) {
-        clock_gettime(CLOCK_REALTIME, &endSer);
-      }
-      if (status == CELIX_SUCCESS) {
-        bool release = true;
-        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deserializedMsg, &release);
-        if (release) {
-          msgSer->freeMsg(msgSer->handle, deserializedMsg);
+    //NOTE receiver->subscribers.mutex locked
+    pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) 
(uintptr_t)(hdr->type));
+    pubsub_subscriber_t *svc = entry->svc;
+    bool monitor = receiver->metricsEnabled;
+
+    //monitoring
+    struct timespec beginSer;
+    struct timespec endSer;
+    int updateReceiveCount = 0;
+    int updateSerError = 0;
+
+    if (msgSer != NULL) {
+        void *deserializedMsg = NULL;
+        bool validVersion = psa_tcp_checkVersion(msgSer->msgVersion, hdr);
+        if (validVersion) {
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &beginSer);
+            }
+            celix_status_t status = msgSer->deserialize(msgSer->handle, 
payload, payloadSize, &deserializedMsg);
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &endSer);
+            }
+            if (status == CELIX_SUCCESS) {
+                bool release = true;
+                svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deserializedMsg, &release);
+                if (release) {
+                    msgSer->freeMsg(msgSer->handle, deserializedMsg);
+                }
+                updateReceiveCount += 1;
+            } else {
+                updateSerError += 1;
+                L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgSer->msgName, receiver->scope,
+                       receiver->topic);
+            }
         }
-        updateReceiveCount += 1;
-      } else {
-        updateSerError += 1;
-        L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic 
%s/%s", msgSer->msgName, receiver->scope,
-               receiver->topic);
-      }
-    }
-  } else {
-    L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X", hdr->type);
-  }
-
-  if (msgSer != NULL && monitor) {
-    hash_map_t *origins = hashMap_get(entry->metrics, (void *) (uintptr_t) 
hdr->type);
-    char uuidStr[UUID_STR_LEN + 1];
-    uuid_unparse(hdr->originUUID, uuidStr);
-    psa_tcp_subscriber_metrics_entry_t *metrics = hashMap_get(origins, 
uuidStr);
-
-    if (metrics == NULL) {
-      metrics = calloc(1, sizeof(*metrics));
-      hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN + 1), metrics);
-      uuid_copy(metrics->origin, hdr->originUUID);
-      metrics->msgTypeId = hdr->type;
-      metrics->maxDelayInSeconds = -INFINITY;
-      metrics->minDelayInSeconds = INFINITY;
-      metrics->lastSeqNr = 0;
+    } else {
+        L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X", 
hdr->type);
     }
 
-    double diff = celix_difftime(&beginSer, &endSer);
-    long n = metrics->nrOfMessagesReceived;
-    metrics->averageSerializationTimeInSeconds = 
(metrics->averageSerializationTimeInSeconds * n + diff) / (n + 1);
+    if (msgSer != NULL && monitor) {
+        hash_map_t *origins = hashMap_get(entry->metrics, (void *) (uintptr_t) 
hdr->type);
+        char uuidStr[UUID_STR_LEN + 1];
+        uuid_unparse(hdr->originUUID, uuidStr);
+        psa_tcp_subscriber_metrics_entry_t *metrics = hashMap_get(origins, 
uuidStr);
+
+        if (metrics == NULL) {
+            metrics = calloc(1, sizeof(*metrics));
+            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN + 1), metrics);
+            uuid_copy(metrics->origin, hdr->originUUID);
+            metrics->msgTypeId = hdr->type;
+            metrics->maxDelayInSeconds = -INFINITY;
+            metrics->minDelayInSeconds = INFINITY;
+            metrics->lastSeqNr = 0;
+        }
 
-    diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
-    n = metrics->nrOfMessagesReceived;
-    if (metrics->nrOfMessagesReceived >= 1) {
-      metrics->averageTimeBetweenMessagesInSeconds =
-        (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
-    }
-    metrics->lastMessageReceived = *receiveTime;
+        double diff = celix_difftime(&beginSer, &endSer);
+        long n = metrics->nrOfMessagesReceived;
+        metrics->averageSerializationTimeInSeconds = 
(metrics->averageSerializationTimeInSeconds * n + diff) / (n + 1);
 
+        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
+        n = metrics->nrOfMessagesReceived;
+        if (metrics->nrOfMessagesReceived >= 1) {
+            metrics->averageTimeBetweenMessagesInSeconds =
+                    (metrics->averageTimeBetweenMessagesInSeconds * n + diff) 
/ (n + 1);
+        }
+        metrics->lastMessageReceived = *receiveTime;
 
-    int incr = hdr->seqNr - metrics->lastSeqNr;
-    if (metrics->lastSeqNr > 0 && incr > 1) {
-      metrics->nrOfMissingSeqNumbers += (incr - 1);
-      L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, 
hdr->seqNr);
-    }
-    metrics->lastSeqNr = hdr->seqNr;
-
-    struct timespec sendTime;
-    sendTime.tv_sec = (time_t) hdr->sendtimeSeconds;
-    sendTime.tv_nsec = (long) hdr->sendTimeNanoseconds; //TODO FIXME the 
tv_nsec is not correct
-    diff = celix_difftime(&sendTime, receiveTime);
-    metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + 
diff) / (n + 1);
-    if (diff < metrics->minDelayInSeconds) {
-      metrics->minDelayInSeconds = diff;
-    }
-    if (diff > metrics->maxDelayInSeconds) {
-      metrics->maxDelayInSeconds = diff;
-    }
 
-    metrics->nrOfMessagesReceived += updateReceiveCount;
-    metrics->nrOfSerializationErrors += updateSerError;
-  }
-}
-static void
-processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const unsigned 
char *payload, size_t payloadSize, struct timespec *receiveTime) {
-  pubsub_tcp_topic_receiver_t *receiver = handle;
-  celixThreadMutex_lock(&receiver->subscribers.mutex);
-  hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-  while (hashMapIterator_hasNext(&iter)) {
-    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-    if (entry != NULL) {
-      processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, 
receiveTime);
+        int incr = hdr->seqNr - metrics->lastSeqNr;
+        if (metrics->lastSeqNr > 0 && incr > 1) {
+            metrics->nrOfMissingSeqNumbers += (incr - 1);
+            L_WARN("Missing message seq nr went from %i to %i", 
metrics->lastSeqNr, hdr->seqNr);
+        }
+        metrics->lastSeqNr = hdr->seqNr;
+
+        struct timespec sendTime;
+        sendTime.tv_sec = (time_t) hdr->sendtimeSeconds;
+        sendTime.tv_nsec = (long) hdr->sendTimeNanoseconds; //TODO FIXME the 
tv_nsec is not correct
+        diff = celix_difftime(&sendTime, receiveTime);
+        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + 
diff) / (n + 1);
+        if (diff < metrics->minDelayInSeconds) {
+            metrics->minDelayInSeconds = diff;
+        }
+        if (diff > metrics->maxDelayInSeconds) {
+            metrics->maxDelayInSeconds = diff;
+        }
+
+        metrics->nrOfMessagesReceived += updateReceiveCount;
+        metrics->nrOfSerializationErrors += updateSerError;
     }
-  }
-  celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-
+static void processMsg(void *handle, const pubsub_tcp_msg_header_t *hdr, const 
unsigned char *payload, size_t payloadSize, struct timespec *receiveTime) {
+    pubsub_tcp_topic_receiver_t *receiver = handle;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+        if (entry != NULL) {
+            processMsgForSubscriberEntry(receiver, entry, hdr, payload, 
payloadSize, receiveTime);
+        }
+    }
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+}
 
 static void *psa_tcp_recvThread(void *data) {
-  pubsub_tcp_topic_receiver_t *receiver = data;
-
-  celixThreadMutex_lock(&receiver->thread.mutex);
-  bool running = receiver->thread.running;
-  celixThreadMutex_unlock(&receiver->thread.mutex);
-
-  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  bool allConnected = receiver->requestedConnections.allConnected;
-  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
-
-  celixThreadMutex_lock(&receiver->subscribers.mutex);
-  bool allInitialized = receiver->subscribers.allInitialized;
-  celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
-  while (running) {
-    if (!allConnected) {
-      psa_tcp_connectToAllRequestedConnections(receiver);
-    }
-    if (!allInitialized) {
-      psa_tcp_initializeAllSubscribers(receiver);
-    }
-    pubsub_tcpHandler_handler(receiver->socketHandler);
+    pubsub_tcp_topic_receiver_t *receiver = data;
 
     celixThreadMutex_lock(&receiver->thread.mutex);
-    running = receiver->thread.running;
+    bool running = receiver->thread.running;
     celixThreadMutex_unlock(&receiver->thread.mutex);
 
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    allConnected = receiver->requestedConnections.allConnected;
+    bool allConnected = receiver->requestedConnections.allConnected;
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 
     celixThreadMutex_lock(&receiver->subscribers.mutex);
-    allInitialized = receiver->subscribers.allInitialized;
+    bool allInitialized = receiver->subscribers.allInitialized;
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
-  } // while
-  return NULL;
+
+    while (running) {
+        if (!allConnected) {
+            psa_tcp_connectToAllRequestedConnections(receiver);
+        }
+        if (!allInitialized) {
+            psa_tcp_initializeAllSubscribers(receiver);
+        }
+        pubsub_tcpHandler_handler(receiver->socketHandler);
+
+        celixThreadMutex_lock(&receiver->thread.mutex);
+        running = receiver->thread.running;
+        celixThreadMutex_unlock(&receiver->thread.mutex);
+
+        celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+        allConnected = receiver->requestedConnections.allConnected;
+        celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+
+        celixThreadMutex_lock(&receiver->subscribers.mutex);
+        allInitialized = receiver->subscribers.allInitialized;
+        celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    } // while
+    return NULL;
 }
 
 pubsub_admin_receiver_metrics_t 
*pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
-  pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-  snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->scope);
-  snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->topic);
-
-  int msgTypesCount = 0;
-  celixThreadMutex_lock(&receiver->subscribers.mutex);
-  hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-  while (hashMapIterator_hasNext(&iter)) {
-    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
-    while (hashMapIterator_hasNext(&iter2)) {
-      hashMapIterator_nextValue(&iter2);
-      msgTypesCount += 1;
+    pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->scope);
+    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->topic);
+
+    int msgTypesCount = 0;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hashMapIterator_nextValue(&iter2);
+            msgTypesCount += 1;
+        }
     }
-  }
-
-  result->nrOfMsgTypes = (unsigned long) msgTypesCount;
-  result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
-  int i = 0;
-  iter = hashMapIterator_construct(receiver->subscribers.map);
-  while (hashMapIterator_hasNext(&iter)) {
-    psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
-    while (hashMapIterator_hasNext(&iter2)) {
-      hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-      result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), 
sizeof(*(result->msgTypes[i].origins)));
-      result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
-      int k = 0;
-      hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
-      while (hashMapIterator_hasNext(&iter3)) {
-        psa_tcp_subscriber_metrics_entry_t *metrics = 
hashMapIterator_nextValue(&iter3);
-        result->msgTypes[i].typeId = metrics->msgTypeId;
-        pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void 
*) (uintptr_t) metrics->msgTypeId);
-        if (msgSer) {
-          snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, 
"%s", msgSer->msgName);
-          uuid_copy(result->msgTypes[i].origins[k].originUUID, 
metrics->origin);
-          result->msgTypes[i].origins[k].nrOfMessagesReceived = 
metrics->nrOfMessagesReceived;
-          result->msgTypes[i].origins[k].nrOfSerializationErrors = 
metrics->nrOfSerializationErrors;
-          result->msgTypes[i].origins[k].averageDelayInSeconds = 
metrics->averageDelayInSeconds;
-          result->msgTypes[i].origins[k].maxDelayInSeconds = 
metrics->maxDelayInSeconds;
-          result->msgTypes[i].origins[k].minDelayInSeconds = 
metrics->minDelayInSeconds;
-          result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = 
metrics->averageTimeBetweenMessagesInSeconds;
-          result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = 
metrics->averageSerializationTimeInSeconds;
-          result->msgTypes[i].origins[k].lastMessageReceived = 
metrics->lastMessageReceived;
-          result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = 
metrics->nrOfMissingSeqNumbers;
-
-          k += 1;
-        } else {
-          L_WARN("[PSA_TCP]: Error cannot find key 0x%X in msg map during 
metrics collection!\n", metrics->msgTypeId);
+
+    result->nrOfMsgTypes = (unsigned long) msgTypesCount;
+    result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
+    int i = 0;
+    iter = hashMapIterator_construct(receiver->subscribers.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+            result->msgTypes[i].origins = calloc((size_t) 
hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
+            int k = 0;
+            hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
+            while (hashMapIterator_hasNext(&iter3)) {
+                psa_tcp_subscriber_metrics_entry_t *metrics = 
hashMapIterator_nextValue(&iter3);
+                result->msgTypes[i].typeId = metrics->msgTypeId;
+                pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, 
(void *) (uintptr_t) metrics->msgTypeId);
+                if (msgSer) {
+                    snprintf(result->msgTypes[i].typeFqn, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
+                    uuid_copy(result->msgTypes[i].origins[k].originUUID, 
metrics->origin);
+                    result->msgTypes[i].origins[k].nrOfMessagesReceived = 
metrics->nrOfMessagesReceived;
+                    result->msgTypes[i].origins[k].nrOfSerializationErrors = 
metrics->nrOfSerializationErrors;
+                    result->msgTypes[i].origins[k].averageDelayInSeconds = 
metrics->averageDelayInSeconds;
+                    result->msgTypes[i].origins[k].maxDelayInSeconds = 
metrics->maxDelayInSeconds;
+                    result->msgTypes[i].origins[k].minDelayInSeconds = 
metrics->minDelayInSeconds;
+                    
result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = 
metrics->averageTimeBetweenMessagesInSeconds;
+                    
result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = 
metrics->averageSerializationTimeInSeconds;
+                    result->msgTypes[i].origins[k].lastMessageReceived = 
metrics->lastMessageReceived;
+                    result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = 
metrics->nrOfMissingSeqNumbers;
+
+                    k += 1;
+                } else {
+                    L_WARN("[PSA_TCP]: Error cannot find key 0x%X in msg map 
during metrics collection!\n", metrics->msgTypeId);
+                }
+            }
+            i += 1;
         }
-      }
-      i += 1;
     }
-  }
-  celixThreadMutex_unlock(&receiver->subscribers.mutex);
-  return result;
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    return result;
 }
 
 
 static void 
psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver) 
{
-  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  if (!receiver->requestedConnections.allConnected) {
-    bool allConnected = true;
-    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
-    while (hashMapIterator_hasNext(&iter)) {
-      psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-      if (!entry->connected){
-        entry->fd = pubsub_tcpHandler_connect(entry->parent->socketHandler, 
entry->url);
-        if (entry->fd < 0) {
-          L_WARN("[PSA_TCP] Error connecting to tcp url %s\n", entry->url);
-          allConnected = false;
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    if (!receiver->requestedConnections.allConnected) {
+        bool allConnected = true;
+        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (!entry->connected) {
+                entry->fd = 
pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
+                if (entry->fd < 0) {
+                    L_WARN("[PSA_TCP] Error connecting to tcp url %s\n", 
entry->url);
+                    allConnected = false;
+                }
+            }
         }
-      }
+        receiver->requestedConnections.allConnected = allConnected;
     }
-    receiver->requestedConnections.allConnected = allConnected;
-  }
-  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock) {
-  pubsub_tcp_topic_receiver_t *receiver = handle;
-  L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", 
receiver->scope, receiver->topic, url);
-  if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
-  if (entry == NULL) {
-    entry = calloc(1, sizeof(*entry));
-    entry->parent = receiver;
-    entry->url = strndup(url, 1024*1024);
-    entry->statically = true;
-    hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry);
-    receiver->requestedConnections.allConnected = false;
-  }
-  entry->connected = true;
-  if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    pubsub_tcp_topic_receiver_t *receiver = handle;
+    L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", 
receiver->scope, receiver->topic, url);
+    if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_tcp_requested_connection_entry_t *entry = 
hashMap_get(receiver->requestedConnections.map, url);
+    if (entry == NULL) {
+        entry = calloc(1, sizeof(*entry));
+        entry->parent = receiver;
+        entry->url = strndup(url, 1024 * 1024);
+        entry->statically = true;
+        hashMap_put(receiver->requestedConnections.map, (void *) entry->url, 
entry);
+        receiver->requestedConnections.allConnected = false;
+    }
+    entry->connected = true;
+    if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
 static void psa_tcp_disConnectHandler(void *handle, const char *url) {
-  pubsub_tcp_topic_receiver_t *receiver = handle;
-  L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", 
receiver->scope, receiver->topic, url);
-  celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-  psa_tcp_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, url);
-  if (entry != NULL) {
-    free(entry->url);
-    free(entry);
-  }
-  celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    pubsub_tcp_topic_receiver_t *receiver = handle;
+    L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", 
receiver->scope, receiver->topic, url);
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_tcp_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, url);
+    if (entry != NULL) {
+        free(entry->url);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
 
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t 
*receiver) {
-  celixThreadMutex_lock(&receiver->subscribers.mutex);
-  if (!receiver->subscribers.allInitialized) {
-    bool allInitialized = true;
-    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-      psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
-      if (!entry->initialized) {
-        int rc = 0;
-        if (entry->svc != NULL && entry->svc->init != NULL) {
-          rc = entry->svc->init(entry->svc->handle);
-        }
-        if (rc == 0) {
-          entry->initialized = true;
-        } else {
-          L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
-          allInitialized = false;
+    celixThreadMutex_lock(&receiver->subscribers.mutex);
+    if (!receiver->subscribers.allInitialized) {
+        bool allInitialized = true;
+        hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (!entry->initialized) {
+                int rc = 0;
+                if (entry->svc != NULL && entry->svc->init != NULL) {
+                    rc = entry->svc->init(entry->svc->handle);
+                }
+                if (rc == 0) {
+                    entry->initialized = true;
+                } else {
+                    L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
+                    allInitialized = false;
+                }
+            }
         }
-      }
+        receiver->subscribers.allInitialized = allInitialized;
     }
-    receiver->subscribers.allInitialized = allInitialized;
-  }
-  celixThreadMutex_unlock(&receiver->subscribers.mutex);
+    celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 607508c..b8a2aa9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -47,65 +47,65 @@
     logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
 
 struct pubsub_tcp_topic_sender {
-  celix_bundle_context_t *ctx;
-  log_helper_t *logHelper;
-  long serializerSvcId;
-  pubsub_serializer_service_t *serializer;
-  uuid_t fwUUID;
-  bool metricsEnabled;
-  pubsub_tcpHandler_pt socketHandler;
-  pubsub_tcpHandler_pt sharedSocketHandler;
-
-  char *scope;
-  char *topic;
-  char scopeAndTopicFilter[5];
-  char *url;
-  bool isStatic;
-
-  struct {
-    celix_thread_mutex_t mutex;
-  } tcp;
-
-  struct {
-    celix_thread_t thread;
-    celix_thread_mutex_t mutex;
-    bool running;
-  } thread;
-
-  struct {
-    long svcId;
-    celix_service_factory_t factory;
-  } publisher;
-
-  struct {
-    celix_thread_mutex_t mutex;
-    hash_map_t *map;  //key = bndId, value = psa_tcp_bounded_service_entry_t
-  } boundedServices;
+    celix_bundle_context_t *ctx;
+    log_helper_t *logHelper;
+    long serializerSvcId;
+    pubsub_serializer_service_t *serializer;
+    uuid_t fwUUID;
+    bool metricsEnabled;
+    pubsub_tcpHandler_pt socketHandler;
+    pubsub_tcpHandler_pt sharedSocketHandler;
+
+    char *scope;
+    char *topic;
+    char scopeAndTopicFilter[5];
+    char *url;
+    bool isStatic;
+
+    struct {
+        celix_thread_mutex_t mutex;
+    } tcp;
+
+    struct {
+        celix_thread_t thread;
+        celix_thread_mutex_t mutex;
+        bool running;
+    } thread;
+
+    struct {
+        long svcId;
+        celix_service_factory_t factory;
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map;  //key = bndId, value = 
psa_tcp_bounded_service_entry_t
+    } boundedServices;
 };
 
 typedef struct psa_tcp_send_msg_entry {
-  pubsub_tcp_msg_header_t header; //partially filled header (only seqnr and 
time needs to be updated per send)
-  pubsub_msg_serializer_t *msgSer;
-  celix_thread_mutex_t sendLock; //protects send & Seqnr
-  int seqNr;
-  struct {
-    celix_thread_mutex_t mutex; //protects entries in struct
-    long nrOfMessagesSend;
-    long nrOfMessagesSendFailed;
-    long nrOfSerializationErrors;
-    struct timespec lastMessageSend;
-    double averageTimeBetweenMessagesInSeconds;
-    double averageSerializationTimeInSeconds;
-  } metrics;
+    pubsub_tcp_msg_header_t header; //partially filled header (only seqnr and 
time needs to be updated per send)
+    pubsub_msg_serializer_t *msgSer;
+    celix_thread_mutex_t sendLock; //protects send & Seqnr
+    int seqNr;
+    struct {
+        celix_thread_mutex_t mutex; //protects entries in struct
+        long nrOfMessagesSend;
+        long nrOfMessagesSendFailed;
+        long nrOfSerializationErrors;
+        struct timespec lastMessageSend;
+        double averageTimeBetweenMessagesInSeconds;
+        double averageSerializationTimeInSeconds;
+    } metrics;
 } psa_tcp_send_msg_entry_t;
 
 typedef struct psa_tcp_bounded_service_entry {
-  pubsub_tcp_topic_sender_t *parent;
-  pubsub_publisher_t service;
-  long bndId;
-  hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
-  hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
-  int getCount;
+    pubsub_tcp_topic_sender_t *parent;
+    pubsub_publisher_t service;
+    long bndId;
+    hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
+    hash_map_t *msgEntries; //key = msg type id, value = 
psa_tcp_send_msg_entry_t
+    int getCount;
 } psa_tcp_bounded_service_entry_t;
 
 static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
@@ -128,465 +128,464 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        pubsub_tcp_endPointStore_t* endPointStore,
+        pubsub_tcp_endPointStore_t *endPointStore,
         long serializerSvcId,
         pubsub_serializer_service_t *ser,
         const char *bindIP,
         const char *staticBindUrl,
         unsigned int basePort,
         unsigned int maxPort) {
-  pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
-  sender->ctx = ctx;
-  sender->logHelper = logHelper;
-  sender->serializerSvcId = serializerSvcId;
-  sender->serializer = ser;
-  sender->socketHandler = pubsub_tcpHandler_create(sender->logHelper);
-  psa_tcp_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
-  const char *uuid = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
-  if (uuid != NULL) {
-    uuid_parse(uuid, sender->fwUUID);
-  }
-  sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
-
-  /* Check if it's a static endpoint */
-  bool isEndPointTypeClient = false;
-  bool isEndPointTypeServer = false;
-  const char *endPointType  = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-  if (endPointType != NULL) {
-    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) {
-      isEndPointTypeClient = true;
+    pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+    sender->serializerSvcId = serializerSvcId;
+    sender->serializer = ser;
+    sender->socketHandler = pubsub_tcpHandler_create(sender->logHelper);
+    psa_tcp_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
+    const char *uuid = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (uuid != NULL) {
+        uuid_parse(uuid, sender->fwUUID);
     }
-    if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) ==0) {
-      isEndPointTypeServer = true;
-    }
-  }
-
-  // When endpoint is client, use the connection urls as a key.
-  const char *staticConnectUrls = ((topicProperties != NULL) && 
isEndPointTypeClient) ? celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_CONNECT_URLS, NULL) : NULL;
-
-  /* When it's an endpoint share the socket with the receiver */
-  if (staticConnectUrls != NULL  || (isEndPointTypeServer && staticBindUrl != 
NULL)) {
-    celixThreadMutex_lock(&endPointStore->mutex);
-    sender->sharedSocketHandler = sender->socketHandler;
-    pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, 
staticConnectUrls);
-    if (entry == NULL) {
-      entry = sender->socketHandler;
-      hashMap_put(endPointStore->map, (void *)(isEndPointTypeClient ? 
staticConnectUrls : staticBindUrl), entry);
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
+
+    /* Check if it's a static endpoint */
+    bool isEndPointTypeClient = false;
+    bool isEndPointTypeServer = false;
+    const char *endPointType = celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
+    if (endPointType != NULL) {
+        if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
+            isEndPointTypeClient = true;
+        }
+        if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, 
strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
+            isEndPointTypeServer = true;
+        }
     }
-    celixThreadMutex_unlock(&endPointStore->mutex);
-  }
-
-  //setting up tcp socket for TCP TopicSender
-  {
-    if (staticConnectUrls != NULL) {
-      // Store url for client static endpoint
-      sender->url = strndup(staticConnectUrls, 1024 * 1024);
-      sender->isStatic = true;
+
+    // When endpoint is client, use the connection urls as a key.
+    const char *staticConnectUrls = ((topicProperties != NULL) && 
isEndPointTypeClient) ? celix_properties_get(topicProperties, 
PUBSUB_TCP_STATIC_CONNECT_URLS, NULL) : NULL;
+
+    /* When it's an endpoint share the socket with the receiver */
+    if (staticConnectUrls != NULL || (isEndPointTypeServer && staticBindUrl != 
NULL)) {
+        celixThreadMutex_lock(&endPointStore->mutex);
+        sender->sharedSocketHandler = sender->socketHandler;
+        pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, 
staticConnectUrls);
+        if (entry == NULL) {
+            entry = sender->socketHandler;
+            hashMap_put(endPointStore->map, (void *) (isEndPointTypeClient ? 
staticConnectUrls : staticBindUrl), entry);
+        }
+        celixThreadMutex_unlock(&endPointStore->mutex);
     }
-    else if (staticBindUrl != NULL) {
-      int rv = pubsub_tcpHandler_listen(sender->socketHandler, (char *) 
staticBindUrl);
-      if (rv == -1) {
-        L_WARN("Error for tcp_bind using static bind url '%s'. %s", 
staticBindUrl, strerror(errno));
-      } else {
-        sender->url = strndup(staticBindUrl, 1024 * 1024);
-        sender->isStatic = true;
-      }
-    } else {
-      int retry = 0;
-      while (sender->url == NULL && retry < TCP_BIND_MAX_RETRY) {
-        /* Randomized part due to same bundle publishing on different topics */
-        unsigned int port = rand_range(basePort, maxPort);
-        char *url = NULL;
-        asprintf(&url, "tcp://%s:%u", bindIP, port);
-        char *bindUrl = NULL;
-        asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
-        int rv = pubsub_tcpHandler_listen(sender->socketHandler, bindUrl);
-        if (rv == -1) {
-          L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", 
bindUrl, strerror(errno));
-          free(url);
+
+    //setting up tcp socket for TCP TopicSender
+    {
+        if (staticConnectUrls != NULL) {
+            // Store url for client static endpoint
+            sender->url = strndup(staticConnectUrls, 1024 * 1024);
+            sender->isStatic = true;
+        } else if (staticBindUrl != NULL) {
+            int rv = pubsub_tcpHandler_listen(sender->socketHandler, (char *) 
staticBindUrl);
+            if (rv == -1) {
+                L_WARN("Error for tcp_bind using static bind url '%s'. %s", 
staticBindUrl, strerror(errno));
+            } else {
+                sender->url = strndup(staticBindUrl, 1024 * 1024);
+                sender->isStatic = true;
+            }
         } else {
-          sender->url = url;
+            int retry = 0;
+            while (sender->url == NULL && retry < TCP_BIND_MAX_RETRY) {
+                /* Randomized part due to same bundle publishing on different 
topics */
+                unsigned int port = rand_range(basePort, maxPort);
+                char *url = NULL;
+                asprintf(&url, "tcp://%s:%u", bindIP, port);
+                char *bindUrl = NULL;
+                asprintf(&bindUrl, "tcp://0.0.0.0:%u", port);
+                int rv = pubsub_tcpHandler_listen(sender->socketHandler, 
bindUrl);
+                if (rv == -1) {
+                    L_WARN("Error for tcp_bind using dynamic bind url '%s'. 
%s", bindUrl, strerror(errno));
+                    free(url);
+                } else {
+                    sender->url = url;
+                }
+                retry++;
+                free(bindUrl);
+            }
         }
-        retry++;
-        free(bindUrl);
-      }
     }
-  }
-
-  if (sender->url != NULL) {
-    sender->scope = strndup(scope, 1024 * 1024);
-    sender->topic = strndup(topic, 1024 * 1024);
-
-    celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
-    celixThreadMutex_create(&sender->tcp.mutex, NULL);
-    celixThreadMutex_create(&sender->thread.mutex, NULL);
-    sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
-  }
-
-  if (sender->socketHandler != NULL) {
-    sender->thread.running = true;
-    celixThread_create(&sender->thread.thread, NULL, psa_tcp_sendThread, 
sender);
-    char name[64];
-    snprintf(name, 64, "TCP TS %s/%s", scope, topic);
-    celixThread_setName(&sender->thread.thread, name);
-    psa_tcp_setupTcpContext(sender->logHelper, &sender->thread.thread, 
topicProperties);
-  }
-
-
-  //register publisher services using a service factory
-  if (sender->url != NULL) {
-    sender->publisher.factory.handle = sender;
-    sender->publisher.factory.getService = psa_tcp_getPublisherService;
-    sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
-
-    celix_properties_t *props = celix_properties_create();
-    celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
-    celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
-
-    celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
-    opts.factory = &sender->publisher.factory;
-    opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
-    opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
-    opts.properties = props;
-
-    sender->publisher.svcId = 
celix_bundleContext_registerServiceWithOptions(ctx, &opts);
-  }
-
-  if (sender->url == NULL) {
-    free(sender);
-    sender = NULL;
-  }
-
-  return sender;
-}
 
-void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
-  if (sender != NULL) {
-    celixThreadMutex_lock(&sender->thread.mutex);
-    if (!sender->thread.running) {
-      sender->thread.running = false;
-      celixThreadMutex_unlock(&sender->thread.mutex);
-      celixThread_join(sender->thread.thread, NULL);
+    if (sender->url != NULL) {
+        sender->scope = strndup(scope, 1024 * 1024);
+        sender->topic = strndup(topic, 1024 * 1024);
+
+        celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+        celixThreadMutex_create(&sender->tcp.mutex, NULL);
+        celixThreadMutex_create(&sender->thread.mutex, NULL);
+        sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
     }
-    celix_bundleContext_unregisterService(sender->ctx, 
sender->publisher.svcId);
 
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
-    while (hashMapIterator_hasNext(&iter)) {
-      psa_tcp_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-      if (entry != NULL) {
-        sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
-        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter2)) {
-          psa_tcp_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
-          celixThreadMutex_destroy(&msgEntry->metrics.mutex);
-          free(msgEntry);
-        }
-        hashMap_destroy(entry->msgEntries, false, false);
-        free(entry);
-      }
+    if (sender->socketHandler != NULL) {
+        sender->thread.running = true;
+        celixThread_create(&sender->thread.thread, NULL, psa_tcp_sendThread, 
sender);
+        char name[64];
+        snprintf(name, 64, "TCP TS %s/%s", scope, topic);
+        celixThread_setName(&sender->thread.thread, name);
+        psa_tcp_setupTcpContext(sender->logHelper, &sender->thread.thread, 
topicProperties);
     }
-    hashMap_destroy(sender->boundedServices.map, false, false);
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
-    celixThreadMutex_destroy(&sender->boundedServices.mutex);
-    celixThreadMutex_destroy(&sender->tcp.mutex);
 
-    if ((sender->socketHandler)&&(sender->sharedSocketHandler == NULL)) {
-      pubsub_tcpHandler_destroy(sender->socketHandler);
-      sender->socketHandler = NULL;
+
+    //register publisher services using a service factory
+    if (sender->url != NULL) {
+        sender->publisher.factory.handle = sender;
+        sender->publisher.factory.getService = psa_tcp_getPublisherService;
+        sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+        celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+
+        celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+        opts.factory = &sender->publisher.factory;
+        opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+        opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+        opts.properties = props;
+
+        sender->publisher.svcId = 
celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+    }
+
+    if (sender->url == NULL) {
+        free(sender);
+        sender = NULL;
     }
 
-    free(sender->scope);
-    free(sender->topic);
-    free(sender->url);
-    free(sender);
-  }
+    return sender;
+}
+
+void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
+    if (sender != NULL) {
+        celixThreadMutex_lock(&sender->thread.mutex);
+        if (!sender->thread.running) {
+            sender->thread.running = false;
+            celixThreadMutex_unlock(&sender->thread.mutex);
+            celixThread_join(sender->thread.thread, NULL);
+        }
+        celix_bundleContext_unregisterService(sender->ctx, 
sender->publisher.svcId);
+
+        celixThreadMutex_lock(&sender->boundedServices.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            if (entry != NULL) {
+                
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
+                hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    psa_tcp_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
+                    celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+                    free(msgEntry);
+                }
+                hashMap_destroy(entry->msgEntries, false, false);
+                free(entry);
+            }
+        }
+        hashMap_destroy(sender->boundedServices.map, false, false);
+        celixThreadMutex_unlock(&sender->boundedServices.mutex);
+        celixThreadMutex_destroy(&sender->boundedServices.mutex);
+        celixThreadMutex_destroy(&sender->tcp.mutex);
+
+        if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
+            pubsub_tcpHandler_destroy(sender->socketHandler);
+            sender->socketHandler = NULL;
+        }
+
+        free(sender->scope);
+        free(sender->topic);
+        free(sender->url);
+        free(sender);
+    }
 }
 
 long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender) {
-  return sender->serializerSvcId;
+    return sender->serializerSvcId;
 }
 
 const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender) {
-  return sender->scope;
+    return sender->scope;
 }
 
 const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) {
-  return sender->topic;
+    return sender->topic;
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-  return pubsub_tcpHandler_url(sender->socketHandler);
+    return pubsub_tcpHandler_url(sender->socketHandler);
 }
 
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
-  return sender->isStatic;
+    return sender->isStatic;
 }
 
 void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const 
celix_properties_t *endpoint) {
-  //TODO subscriber count -> topic info
+    //TODO subscriber count -> topic info
 }
 
 void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, 
const celix_properties_t *endpoint) {
-  //TODO
+    //TODO
 }
 
 static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
                                          const celix_properties_t 
*svcProperties __attribute__((unused))) {
-  pubsub_tcp_topic_sender_t *sender = handle;
-  long bndId = celix_bundle_getId(requestingBundle);
-
-  celixThreadMutex_lock(&sender->boundedServices.mutex);
-  psa_tcp_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
-  if (entry != NULL) {
-    entry->getCount += 1;
-  } else {
-    entry = calloc(1, sizeof(*entry));
-    entry->getCount = 1;
-    entry->parent = sender;
-    entry->bndId = bndId;
-    entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
-
-    int rc = 
sender->serializer->createSerializerMap(sender->serializer->handle, 
(celix_bundle_t *) requestingBundle,
-                                                     &entry->msgTypes);
-    if (rc == 0) {
-      hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes);
-      while (hashMapIterator_hasNext(&iter)) {
-        hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter);
-        void *key = hashMapEntry_getKey(hashMapEntry);
-        psa_tcp_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry));
-        sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
-        sendEntry->header.type = (int32_t) sendEntry->msgSer->msgId;
-        int major;
-        int minor;
-        version_getMajor(sendEntry->msgSer->msgVersion, &major);
-        version_getMinor(sendEntry->msgSer->msgVersion, &minor);
-        sendEntry->header.major = (int8_t) major;
-        sendEntry->header.minor = (int8_t) minor;
-        uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
-        celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
-        hashMap_put(entry->msgEntries, key, sendEntry);
-      }
-      entry->service.handle = entry;
-      entry->service.localMsgTypeIdForMsgType = 
psa_tcp_localMsgTypeIdForMsgType;
-      entry->service.send = psa_tcp_topicPublicationSend;
-      hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
+    pubsub_tcp_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_tcp_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
+    if (entry != NULL) {
+        entry->getCount += 1;
     } else {
-      L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", 
sender->scope, sender->topic);
+        entry = calloc(1, sizeof(*entry));
+        entry->getCount = 1;
+        entry->parent = sender;
+        entry->bndId = bndId;
+        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
+
+        int rc = 
sender->serializer->createSerializerMap(sender->serializer->handle, 
(celix_bundle_t *) requestingBundle,
+                                                         &entry->msgTypes);
+        if (rc == 0) {
+            hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgTypes);
+            while (hashMapIterator_hasNext(&iter)) {
+                hash_map_entry_t *hashMapEntry = 
hashMapIterator_nextEntry(&iter);
+                void *key = hashMapEntry_getKey(hashMapEntry);
+                psa_tcp_send_msg_entry_t *sendEntry = calloc(1, 
sizeof(*sendEntry));
+                sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
+                sendEntry->header.type = (int32_t) sendEntry->msgSer->msgId;
+                int major;
+                int minor;
+                version_getMajor(sendEntry->msgSer->msgVersion, &major);
+                version_getMinor(sendEntry->msgSer->msgVersion, &minor);
+                sendEntry->header.major = (int8_t) major;
+                sendEntry->header.minor = (int8_t) minor;
+                uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
+                celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
+                hashMap_put(entry->msgEntries, key, sendEntry);
+            }
+            entry->service.handle = entry;
+            entry->service.localMsgTypeIdForMsgType = 
psa_tcp_localMsgTypeIdForMsgType;
+            entry->service.send = psa_tcp_topicPublicationSend;
+            hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
+        } else {
+            L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", 
sender->scope, sender->topic);
+        }
     }
-  }
-  celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
 
-  return &entry->service;
+    return &entry->service;
 }
 
 static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
                                           const celix_properties_t 
*svcProperties __attribute__((unused))) {
-  pubsub_tcp_topic_sender_t *sender = handle;
-  long bndId = celix_bundle_getId(requestingBundle);
-
-  celixThreadMutex_lock(&sender->boundedServices.mutex);
-  psa_tcp_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
-  if (entry != NULL) {
-    entry->getCount -= 1;
-  }
-  if (entry != NULL && entry->getCount == 0) {
-    //free entry
-    hashMap_remove(sender->boundedServices.map, (void *) bndId);
-    int rc = 
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
-    if (rc != 0) {
-      L_ERROR("Error destroying publisher service, serializer not available / 
cannot get msg serializer map\n");
+    pubsub_tcp_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_tcp_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
+    if (entry != NULL) {
+        entry->getCount -= 1;
     }
+    if (entry != NULL && entry->getCount == 0) {
+        //free entry
+        hashMap_remove(sender->boundedServices.map, (void *) bndId);
+        int rc = 
sender->serializer->destroySerializerMap(sender->serializer->handle, 
entry->msgTypes);
+        if (rc != 0) {
+            L_ERROR("Error destroying publisher service, serializer not 
available / cannot get msg serializer map\n");
+        }
 
-    hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
-    while (hashMapIterator_hasNext(&iter)) {
-      psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
-      celixThreadMutex_destroy(&msgEntry->metrics.mutex);
-      free(msgEntry);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter);
+            celixThreadMutex_destroy(&msgEntry->metrics.mutex);
+            free(msgEntry);
+        }
+        hashMap_destroy(entry->msgEntries, false, false);
+        free(entry);
     }
-    hashMap_destroy(entry->msgEntries, false, false);
-    free(entry);
-  }
-  celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
 static void *psa_tcp_sendThread(void *data) {
-  pubsub_tcp_topic_sender_t *sender = data;
-
-  celixThreadMutex_lock(&sender->thread.mutex);
-  bool running = sender->thread.running;
-  celixThreadMutex_unlock(&sender->thread.mutex);
-
-  while (running) {
-    pubsub_tcpHandler_handler(sender->socketHandler);
+    pubsub_tcp_topic_sender_t *sender = data;
 
     celixThreadMutex_lock(&sender->thread.mutex);
-    running = sender->thread.running;
+    bool running = sender->thread.running;
     celixThreadMutex_unlock(&sender->thread.mutex);
 
-  } // while
-  return NULL;
+    while (running) {
+        pubsub_tcpHandler_handler(sender->socketHandler);
+
+        celixThreadMutex_lock(&sender->thread.mutex);
+        running = sender->thread.running;
+        celixThreadMutex_unlock(&sender->thread.mutex);
+
+    } // while
+    return NULL;
 }
 
 pubsub_admin_sender_metrics_t 
*pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
-  pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-  snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope);
-  snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
-  celixThreadMutex_lock(&sender->boundedServices.mutex);
-  size_t count = 0;
-  hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
-  while (hashMapIterator_hasNext(&iter)) {
-    psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-    while (hashMapIterator_hasNext(&iter2)) {
-      hashMapIterator_nextValue(&iter2);
-      count += 1;
+    pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->scope);
+    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->topic);
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    size_t count = 0;
+    hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter2)) {
+            hashMapIterator_nextValue(&iter2);
+            count += 1;
+        }
     }
-  }
-
-  result->msgMetrics = calloc(count, sizeof(*result));
-
-  iter = hashMapIterator_construct(sender->boundedServices.map);
-  int i = 0;
-  while (hashMapIterator_hasNext(&iter)) {
-    psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
-    hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
-    while (hashMapIterator_hasNext(&iter2)) {
-      psa_tcp_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
-      celixThreadMutex_lock(&mEntry->metrics.mutex);
-      result->msgMetrics[i].nrOfMessagesSend = 
mEntry->metrics.nrOfMessagesSend;
-      result->msgMetrics[i].nrOfMessagesSendFailed = 
mEntry->metrics.nrOfMessagesSendFailed;
-      result->msgMetrics[i].nrOfSerializationErrors = 
mEntry->metrics.nrOfSerializationErrors;
-      result->msgMetrics[i].averageSerializationTimeInSeconds = 
mEntry->metrics.averageSerializationTimeInSeconds;
-      result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = 
mEntry->metrics.averageTimeBetweenMessagesInSeconds;
-      result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
-      result->msgMetrics[i].bndId = entry->bndId;
-      result->msgMetrics[i].typeId = mEntry->header.type;
-      snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, 
"%s", mEntry->msgSer->msgName);
-      i += 1;
-      celixThreadMutex_unlock(&mEntry->metrics.mutex);
+
+    result->msgMetrics = calloc(count, sizeof(*result));
+
+    iter = hashMapIterator_construct(sender->boundedServices.map);
+    int i = 0;
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
+        while (hashMapIterator_hasNext(&iter2)) {
+            psa_tcp_send_msg_entry_t *mEntry = 
hashMapIterator_nextValue(&iter2);
+            celixThreadMutex_lock(&mEntry->metrics.mutex);
+            result->msgMetrics[i].nrOfMessagesSend = 
mEntry->metrics.nrOfMessagesSend;
+            result->msgMetrics[i].nrOfMessagesSendFailed = 
mEntry->metrics.nrOfMessagesSendFailed;
+            result->msgMetrics[i].nrOfSerializationErrors = 
mEntry->metrics.nrOfSerializationErrors;
+            result->msgMetrics[i].averageSerializationTimeInSeconds = 
mEntry->metrics.averageSerializationTimeInSeconds;
+            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = 
mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+            result->msgMetrics[i].lastMessageSend = 
mEntry->metrics.lastMessageSend;
+            result->msgMetrics[i].bndId = entry->bndId;
+            result->msgMetrics[i].typeId = mEntry->header.type;
+            snprintf(result->msgMetrics[i].typeFqn, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
+            i += 1;
+            celixThreadMutex_unlock(&mEntry->metrics.mutex);
+        }
     }
-  }
 
-  celixThreadMutex_unlock(&sender->boundedServices.mutex);
-  result->nrOfmsgMetrics = (int) count;
-  return result;
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+    result->nrOfmsgMetrics = (int) count;
+    return result;
 }
 
 static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, 
const void *inMsg) {
-  int status = CELIX_SUCCESS;
-  psa_tcp_bounded_service_entry_t *bound = handle;
-  pubsub_tcp_topic_sender_t *sender = bound->parent;
-  bool monitor = sender->metricsEnabled;
-
-  psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) 
(uintptr_t) (msgTypeId));
-
-  //metrics updates
-  struct timespec sendTime;
-  struct timespec serializationStart;
-  struct timespec serializationEnd;
-  //int unknownMessageCountUpdate = 0;
-  int sendErrorUpdate = 0;
-  int serializationErrorUpdate = 0;
-  int sendCountUpdate = 0;
-
-  if (entry != NULL) {
-    delay_first_send_for_late_joiners(sender);
-    if (monitor) {
-      clock_gettime(CLOCK_REALTIME, &serializationStart);
-    }
+    int status = CELIX_SUCCESS;
+    psa_tcp_bounded_service_entry_t *bound = handle;
+    pubsub_tcp_topic_sender_t *sender = bound->parent;
+    bool monitor = sender->metricsEnabled;
+
+    psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) 
(uintptr_t)(msgTypeId));
+
+    //metrics updates
+    struct timespec sendTime;
+    struct timespec serializationStart;
+    struct timespec serializationEnd;
+    //int unknownMessageCountUpdate = 0;
+    int sendErrorUpdate = 0;
+    int serializationErrorUpdate = 0;
+    int sendCountUpdate = 0;
+
+    if (entry != NULL) {
+        delay_first_send_for_late_joiners(sender);
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &serializationStart);
+        }
 
-    void *serializedOutput = NULL;
-    size_t serializedOutputLen = 0;
-    status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
+        void *serializedOutput = NULL;
+        size_t serializedOutputLen = 0;
+        status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
 
-    if (monitor) {
-      clock_gettime(CLOCK_REALTIME, &serializationEnd);
-    }
+        if (monitor) {
+            clock_gettime(CLOCK_REALTIME, &serializationEnd);
+        }
 
-    if (status == CELIX_SUCCESS /*ser ok*/) {
-      //TODO refactor, is the mutex really needed?
-      celixThreadMutex_lock(&entry->sendLock);
-      pubsub_tcp_msg_header_t msg_hdr = entry->header;
-      msg_hdr.seqNr = -1;
-      msg_hdr.sendtimeSeconds = 0;
-      msg_hdr.sendTimeNanoseconds = 0;
-      if (monitor) {
-        clock_gettime(CLOCK_REALTIME, &sendTime);
-        msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec;
-        msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec;
-        msg_hdr.seqNr = entry->seqNr++;
-      }
-
-      errno = 0;
-      bool sendOk = true;
-      {
-        int rc = pubsub_tcpHandler_write(sender->socketHandler, &msg_hdr, 
serializedOutput, serializedOutputLen, 0);
-        if (rc < 0) {
-          status = -1;
-          sendOk = false;
+        if (status == CELIX_SUCCESS /*ser ok*/) {
+            //TODO refactor, is the mutex really needed?
+            celixThreadMutex_lock(&entry->sendLock);
+            pubsub_tcp_msg_header_t msg_hdr = entry->header;
+            msg_hdr.seqNr = -1;
+            msg_hdr.sendtimeSeconds = 0;
+            msg_hdr.sendTimeNanoseconds = 0;
+            if (monitor) {
+                clock_gettime(CLOCK_REALTIME, &sendTime);
+                msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec;
+                msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec;
+                msg_hdr.seqNr = entry->seqNr++;
+            }
+
+            errno = 0;
+            bool sendOk = true;
+            {
+                int rc = pubsub_tcpHandler_write(sender->socketHandler, 
&msg_hdr, serializedOutput, serializedOutputLen, 0);
+                if (rc < 0) {
+                    status = -1;
+                    sendOk = false;
+                }
+                free(serializedOutput);
+            }
+
+            celixThreadMutex_unlock(&entry->sendLock);
+            if (sendOk) {
+                sendCountUpdate = 1;
+            } else {
+                sendErrorUpdate = 1;
+                L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno));
+            }
+        } else {
+            serializationErrorUpdate = 1;
+            L_WARN("[PSA_TCP_TS] Error serialize message of type %s for 
scope/topic %s/%s", entry->msgSer->msgName,
+                   sender->scope, sender->topic);
         }
-        free(serializedOutput);
-      }
-
-      celixThreadMutex_unlock(&entry->sendLock);
-      if (sendOk) {
-        sendCountUpdate = 1;
-      } else {
-        sendErrorUpdate = 1;
-        L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno));
-      }
     } else {
-      serializationErrorUpdate = 1;
-      L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic 
%s/%s", entry->msgSer->msgName,
-             sender->scope, sender->topic);
-    }
-  } else {
-    //unknownMessageCountUpdate = 1;
-    status = CELIX_SERVICE_EXCEPTION;
-    L_WARN("[PSA_TCP_TS] Error cannot serialize message with msg type id %i 
for scope/topic %s/%s", msgTypeId,
-           sender->scope, sender->topic);
-  }
-
-
-  if (monitor && entry != NULL) {
-    celixThreadMutex_lock(&entry->metrics.mutex);
-
-    long n = entry->metrics.nrOfMessagesSend + 
entry->metrics.nrOfMessagesSendFailed;
-    double diff = celix_difftime(&serializationStart, &serializationEnd);
-    double average = (entry->metrics.averageSerializationTimeInSeconds * n + 
diff) / (n + 1);
-    entry->metrics.averageSerializationTimeInSeconds = average;
-
-    if (entry->metrics.nrOfMessagesSend > 2) {
-      diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
-      n = entry->metrics.nrOfMessagesSend;
-      average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + 
diff) / (n + 1);
-      entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+        //unknownMessageCountUpdate = 1;
+        status = CELIX_SERVICE_EXCEPTION;
+        L_WARN("[PSA_TCP_TS] Error cannot serialize message with msg type id 
%i for scope/topic %s/%s", msgTypeId,
+               sender->scope, sender->topic);
     }
 
-    entry->metrics.lastMessageSend = sendTime;
-    entry->metrics.nrOfMessagesSend += sendCountUpdate;
-    entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
-    entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
 
-    celixThreadMutex_unlock(&entry->metrics.mutex);
-  }
+    if (monitor && entry != NULL) {
+        celixThreadMutex_lock(&entry->metrics.mutex);
+
+        long n = entry->metrics.nrOfMessagesSend + 
entry->metrics.nrOfMessagesSendFailed;
+        double diff = celix_difftime(&serializationStart, &serializationEnd);
+        double average = (entry->metrics.averageSerializationTimeInSeconds * n 
+ diff) / (n + 1);
+        entry->metrics.averageSerializationTimeInSeconds = average;
+
+        if (entry->metrics.nrOfMessagesSend > 2) {
+            diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
+            n = entry->metrics.nrOfMessagesSend;
+            average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n 
+ diff) / (n + 1);
+            entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+        }
 
-  return status;
+        entry->metrics.lastMessageSend = sendTime;
+        entry->metrics.nrOfMessagesSend += sendCountUpdate;
+        entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
+        entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+
+        celixThreadMutex_unlock(&entry->metrics.mutex);
+    }
+
+    return status;
 }
 
 static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t 
*sender) {
 
-  static bool firstSend = true;
+    static bool firstSend = true;
 
-  if (firstSend) {
-    L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
-    sleep(FIRST_SEND_DELAY_IN_SECONDS);
-    firstSend = false;
-  }
+    if (firstSend) {
+        L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        firstSend = false;
+    }
 }
 
 static unsigned int rand_range(unsigned int min, unsigned int max) {
-  double scaled = ((double) random()) / ((double) RAND_MAX);
-  return (unsigned int) ((max - min + 1) * scaled + min);
+    double scaled = ((double) random()) / ((double) RAND_MAX);
+    return (unsigned int) ((max - min + 1) * scaled + min);
 }
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c 
b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
index a077efd..770ad23 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c
@@ -111,9 +111,9 @@ pubsub_udpmc_admin_t* 
pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_
                 if (found_if_ip != NULL)
                     if_ip = strndup(found_if_ip, 16);
                 else
-                    L_WARN("Could not find interface for requested subnet %s", 
mcIpProp);
+                    L_WARN("[PSA_UDPMC] Could not find interface for requested 
subnet %s", mcIpProp);
             } else {
-                L_ERROR("Error while searching for available network interface 
for subnet %s", mcIpProp);
+                L_ERROR("[PSA_UDPMC] Error while searching for available 
network interface for subnet %s", mcIpProp);
             }
             free(found_if_ip);
         } else {
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c 
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
index bb0890f..8fa14ad 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c
@@ -48,8 +48,8 @@ struct pubsub_zmq_admin {
     log_helper_t *log;
     const char *fwUUID;
 
-    char* ipAddress;
-    zactor_t* zmq_auth;
+    char *ipAddress;
+    zactor_t *zmq_auth;
 
     unsigned int basePort;
     unsigned int maxPort;
@@ -111,9 +111,9 @@ pubsub_zmq_admin_t* 
pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_help
                 if (found_if_ip != NULL)
                     ip = strndup(found_if_ip, 16);
                 else
-                    L_WARN("Could not find interface for requested subnet %s", 
confIp);
+                    L_WARN("[PSA_ZMQ] Could not find interface for requested 
subnet %s", confIp);
             } else {
-                L_ERROR("Error while searching for available network interface 
for subnet %s", confIp);
+                L_ERROR("[PSA_ZMQ] Error while searching for available network 
interface for subnet %s", confIp);
             }
             free(found_if_ip);
         } else {

Reply via email to