This is an automated email from the ASF dual-hosted git repository. rlenferink pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/celix.git
commit 4b59c692bc60bee7b9e5cbc571ee678b6fd37530 Author: Roy Lenferink <[email protected]> AuthorDate: Sun Jul 28 11:25:14 2019 +0200 Added support for subnet mask to tcp and nanomsg admins --- .../src/pubsub_nanomsg_admin.cc | 20 +- .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 23 +- .../pubsub_admin_tcp/src/pubsub_tcp_common.h | 2 +- .../src/pubsub_tcp_topic_receiver.c | 1096 ++++++++++---------- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 853 ++++++++------- .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 4 +- .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 8 +- 7 files changed, 1015 insertions(+), 991 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc index 27b264d..5245d03 100644 --- a/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc +++ b/bundles/pubsub/pubsub_admin_nanomsg/src/pubsub_nanomsg_admin.cc @@ -45,8 +45,24 @@ pubsub_nanomsg_admin::pubsub_nanomsg_admin(celix_bundle_context_t *_ctx): char *ip = nullptr; const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_NANOMSG_PSA_IP_KEY , nullptr); - if (confIp != nullptr) { - ip = strndup(confIp, 1024); + if (confIp != NULL) { + if (strchr(confIp, '/') != NULL) { + // IP with subnet prefix specified + char *found_if_ip = calloc(16, sizeof(char)); + celix_status_t ip_status = ipUtils_findIpBySubnet(confIp, &found_if_ip); + if (ip_status == CELIX_SUCCESS) { + if (found_if_ip != NULL) + ip = strndup(found_if_ip, 16); + else + L_WARN("[PSA_NANOMSG] Could not find interface for requested subnet %s", confIp); + } else { + L_ERROR("[PSA_NANOMSG] Error while searching for available network interface for subnet %s", confIp); + } + free(found_if_ip); + } else { + // IP address specified + ip = strndup(confIp, 1024); + } } if (ip == nullptr) { diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c index 3ac9555..473d051 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c @@ -25,6 +25,7 @@ #include <ifaddrs.h> #include <pubsub_endpoint.h> #include <pubsub_serializer.h> +#include <ip_utils.h> #include "pubsub_utils.h" #include "pubsub_tcp_admin.h" @@ -102,11 +103,23 @@ pubsub_tcp_admin_t* pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, log_help char *ip = NULL; const char *confIp = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY , NULL); if (confIp != NULL) { - ip = strndup(confIp, 1024); - } - - if (ip == NULL) { - //TODO try to get ip from subnet (CIDR) + if (strchr(confIp, '/') != NULL) { + // IP with subnet prefix specified + char *found_if_ip = calloc(16, sizeof(char)); + celix_status_t ip_status = ipUtils_findIpBySubnet(confIp, &found_if_ip); + if (ip_status == CELIX_SUCCESS) { + if (found_if_ip != NULL) + ip = strndup(found_if_ip, 16); + else + L_WARN("[PSA_TCP] Could not find interface for requested subnet %s", confIp); + } else { + L_ERROR("[PSA_TCP] Error while searching for available network interface for subnet %s", confIp); + } + free(found_if_ip); + } else { + // IP address specified + ip = strndup(confIp, 1024); + } } if (ip == NULL) { diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h index 09b5842..b6f4d5a 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h @@ -40,7 +40,7 @@ typedef struct pubsub_tcp_endPointStore{ * 1) A subscription filter. * This is a 5 char string of the first two chars of scope and topic combined and terminated with a '\0'. * - * 2) The pubsub_tcp_msg_header_t is send containg the type id and major/minor version + * 2) The pubsub_tcp_msg_header_t is send containing the type id and major/minor version * * 3) The actual payload */ diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 5533331..6456f27 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -52,68 +52,68 @@ logHelper_log(receiver->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) struct pubsub_tcp_topic_receiver { - celix_bundle_context_t *ctx; - log_helper_t *logHelper; - long serializerSvcId; - pubsub_serializer_service_t *serializer; - char *scope; - char *topic; - char scopeAndTopicFilter[5]; - bool metricsEnabled; - pubsub_tcpHandler_pt socketHandler; - pubsub_tcpHandler_pt sharedSocketHandler; - - struct { - celix_thread_t thread; - celix_thread_mutex_t mutex; - bool running; - } thread; - - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = tcp url, value = psa_tcp_requested_connection_entry_t* - bool allConnected; //true if all requestedConnectection are connected - } requestedConnections; - - long subscriberTrackerId; - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t - bool allInitialized; - } subscribers; + celix_bundle_context_t *ctx; + log_helper_t *logHelper; + long serializerSvcId; + pubsub_serializer_service_t *serializer; + char *scope; + char *topic; + char scopeAndTopicFilter[5]; + bool metricsEnabled; + pubsub_tcpHandler_pt socketHandler; + pubsub_tcpHandler_pt sharedSocketHandler; + + struct { + celix_thread_t thread; + celix_thread_mutex_t mutex; + bool running; + } thread; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = tcp url, value = psa_tcp_requested_connection_entry_t* + bool allConnected; //true if all requestedConnection are connected + } requestedConnections; + + long subscriberTrackerId; + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t + bool allInitialized; + } subscribers; }; typedef struct psa_tcp_requested_connection_entry { - pubsub_tcp_topic_receiver_t *parent; - char *key; - char *url; - int fd; - bool connected; - bool statically; //true if the connection is statically configured through the topic properties. + pubsub_tcp_topic_receiver_t *parent; + char *key; + char *url; + int fd; + bool connected; + bool statically; //true if the connection is statically configured through the topic properties. } psa_tcp_requested_connection_entry_t; typedef struct psa_tcp_subscriber_metrics_entry_t { - unsigned int msgTypeId; - uuid_t origin; - - unsigned long nrOfMessagesReceived; - unsigned long nrOfSerializationErrors; - struct timespec lastMessageReceived; - double averageTimeBetweenMessagesInSeconds; - double averageSerializationTimeInSeconds; - double averageDelayInSeconds; - double maxDelayInSeconds; - double minDelayInSeconds; - unsigned int lastSeqNr; - unsigned long nrOfMissingSeqNumbers; + unsigned int msgTypeId; + uuid_t origin; + + unsigned long nrOfMessagesReceived; + unsigned long nrOfSerializationErrors; + struct timespec lastMessageReceived; + double averageTimeBetweenMessagesInSeconds; + double averageSerializationTimeInSeconds; + double averageDelayInSeconds; + double maxDelayInSeconds; + double minDelayInSeconds; + unsigned int lastSeqNr; + unsigned long nrOfMissingSeqNumbers; } psa_tcp_subscriber_metrics_entry_t; typedef struct psa_tcp_subscriber_entry { - int usageCount; - hash_map_t *msgTypes; //map from serializer svc - hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t* - pubsub_subscriber_t *svc; - bool initialized; //true if the init function is called through the receive thread + int usageCount; + hash_map_t *msgTypes; //map from serializer svc + hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin uuid, value = psa_tcp_subscriber_metrics_entry_t* + pubsub_subscriber_t *svc; + bool initialized; //true if the init function is called through the receive thread } psa_tcp_subscriber_entry_t; @@ -129,619 +129,615 @@ static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver); -static void processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime); +static void processMsg(void *handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime); static void psa_tcp_connectHandler(void *handle, const char *url, bool lock); static void psa_tcp_disConnectHandler(void *handle, const char *url); - pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx, log_helper_t *logHelper, const char *scope, const char *topic, const celix_properties_t *topicProperties, - pubsub_tcp_endPointStore_t* endPointStore, + pubsub_tcp_endPointStore_t *endPointStore, long serializerSvcId, pubsub_serializer_service_t *serializer) { - pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); - receiver->ctx = ctx; - receiver->logHelper = logHelper; - receiver->serializerSvcId = serializerSvcId; - receiver->serializer = serializer; - receiver->scope = strndup(scope, 1024 * 1024); - receiver->topic = strndup(topic, 1024 * 1024); - - long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS); - long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE); - long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT); - const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL); - - /* Check if it's a static endpoint */ - bool isEndPointTypeClient = false; - bool isEndPointTypeServer = false; - const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL); - if (endPointType != NULL) { - if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) { - isEndPointTypeClient = true; + pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); + receiver->ctx = ctx; + receiver->logHelper = logHelper; + receiver->serializerSvcId = serializerSvcId; + receiver->serializer = serializer; + receiver->scope = strndup(scope, 1024 * 1024); + receiver->topic = strndup(topic, 1024 * 1024); + + long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS); + long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE); + long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT); + const char *staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL); + + /* Check if it's a static endpoint */ + bool isEndPointTypeClient = false; + bool isEndPointTypeServer = false; + const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL); + if (endPointType != NULL) { + if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) { + isEndPointTypeClient = true; + } + if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) { + isEndPointTypeServer = true; + } } - if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) ==0) { - isEndPointTypeServer = true; + // When endpoint is server, use the bind url as a key. + const char *staticBindUrl = ((topicProperties != NULL) && isEndPointTypeServer) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL; + /* When it's an endpoint share the socket with the receiver */ + if (staticBindUrl != NULL || (isEndPointTypeClient && staticConnectUrls != NULL)) { + celixThreadMutex_lock(&receiver->thread.mutex); + pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, (isEndPointTypeServer) ? staticBindUrl : staticConnectUrls); + if (entry != NULL) { + receiver->socketHandler = entry; + receiver->sharedSocketHandler = entry; + } else { + L_ERROR("[PSA_TCP] Cannot find static Endpoint URL for %s/%s", scope, topic); + } + celixThreadMutex_unlock(&receiver->thread.mutex); } - } - // When endpoint is server, use the bind url as a key. - const char *staticBindUrl = ((topicProperties != NULL) && isEndPointTypeServer) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL) : NULL; - /* When it's an endpoint share the socket with the receiver */ - if (staticBindUrl != NULL || (isEndPointTypeClient && staticConnectUrls != NULL)) { - celixThreadMutex_lock(&receiver->thread.mutex); - pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, (isEndPointTypeServer) ? staticBindUrl : staticConnectUrls); - if(entry != NULL) { - receiver->socketHandler = entry; - receiver->sharedSocketHandler = entry; - } else { - L_ERROR("[PSA_TCP] Cannot find static Endpoint URL for %s/%s", scope, topic); + + if (receiver->socketHandler == NULL) { + receiver->socketHandler = pubsub_tcpHandler_create(receiver->logHelper); } - celixThreadMutex_unlock(&receiver->thread.mutex); - } - - if (receiver->socketHandler == NULL) { - receiver->socketHandler = pubsub_tcpHandler_create(receiver->logHelper); - } - - if (receiver->socketHandler != NULL) { - pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions, (unsigned int) buffer_size); - pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout); - pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg); - pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,psa_tcp_disConnectHandler); - } - - psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter); - receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, - PSA_TCP_DEFAULT_METRICS_ENABLED); - - celixThreadMutex_create(&receiver->subscribers.mutex, NULL); - celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL); - celixThreadMutex_create(&receiver->thread.mutex, NULL); - - receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); - receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - receiver->requestedConnections.allConnected = false; - - if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticBindUrl == NULL)) { - char *urlsCopy = strndup(staticConnectUrls, 1024*1024); - char* url; - char* save = urlsCopy; - while ((url = strtok_r(save, " ", &save))) { - psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry)); - entry->statically = true; - entry->connected = false; - entry->url = strndup(url, 1024*1024); - entry->parent = receiver; - hashMap_put(receiver->requestedConnections.map, entry->url, entry); + + if (receiver->socketHandler != NULL) { + pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions, (unsigned int) buffer_size); + pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout); + pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg); + pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler, psa_tcp_disConnectHandler); } - free(urlsCopy); - - // Configure Receiver thread - receiver->thread.running = true; - celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver); - char name[64]; - snprintf(name, 64, "TCP TR %s/%s", scope, topic); - celixThread_setName(&receiver->thread.thread, name); - psa_tcp_setupTcpContext(receiver->logHelper, &receiver->thread.thread, topicProperties); - } - - //track subscribers - if (receiver->socketHandler != NULL) { - int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); - char buf[size+1]; - snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); - celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; - opts.filter.ignoreServiceLanguage = true; - opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; - opts.filter.filter = buf; - opts.callbackHandle = receiver; - opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber; - opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber; - receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - } - - if (receiver->socketHandler == NULL) { - free(receiver->scope); - free(receiver->topic); - free(receiver); - receiver = NULL; - L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope, topic); - } - return receiver; -} -void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { - if (receiver != NULL) { + psa_tcp_setScopeAndTopicFilter(scope, topic, receiver->scopeAndTopicFilter); + receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, + PSA_TCP_DEFAULT_METRICS_ENABLED); + celixThreadMutex_create(&receiver->subscribers.mutex, NULL); + celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL); + celixThreadMutex_create(&receiver->thread.mutex, NULL); - celixThreadMutex_lock(&receiver->thread.mutex); - if (!receiver->thread.running) { - receiver->thread.running = false; - celixThreadMutex_unlock(&receiver->thread.mutex); - celixThread_join(receiver->thread.thread, NULL); + receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); + receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + receiver->requestedConnections.allConnected = false; + + if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticBindUrl == NULL)) { + char *urlsCopy = strndup(staticConnectUrls, 1024 * 1024); + char *url; + char *save = urlsCopy; + while ((url = strtok_r(save, " ", &save))) { + psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry)); + entry->statically = true; + entry->connected = false; + entry->url = strndup(url, 1024 * 1024); + entry->parent = receiver; + hashMap_put(receiver->requestedConnections.map, entry->url, entry); + } + free(urlsCopy); + + // Configure Receiver thread + receiver->thread.running = true; + celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver); + char name[64]; + snprintf(name, 64, "TCP TR %s/%s", scope, topic); + celixThread_setName(&receiver->thread.thread, name); + psa_tcp_setupTcpContext(receiver->logHelper, &receiver->thread.thread, topicProperties); } - celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); + //track subscribers + if (receiver->socketHandler != NULL) { + int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + char buf[size + 1]; + snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.ignoreServiceLanguage = true; + opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; + opts.filter.filter = buf; + opts.callbackHandle = receiver; + opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber; + opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber; + receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + } - celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); - free(entry); - } - - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); - while (hashMapIterator_hasNext(&iter2)) { - hash_map_t *origins = hashMapIterator_nextValue(&iter2); - hashMap_destroy(origins, true, true); - } - hashMap_destroy(entry->metrics, false, false); + if (receiver->socketHandler == NULL) { + free(receiver->scope); + free(receiver->topic); + free(receiver); + receiver = NULL; + L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope, topic); } - hashMap_destroy(receiver->subscribers.map, false, false); + return receiver; +} +void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { + if (receiver != NULL) { - celixThreadMutex_unlock(&receiver->subscribers.mutex); - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - iter = hashMapIterator_construct(receiver->requestedConnections.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - free(entry->url); - free(entry); - } - } - hashMap_destroy(receiver->requestedConnections.map, false, false); - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + celixThreadMutex_lock(&receiver->thread.mutex); + if (!receiver->thread.running) { + receiver->thread.running = false; + celixThreadMutex_unlock(&receiver->thread.mutex); + celixThread_join(receiver->thread.thread, NULL); + } - celixThreadMutex_destroy(&receiver->subscribers.mutex); - celixThreadMutex_destroy(&receiver->requestedConnections.mutex); - celixThreadMutex_destroy(&receiver->thread.mutex); + celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); + free(entry); + } + + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_t *origins = hashMapIterator_nextValue(&iter2); + hashMap_destroy(origins, true, true); + } + hashMap_destroy(entry->metrics, false, false); + } + hashMap_destroy(receiver->subscribers.map, false, false); - pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL); - pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, NULL, NULL, NULL); - if ((receiver->socketHandler)&&(receiver->sharedSocketHandler == NULL)) { - pubsub_tcpHandler_destroy(receiver->socketHandler); - receiver->socketHandler = NULL; - } - free(receiver->scope); - free(receiver->topic); - } - free(receiver); + celixThreadMutex_unlock(&receiver->subscribers.mutex); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + iter = hashMapIterator_construct(receiver->requestedConnections.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + free(entry->url); + free(entry); + } + } + hashMap_destroy(receiver->requestedConnections.map, false, false); + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + celixThreadMutex_destroy(&receiver->subscribers.mutex); + celixThreadMutex_destroy(&receiver->requestedConnections.mutex); + celixThreadMutex_destroy(&receiver->thread.mutex); + + pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL); + pubsub_tcpHandler_addConnectionCallback(receiver->socketHandler, NULL, NULL, NULL); + if ((receiver->socketHandler) && (receiver->sharedSocketHandler == NULL)) { + pubsub_tcpHandler_destroy(receiver->socketHandler); + receiver->socketHandler = NULL; + } + + free(receiver->scope); + free(receiver->topic); + } + free(receiver); } const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver) { - return receiver->scope; + return receiver->scope; } const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver) { - return receiver->topic; + return receiver->topic; } long pubsub_tcpTopicReceiver_serializerSvcId(pubsub_tcp_topic_receiver_t *receiver) { - return receiver->serializerSvcId; + return receiver->serializerSvcId; } void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) { - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); - char *url = NULL; - asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : ""); - if (entry->connected) { - celix_arrayList_add(connectedUrls, url); - } else { - celix_arrayList_add(unconnectedUrls, url); + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + char *url = NULL; + asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : ""); + if (entry->connected) { + celix_arrayList_add(connectedUrls, url); + } else { + celix_arrayList_add(unconnectedUrls, url); + } } - } - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } - void pubsub_tcpTopicReceiver_connectTo( pubsub_tcp_topic_receiver_t *receiver, const char *url) { - L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url); - - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url); - if (entry == NULL) { - entry = calloc(1, sizeof(*entry)); - entry->url = strndup(url, 1024*1024); - entry->connected = false; - entry->statically = false; - entry->parent = receiver; - hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry); - receiver->requestedConnections.allConnected = false; - } - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url); - psa_tcp_connectToAllRequestedConnections(receiver); + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url); + if (entry == NULL) { + entry = calloc(1, sizeof(*entry)); + entry->url = strndup(url, 1024 * 1024); + entry->connected = false; + entry->statically = false; + entry->parent = receiver; + hashMap_put(receiver->requestedConnections.map, (void *) entry->url, entry); + receiver->requestedConnections.allConnected = false; + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + psa_tcp_connectToAllRequestedConnections(receiver); } void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url) { - L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url); - - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url); - if (entry != NULL) { - int rc = pubsub_tcpHandler_closeConnection(receiver->socketHandler, entry->url); - if (rc < 0) L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, strerror(errno)); - } - if (entry != NULL) { - free(entry->url); - free(entry); - } - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url); + if (entry != NULL) { + int rc = pubsub_tcpHandler_closeConnection(receiver->socketHandler, entry->url); + if (rc < 0) L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, strerror(errno)); + } + if (entry != NULL) { + free(entry->url); + free(entry); + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_tcp_topic_receiver_t *receiver = handle; - - long bndId = celix_bundle_getId(bnd); - const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { - //not the same scope. ignore - return; - } - - celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId); - if (entry != NULL) { - entry->usageCount += 1; - } else { - //new create entry - entry = calloc(1, sizeof(*entry)); - entry->usageCount = 1; - entry->svc = svc; - entry->initialized = false; - receiver->subscribers.allInitialized = false; - - int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, - &entry->msgTypes); - - if (rc == 0) { - entry->metrics = hashMap_create(NULL, NULL, NULL, NULL); - hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes); - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); - hash_map_t *origins = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - hashMap_put(entry->metrics, (void *) (uintptr_t) msgSer->msgId, origins); - } + pubsub_tcp_topic_receiver_t *receiver = handle; + + long bndId = celix_bundle_getId(bnd); + const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { + //not the same scope. ignore + return; } - if (rc == 0) { - hashMap_put(receiver->subscribers.map, (void *) bndId, entry); + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId); + if (entry != NULL) { + entry->usageCount += 1; } else { - L_ERROR("[PSA_TCP] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic); - free(entry); + //new create entry + entry = calloc(1, sizeof(*entry)); + entry->usageCount = 1; + entry->svc = svc; + entry->initialized = false; + receiver->subscribers.allInitialized = false; + + int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, + &entry->msgTypes); + + if (rc == 0) { + entry->metrics = hashMap_create(NULL, NULL, NULL, NULL); + hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); + hash_map_t *origins = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + hashMap_put(entry->metrics, (void *) (uintptr_t) msgSer->msgId, origins); + } + } + + if (rc == 0) { + hashMap_put(receiver->subscribers.map, (void *) bndId, entry); + } else { + L_ERROR("[PSA_TCP] Cannot create msg serializer map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + free(entry); + } } - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); + celixThreadMutex_unlock(&receiver->subscribers.mutex); } static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_tcp_topic_receiver_t *receiver = handle; - - long bndId = celix_bundle_getId(bnd); - - celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId); - if (entry != NULL) { - entry->usageCount -= 1; - } - if (entry != NULL && entry->usageCount <= 0) { - //remove entry - hashMap_remove(receiver->subscribers.map, (void *) bndId); - int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); - if (rc != 0) { - L_ERROR("[PSA_TCP] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + pubsub_tcp_topic_receiver_t *receiver = handle; + + long bndId = celix_bundle_getId(bnd); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId); + if (entry != NULL) { + entry->usageCount -= 1; } - hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics); - while (hashMapIterator_hasNext(&iter)) { - hash_map_t *origins = hashMapIterator_nextValue(&iter); - hashMap_destroy(origins, true, true); + if (entry != NULL && entry->usageCount <= 0) { + //remove entry + hashMap_remove(receiver->subscribers.map, (void *) bndId); + int rc = receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes); + if (rc != 0) { + L_ERROR("[PSA_TCP] Cannot destroy msg serializers map for TopicReceiver %s/%s", receiver->scope, receiver->topic); + } + hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter)) { + hash_map_t *origins = hashMapIterator_nextValue(&iter); + hashMap_destroy(origins, true, true); + } + hashMap_destroy(entry->metrics, false, false); + free(entry); } - hashMap_destroy(entry->metrics, false, false); - free(entry); - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); + celixThreadMutex_unlock(&receiver->subscribers.mutex); } static inline void processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime) { - //NOTE receiver->subscribers.mutex locked - pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (hdr->type)); - pubsub_subscriber_t *svc = entry->svc; - bool monitor = receiver->metricsEnabled; - - //monitoring - struct timespec beginSer; - struct timespec endSer; - int updateReceiveCount = 0; - int updateSerError = 0; - - if (msgSer != NULL) { - void *deserializedMsg = NULL; - bool validVersion = psa_tcp_checkVersion(msgSer->msgVersion, hdr); - if (validVersion) { - if (monitor) { - clock_gettime(CLOCK_REALTIME, &beginSer); - } - celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg); - if (monitor) { - clock_gettime(CLOCK_REALTIME, &endSer); - } - if (status == CELIX_SUCCESS) { - bool release = true; - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release); - if (release) { - msgSer->freeMsg(msgSer->handle, deserializedMsg); + //NOTE receiver->subscribers.mutex locked + pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t)(hdr->type)); + pubsub_subscriber_t *svc = entry->svc; + bool monitor = receiver->metricsEnabled; + + //monitoring + struct timespec beginSer; + struct timespec endSer; + int updateReceiveCount = 0; + int updateSerError = 0; + + if (msgSer != NULL) { + void *deserializedMsg = NULL; + bool validVersion = psa_tcp_checkVersion(msgSer->msgVersion, hdr); + if (validVersion) { + if (monitor) { + clock_gettime(CLOCK_REALTIME, &beginSer); + } + celix_status_t status = msgSer->deserialize(msgSer->handle, payload, payloadSize, &deserializedMsg); + if (monitor) { + clock_gettime(CLOCK_REALTIME, &endSer); + } + if (status == CELIX_SUCCESS) { + bool release = true; + svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, &release); + if (release) { + msgSer->freeMsg(msgSer->handle, deserializedMsg); + } + updateReceiveCount += 1; + } else { + updateSerError += 1; + L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, + receiver->topic); + } } - updateReceiveCount += 1; - } else { - updateSerError += 1; - L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope, - receiver->topic); - } - } - } else { - L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X", hdr->type); - } - - if (msgSer != NULL && monitor) { - hash_map_t *origins = hashMap_get(entry->metrics, (void *) (uintptr_t) hdr->type); - char uuidStr[UUID_STR_LEN + 1]; - uuid_unparse(hdr->originUUID, uuidStr); - psa_tcp_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr); - - if (metrics == NULL) { - metrics = calloc(1, sizeof(*metrics)); - hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN + 1), metrics); - uuid_copy(metrics->origin, hdr->originUUID); - metrics->msgTypeId = hdr->type; - metrics->maxDelayInSeconds = -INFINITY; - metrics->minDelayInSeconds = INFINITY; - metrics->lastSeqNr = 0; + } else { + L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X", hdr->type); } - double diff = celix_difftime(&beginSer, &endSer); - long n = metrics->nrOfMessagesReceived; - metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n + 1); + if (msgSer != NULL && monitor) { + hash_map_t *origins = hashMap_get(entry->metrics, (void *) (uintptr_t) hdr->type); + char uuidStr[UUID_STR_LEN + 1]; + uuid_unparse(hdr->originUUID, uuidStr); + psa_tcp_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr); + + if (metrics == NULL) { + metrics = calloc(1, sizeof(*metrics)); + hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN + 1), metrics); + uuid_copy(metrics->origin, hdr->originUUID); + metrics->msgTypeId = hdr->type; + metrics->maxDelayInSeconds = -INFINITY; + metrics->minDelayInSeconds = INFINITY; + metrics->lastSeqNr = 0; + } - diff = celix_difftime(&metrics->lastMessageReceived, receiveTime); - n = metrics->nrOfMessagesReceived; - if (metrics->nrOfMessagesReceived >= 1) { - metrics->averageTimeBetweenMessagesInSeconds = - (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1); - } - metrics->lastMessageReceived = *receiveTime; + double diff = celix_difftime(&beginSer, &endSer); + long n = metrics->nrOfMessagesReceived; + metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n + 1); + diff = celix_difftime(&metrics->lastMessageReceived, receiveTime); + n = metrics->nrOfMessagesReceived; + if (metrics->nrOfMessagesReceived >= 1) { + metrics->averageTimeBetweenMessagesInSeconds = + (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1); + } + metrics->lastMessageReceived = *receiveTime; - int incr = hdr->seqNr - metrics->lastSeqNr; - if (metrics->lastSeqNr > 0 && incr > 1) { - metrics->nrOfMissingSeqNumbers += (incr - 1); - L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr); - } - metrics->lastSeqNr = hdr->seqNr; - - struct timespec sendTime; - sendTime.tv_sec = (time_t) hdr->sendtimeSeconds; - sendTime.tv_nsec = (long) hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct - diff = celix_difftime(&sendTime, receiveTime); - metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n + 1); - if (diff < metrics->minDelayInSeconds) { - metrics->minDelayInSeconds = diff; - } - if (diff > metrics->maxDelayInSeconds) { - metrics->maxDelayInSeconds = diff; - } - metrics->nrOfMessagesReceived += updateReceiveCount; - metrics->nrOfSerializationErrors += updateSerError; - } -} -static void -processMsg(void* handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime) { - pubsub_tcp_topic_receiver_t *receiver = handle; - celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime); + int incr = hdr->seqNr - metrics->lastSeqNr; + if (metrics->lastSeqNr > 0 && incr > 1) { + metrics->nrOfMissingSeqNumbers += (incr - 1); + L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr); + } + metrics->lastSeqNr = hdr->seqNr; + + struct timespec sendTime; + sendTime.tv_sec = (time_t) hdr->sendtimeSeconds; + sendTime.tv_nsec = (long) hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct + diff = celix_difftime(&sendTime, receiveTime); + metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n + 1); + if (diff < metrics->minDelayInSeconds) { + metrics->minDelayInSeconds = diff; + } + if (diff > metrics->maxDelayInSeconds) { + metrics->maxDelayInSeconds = diff; + } + + metrics->nrOfMessagesReceived += updateReceiveCount; + metrics->nrOfSerializationErrors += updateSerError; } - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); } - +static void processMsg(void *handle, const pubsub_tcp_msg_header_t *hdr, const unsigned char *payload, size_t payloadSize, struct timespec *receiveTime) { + pubsub_tcp_topic_receiver_t *receiver = handle; + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + processMsgForSubscriberEntry(receiver, entry, hdr, payload, payloadSize, receiveTime); + } + } + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} static void *psa_tcp_recvThread(void *data) { - pubsub_tcp_topic_receiver_t *receiver = data; - - celixThreadMutex_lock(&receiver->thread.mutex); - bool running = receiver->thread.running; - celixThreadMutex_unlock(&receiver->thread.mutex); - - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - bool allConnected = receiver->requestedConnections.allConnected; - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); - - celixThreadMutex_lock(&receiver->subscribers.mutex); - bool allInitialized = receiver->subscribers.allInitialized; - celixThreadMutex_unlock(&receiver->subscribers.mutex); - - while (running) { - if (!allConnected) { - psa_tcp_connectToAllRequestedConnections(receiver); - } - if (!allInitialized) { - psa_tcp_initializeAllSubscribers(receiver); - } - pubsub_tcpHandler_handler(receiver->socketHandler); + pubsub_tcp_topic_receiver_t *receiver = data; celixThreadMutex_lock(&receiver->thread.mutex); - running = receiver->thread.running; + bool running = receiver->thread.running; celixThreadMutex_unlock(&receiver->thread.mutex); celixThreadMutex_lock(&receiver->requestedConnections.mutex); - allConnected = receiver->requestedConnections.allConnected; + bool allConnected = receiver->requestedConnections.allConnected; celixThreadMutex_unlock(&receiver->requestedConnections.mutex); celixThreadMutex_lock(&receiver->subscribers.mutex); - allInitialized = receiver->subscribers.allInitialized; + bool allInitialized = receiver->subscribers.allInitialized; celixThreadMutex_unlock(&receiver->subscribers.mutex); - } // while - return NULL; + + while (running) { + if (!allConnected) { + psa_tcp_connectToAllRequestedConnections(receiver); + } + if (!allInitialized) { + psa_tcp_initializeAllSubscribers(receiver); + } + pubsub_tcpHandler_handler(receiver->socketHandler); + + celixThreadMutex_lock(&receiver->thread.mutex); + running = receiver->thread.running; + celixThreadMutex_unlock(&receiver->thread.mutex); + + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + allConnected = receiver->requestedConnections.allConnected; + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + + celixThreadMutex_lock(&receiver->subscribers.mutex); + allInitialized = receiver->subscribers.allInitialized; + celixThreadMutex_unlock(&receiver->subscribers.mutex); + } // while + return NULL; } pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) { - pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); - snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); - - int msgTypesCount = 0; - celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); - while (hashMapIterator_hasNext(&iter2)) { - hashMapIterator_nextValue(&iter2); - msgTypesCount += 1; + pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); + snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); + + int msgTypesCount = 0; + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter2)) { + hashMapIterator_nextValue(&iter2); + msgTypesCount += 1; + } } - } - - result->nrOfMsgTypes = (unsigned long) msgTypesCount; - result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes)); - int i = 0; - iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); - while (hashMapIterator_hasNext(&iter2)) { - hash_map_t *origins = hashMapIterator_nextValue(&iter2); - result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins))); - result->msgTypes[i].nrOfOrigins = hashMap_size(origins); - int k = 0; - hash_map_iterator_t iter3 = hashMapIterator_construct(origins); - while (hashMapIterator_hasNext(&iter3)) { - psa_tcp_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3); - result->msgTypes[i].typeId = metrics->msgTypeId; - pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) metrics->msgTypeId); - if (msgSer) { - snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName); - uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin); - result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived; - result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors; - result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds; - result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds; - result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds; - result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds; - result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds; - result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived; - result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers; - - k += 1; - } else { - L_WARN("[PSA_TCP]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId); + + result->nrOfMsgTypes = (unsigned long) msgTypesCount; + result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes)); + int i = 0; + iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); + while (hashMapIterator_hasNext(&iter2)) { + hash_map_t *origins = hashMapIterator_nextValue(&iter2); + result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins))); + result->msgTypes[i].nrOfOrigins = hashMap_size(origins); + int k = 0; + hash_map_iterator_t iter3 = hashMapIterator_construct(origins); + while (hashMapIterator_hasNext(&iter3)) { + psa_tcp_subscriber_metrics_entry_t *metrics = hashMapIterator_nextValue(&iter3); + result->msgTypes[i].typeId = metrics->msgTypeId; + pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) metrics->msgTypeId); + if (msgSer) { + snprintf(result->msgTypes[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName); + uuid_copy(result->msgTypes[i].origins[k].originUUID, metrics->origin); + result->msgTypes[i].origins[k].nrOfMessagesReceived = metrics->nrOfMessagesReceived; + result->msgTypes[i].origins[k].nrOfSerializationErrors = metrics->nrOfSerializationErrors; + result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds; + result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds; + result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds; + result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds; + result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds; + result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived; + result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers; + + k += 1; + } else { + L_WARN("[PSA_TCP]: Error cannot find key 0x%X in msg map during metrics collection!\n", metrics->msgTypeId); + } + } + i += 1; } - } - i += 1; } - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); - return result; + celixThreadMutex_unlock(&receiver->subscribers.mutex); + return result; } static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver) { - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - if (!receiver->requestedConnections.allConnected) { - bool allConnected = true; - hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); - if (!entry->connected){ - entry->fd = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url); - if (entry->fd < 0) { - L_WARN("[PSA_TCP] Error connecting to tcp url %s\n", entry->url); - allConnected = false; + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + if (!receiver->requestedConnections.allConnected) { + bool allConnected = true; + hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (!entry->connected) { + entry->fd = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url); + if (entry->fd < 0) { + L_WARN("[PSA_TCP] Error connecting to tcp url %s\n", entry->url); + allConnected = false; + } + } } - } + receiver->requestedConnections.allConnected = allConnected; } - receiver->requestedConnections.allConnected = allConnected; - } - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } static void psa_tcp_connectHandler(void *handle, const char *url, bool lock) { - pubsub_tcp_topic_receiver_t *receiver = handle; - L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url); - if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex); - psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url); - if (entry == NULL) { - entry = calloc(1, sizeof(*entry)); - entry->parent = receiver; - entry->url = strndup(url, 1024*1024); - entry->statically = true; - hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry); - receiver->requestedConnections.allConnected = false; - } - entry->connected = true; - if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + pubsub_tcp_topic_receiver_t *receiver = handle; + L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s", receiver->scope, receiver->topic, url); + if (lock) celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url); + if (entry == NULL) { + entry = calloc(1, sizeof(*entry)); + entry->parent = receiver; + entry->url = strndup(url, 1024 * 1024); + entry->statically = true; + hashMap_put(receiver->requestedConnections.map, (void *) entry->url, entry); + receiver->requestedConnections.allConnected = false; + } + entry->connected = true; + if (lock) celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } static void psa_tcp_disConnectHandler(void *handle, const char *url) { - pubsub_tcp_topic_receiver_t *receiver = handle; - L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url); - celixThreadMutex_lock(&receiver->requestedConnections.mutex); - psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url); - if (entry != NULL) { - free(entry->url); - free(entry); - } - celixThreadMutex_unlock(&receiver->requestedConnections.mutex); + pubsub_tcp_topic_receiver_t *receiver = handle; + L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s", receiver->scope, receiver->topic, url); + celixThreadMutex_lock(&receiver->requestedConnections.mutex); + psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url); + if (entry != NULL) { + free(entry->url); + free(entry); + } + celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver) { - celixThreadMutex_lock(&receiver->subscribers.mutex); - if (!receiver->subscribers.allInitialized) { - bool allInitialized = true; - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (!entry->initialized) { - int rc = 0; - if (entry->svc != NULL && entry->svc->init != NULL) { - rc = entry->svc->init(entry->svc->handle); - } - if (rc == 0) { - entry->initialized = true; - } else { - L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); - allInitialized = false; + celixThreadMutex_lock(&receiver->subscribers.mutex); + if (!receiver->subscribers.allInitialized) { + bool allInitialized = true; + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); + if (!entry->initialized) { + int rc = 0; + if (entry->svc != NULL && entry->svc->init != NULL) { + rc = entry->svc->init(entry->svc->handle); + } + if (rc == 0) { + entry->initialized = true; + } else { + L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); + allInitialized = false; + } + } } - } + receiver->subscribers.allInitialized = allInitialized; } - receiver->subscribers.allInitialized = allInitialized; - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); + celixThreadMutex_unlock(&receiver->subscribers.mutex); } diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 607508c..b8a2aa9 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -47,65 +47,65 @@ logHelper_log(sender->logHelper, OSGI_LOGSERVICE_ERROR, __VA_ARGS__) struct pubsub_tcp_topic_sender { - celix_bundle_context_t *ctx; - log_helper_t *logHelper; - long serializerSvcId; - pubsub_serializer_service_t *serializer; - uuid_t fwUUID; - bool metricsEnabled; - pubsub_tcpHandler_pt socketHandler; - pubsub_tcpHandler_pt sharedSocketHandler; - - char *scope; - char *topic; - char scopeAndTopicFilter[5]; - char *url; - bool isStatic; - - struct { - celix_thread_mutex_t mutex; - } tcp; - - struct { - celix_thread_t thread; - celix_thread_mutex_t mutex; - bool running; - } thread; - - struct { - long svcId; - celix_service_factory_t factory; - } publisher; - - struct { - celix_thread_mutex_t mutex; - hash_map_t *map; //key = bndId, value = psa_tcp_bounded_service_entry_t - } boundedServices; + celix_bundle_context_t *ctx; + log_helper_t *logHelper; + long serializerSvcId; + pubsub_serializer_service_t *serializer; + uuid_t fwUUID; + bool metricsEnabled; + pubsub_tcpHandler_pt socketHandler; + pubsub_tcpHandler_pt sharedSocketHandler; + + char *scope; + char *topic; + char scopeAndTopicFilter[5]; + char *url; + bool isStatic; + + struct { + celix_thread_mutex_t mutex; + } tcp; + + struct { + celix_thread_t thread; + celix_thread_mutex_t mutex; + bool running; + } thread; + + struct { + long svcId; + celix_service_factory_t factory; + } publisher; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = bndId, value = psa_tcp_bounded_service_entry_t + } boundedServices; }; typedef struct psa_tcp_send_msg_entry { - pubsub_tcp_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send) - pubsub_msg_serializer_t *msgSer; - celix_thread_mutex_t sendLock; //protects send & Seqnr - int seqNr; - struct { - celix_thread_mutex_t mutex; //protects entries in struct - long nrOfMessagesSend; - long nrOfMessagesSendFailed; - long nrOfSerializationErrors; - struct timespec lastMessageSend; - double averageTimeBetweenMessagesInSeconds; - double averageSerializationTimeInSeconds; - } metrics; + pubsub_tcp_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send) + pubsub_msg_serializer_t *msgSer; + celix_thread_mutex_t sendLock; //protects send & Seqnr + int seqNr; + struct { + celix_thread_mutex_t mutex; //protects entries in struct + long nrOfMessagesSend; + long nrOfMessagesSendFailed; + long nrOfSerializationErrors; + struct timespec lastMessageSend; + double averageTimeBetweenMessagesInSeconds; + double averageSerializationTimeInSeconds; + } metrics; } psa_tcp_send_msg_entry_t; typedef struct psa_tcp_bounded_service_entry { - pubsub_tcp_topic_sender_t *parent; - pubsub_publisher_t service; - long bndId; - hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t - hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t - int getCount; + pubsub_tcp_topic_sender_t *parent; + pubsub_publisher_t service; + long bndId; + hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t + hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t + int getCount; } psa_tcp_bounded_service_entry_t; static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, @@ -128,465 +128,464 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( const char *scope, const char *topic, const celix_properties_t *topicProperties, - pubsub_tcp_endPointStore_t* endPointStore, + pubsub_tcp_endPointStore_t *endPointStore, long serializerSvcId, pubsub_serializer_service_t *ser, const char *bindIP, const char *staticBindUrl, unsigned int basePort, unsigned int maxPort) { - pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender)); - sender->ctx = ctx; - sender->logHelper = logHelper; - sender->serializerSvcId = serializerSvcId; - sender->serializer = ser; - sender->socketHandler = pubsub_tcpHandler_create(sender->logHelper); - psa_tcp_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter); - const char *uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); - if (uuid != NULL) { - uuid_parse(uuid, sender->fwUUID); - } - sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); - - /* Check if it's a static endpoint */ - bool isEndPointTypeClient = false; - bool isEndPointTypeServer = false; - const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL); - if (endPointType != NULL) { - if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) ==0) { - isEndPointTypeClient = true; + pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender)); + sender->ctx = ctx; + sender->logHelper = logHelper; + sender->serializerSvcId = serializerSvcId; + sender->serializer = ser; + sender->socketHandler = pubsub_tcpHandler_create(sender->logHelper); + psa_tcp_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter); + const char *uuid = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); + if (uuid != NULL) { + uuid_parse(uuid, sender->fwUUID); } - if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) ==0) { - isEndPointTypeServer = true; - } - } - - // When endpoint is client, use the connection urls as a key. - const char *staticConnectUrls = ((topicProperties != NULL) && isEndPointTypeClient) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL) : NULL; - - /* When it's an endpoint share the socket with the receiver */ - if (staticConnectUrls != NULL || (isEndPointTypeServer && staticBindUrl != NULL)) { - celixThreadMutex_lock(&endPointStore->mutex); - sender->sharedSocketHandler = sender->socketHandler; - pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, staticConnectUrls); - if (entry == NULL) { - entry = sender->socketHandler; - hashMap_put(endPointStore->map, (void *)(isEndPointTypeClient ? staticConnectUrls : staticBindUrl), entry); + sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); + + /* Check if it's a static endpoint */ + bool isEndPointTypeClient = false; + bool isEndPointTypeServer = false; + const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL); + if (endPointType != NULL) { + if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) { + isEndPointTypeClient = true; + } + if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) { + isEndPointTypeServer = true; + } } - celixThreadMutex_unlock(&endPointStore->mutex); - } - - //setting up tcp socket for TCP TopicSender - { - if (staticConnectUrls != NULL) { - // Store url for client static endpoint - sender->url = strndup(staticConnectUrls, 1024 * 1024); - sender->isStatic = true; + + // When endpoint is client, use the connection urls as a key. + const char *staticConnectUrls = ((topicProperties != NULL) && isEndPointTypeClient) ? celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL) : NULL; + + /* When it's an endpoint share the socket with the receiver */ + if (staticConnectUrls != NULL || (isEndPointTypeServer && staticBindUrl != NULL)) { + celixThreadMutex_lock(&endPointStore->mutex); + sender->sharedSocketHandler = sender->socketHandler; + pubsub_tcpHandler_pt entry = hashMap_get(endPointStore->map, staticConnectUrls); + if (entry == NULL) { + entry = sender->socketHandler; + hashMap_put(endPointStore->map, (void *) (isEndPointTypeClient ? staticConnectUrls : staticBindUrl), entry); + } + celixThreadMutex_unlock(&endPointStore->mutex); } - else if (staticBindUrl != NULL) { - int rv = pubsub_tcpHandler_listen(sender->socketHandler, (char *) staticBindUrl); - if (rv == -1) { - L_WARN("Error for tcp_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno)); - } else { - sender->url = strndup(staticBindUrl, 1024 * 1024); - sender->isStatic = true; - } - } else { - int retry = 0; - while (sender->url == NULL && retry < TCP_BIND_MAX_RETRY) { - /* Randomized part due to same bundle publishing on different topics */ - unsigned int port = rand_range(basePort, maxPort); - char *url = NULL; - asprintf(&url, "tcp://%s:%u", bindIP, port); - char *bindUrl = NULL; - asprintf(&bindUrl, "tcp://0.0.0.0:%u", port); - int rv = pubsub_tcpHandler_listen(sender->socketHandler, bindUrl); - if (rv == -1) { - L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno)); - free(url); + + //setting up tcp socket for TCP TopicSender + { + if (staticConnectUrls != NULL) { + // Store url for client static endpoint + sender->url = strndup(staticConnectUrls, 1024 * 1024); + sender->isStatic = true; + } else if (staticBindUrl != NULL) { + int rv = pubsub_tcpHandler_listen(sender->socketHandler, (char *) staticBindUrl); + if (rv == -1) { + L_WARN("Error for tcp_bind using static bind url '%s'. %s", staticBindUrl, strerror(errno)); + } else { + sender->url = strndup(staticBindUrl, 1024 * 1024); + sender->isStatic = true; + } } else { - sender->url = url; + int retry = 0; + while (sender->url == NULL && retry < TCP_BIND_MAX_RETRY) { + /* Randomized part due to same bundle publishing on different topics */ + unsigned int port = rand_range(basePort, maxPort); + char *url = NULL; + asprintf(&url, "tcp://%s:%u", bindIP, port); + char *bindUrl = NULL; + asprintf(&bindUrl, "tcp://0.0.0.0:%u", port); + int rv = pubsub_tcpHandler_listen(sender->socketHandler, bindUrl); + if (rv == -1) { + L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", bindUrl, strerror(errno)); + free(url); + } else { + sender->url = url; + } + retry++; + free(bindUrl); + } } - retry++; - free(bindUrl); - } } - } - - if (sender->url != NULL) { - sender->scope = strndup(scope, 1024 * 1024); - sender->topic = strndup(topic, 1024 * 1024); - - celixThreadMutex_create(&sender->boundedServices.mutex, NULL); - celixThreadMutex_create(&sender->tcp.mutex, NULL); - celixThreadMutex_create(&sender->thread.mutex, NULL); - sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL); - } - - if (sender->socketHandler != NULL) { - sender->thread.running = true; - celixThread_create(&sender->thread.thread, NULL, psa_tcp_sendThread, sender); - char name[64]; - snprintf(name, 64, "TCP TS %s/%s", scope, topic); - celixThread_setName(&sender->thread.thread, name); - psa_tcp_setupTcpContext(sender->logHelper, &sender->thread.thread, topicProperties); - } - - - //register publisher services using a service factory - if (sender->url != NULL) { - sender->publisher.factory.handle = sender; - sender->publisher.factory.getService = psa_tcp_getPublisherService; - sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService; - - celix_properties_t *props = celix_properties_create(); - celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic); - celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); - - celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; - opts.factory = &sender->publisher.factory; - opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; - opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION; - opts.properties = props; - - sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts); - } - - if (sender->url == NULL) { - free(sender); - sender = NULL; - } - - return sender; -} -void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { - if (sender != NULL) { - celixThreadMutex_lock(&sender->thread.mutex); - if (!sender->thread.running) { - sender->thread.running = false; - celixThreadMutex_unlock(&sender->thread.mutex); - celixThread_join(sender->thread.thread, NULL); + if (sender->url != NULL) { + sender->scope = strndup(scope, 1024 * 1024); + sender->topic = strndup(topic, 1024 * 1024); + + celixThreadMutex_create(&sender->boundedServices.mutex, NULL); + celixThreadMutex_create(&sender->tcp.mutex, NULL); + celixThreadMutex_create(&sender->thread.mutex, NULL); + sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL); } - celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId); - celixThreadMutex_lock(&sender->boundedServices.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter2)) { - psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2); - celixThreadMutex_destroy(&msgEntry->metrics.mutex); - free(msgEntry); - } - hashMap_destroy(entry->msgEntries, false, false); - free(entry); - } + if (sender->socketHandler != NULL) { + sender->thread.running = true; + celixThread_create(&sender->thread.thread, NULL, psa_tcp_sendThread, sender); + char name[64]; + snprintf(name, 64, "TCP TS %s/%s", scope, topic); + celixThread_setName(&sender->thread.thread, name); + psa_tcp_setupTcpContext(sender->logHelper, &sender->thread.thread, topicProperties); } - hashMap_destroy(sender->boundedServices.map, false, false); - celixThreadMutex_unlock(&sender->boundedServices.mutex); - celixThreadMutex_destroy(&sender->boundedServices.mutex); - celixThreadMutex_destroy(&sender->tcp.mutex); - if ((sender->socketHandler)&&(sender->sharedSocketHandler == NULL)) { - pubsub_tcpHandler_destroy(sender->socketHandler); - sender->socketHandler = NULL; + + //register publisher services using a service factory + if (sender->url != NULL) { + sender->publisher.factory.handle = sender; + sender->publisher.factory.getService = psa_tcp_getPublisherService; + sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService; + + celix_properties_t *props = celix_properties_create(); + celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic); + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); + + celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; + opts.factory = &sender->publisher.factory; + opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME; + opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION; + opts.properties = props; + + sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts); + } + + if (sender->url == NULL) { + free(sender); + sender = NULL; } - free(sender->scope); - free(sender->topic); - free(sender->url); - free(sender); - } + return sender; +} + +void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { + if (sender != NULL) { + celixThreadMutex_lock(&sender->thread.mutex); + if (!sender->thread.running) { + sender->thread.running = false; + celixThreadMutex_unlock(&sender->thread.mutex); + celixThread_join(sender->thread.thread, NULL); + } + celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL) { + sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter2)) { + psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2); + celixThreadMutex_destroy(&msgEntry->metrics.mutex); + free(msgEntry); + } + hashMap_destroy(entry->msgEntries, false, false); + free(entry); + } + } + hashMap_destroy(sender->boundedServices.map, false, false); + celixThreadMutex_unlock(&sender->boundedServices.mutex); + celixThreadMutex_destroy(&sender->boundedServices.mutex); + celixThreadMutex_destroy(&sender->tcp.mutex); + + if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) { + pubsub_tcpHandler_destroy(sender->socketHandler); + sender->socketHandler = NULL; + } + + free(sender->scope); + free(sender->topic); + free(sender->url); + free(sender); + } } long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender) { - return sender->serializerSvcId; + return sender->serializerSvcId; } const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender) { - return sender->scope; + return sender->scope; } const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) { - return sender->topic; + return sender->topic; } const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) { - return pubsub_tcpHandler_url(sender->socketHandler); + return pubsub_tcpHandler_url(sender->socketHandler); } bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) { - return sender->isStatic; + return sender->isStatic; } void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) { - //TODO subscriber count -> topic info + //TODO subscriber count -> topic info } void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) { - //TODO + //TODO } static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { - pubsub_tcp_topic_sender_t *sender = handle; - long bndId = celix_bundle_getId(requestingBundle); - - celixThreadMutex_lock(&sender->boundedServices.mutex); - psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId); - if (entry != NULL) { - entry->getCount += 1; - } else { - entry = calloc(1, sizeof(*entry)); - entry->getCount = 1; - entry->parent = sender; - entry->bndId = bndId; - entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL); - - int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t *) requestingBundle, - &entry->msgTypes); - if (rc == 0) { - hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter); - void *key = hashMapEntry_getKey(hashMapEntry); - psa_tcp_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry)); - sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry); - sendEntry->header.type = (int32_t) sendEntry->msgSer->msgId; - int major; - int minor; - version_getMajor(sendEntry->msgSer->msgVersion, &major); - version_getMinor(sendEntry->msgSer->msgVersion, &minor); - sendEntry->header.major = (int8_t) major; - sendEntry->header.minor = (int8_t) minor; - uuid_copy(sendEntry->header.originUUID, sender->fwUUID); - celixThreadMutex_create(&sendEntry->metrics.mutex, NULL); - hashMap_put(entry->msgEntries, key, sendEntry); - } - entry->service.handle = entry; - entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType; - entry->service.send = psa_tcp_topicPublicationSend; - hashMap_put(sender->boundedServices.map, (void *) bndId, entry); + pubsub_tcp_topic_sender_t *sender = handle; + long bndId = celix_bundle_getId(requestingBundle); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId); + if (entry != NULL) { + entry->getCount += 1; } else { - L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", sender->scope, sender->topic); + entry = calloc(1, sizeof(*entry)); + entry->getCount = 1; + entry->parent = sender; + entry->bndId = bndId; + entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL); + + int rc = sender->serializer->createSerializerMap(sender->serializer->handle, (celix_bundle_t *) requestingBundle, + &entry->msgTypes); + if (rc == 0) { + hash_map_iterator_t iter = hashMapIterator_construct(entry->msgTypes); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *hashMapEntry = hashMapIterator_nextEntry(&iter); + void *key = hashMapEntry_getKey(hashMapEntry); + psa_tcp_send_msg_entry_t *sendEntry = calloc(1, sizeof(*sendEntry)); + sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry); + sendEntry->header.type = (int32_t) sendEntry->msgSer->msgId; + int major; + int minor; + version_getMajor(sendEntry->msgSer->msgVersion, &major); + version_getMinor(sendEntry->msgSer->msgVersion, &minor); + sendEntry->header.major = (int8_t) major; + sendEntry->header.minor = (int8_t) minor; + uuid_copy(sendEntry->header.originUUID, sender->fwUUID); + celixThreadMutex_create(&sendEntry->metrics.mutex, NULL); + hashMap_put(entry->msgEntries, key, sendEntry); + } + entry->service.handle = entry; + entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType; + entry->service.send = psa_tcp_topicPublicationSend; + hashMap_put(sender->boundedServices.map, (void *) bndId, entry); + } else { + L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", sender->scope, sender->topic); + } } - } - celixThreadMutex_unlock(&sender->boundedServices.mutex); + celixThreadMutex_unlock(&sender->boundedServices.mutex); - return &entry->service; + return &entry->service; } static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { - pubsub_tcp_topic_sender_t *sender = handle; - long bndId = celix_bundle_getId(requestingBundle); - - celixThreadMutex_lock(&sender->boundedServices.mutex); - psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId); - if (entry != NULL) { - entry->getCount -= 1; - } - if (entry != NULL && entry->getCount == 0) { - //free entry - hashMap_remove(sender->boundedServices.map, (void *) bndId); - int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); - if (rc != 0) { - L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + pubsub_tcp_topic_sender_t *sender = handle; + long bndId = celix_bundle_getId(requestingBundle); + + celixThreadMutex_lock(&sender->boundedServices.mutex); + psa_tcp_bounded_service_entry_t *entry = hashMap_get(sender->boundedServices.map, (void *) bndId); + if (entry != NULL) { + entry->getCount -= 1; } + if (entry != NULL && entry->getCount == 0) { + //free entry + hashMap_remove(sender->boundedServices.map, (void *) bndId); + int rc = sender->serializer->destroySerializerMap(sender->serializer->handle, entry->msgTypes); + if (rc != 0) { + L_ERROR("Error destroying publisher service, serializer not available / cannot get msg serializer map\n"); + } - hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter); - celixThreadMutex_destroy(&msgEntry->metrics.mutex); - free(msgEntry); + hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter); + celixThreadMutex_destroy(&msgEntry->metrics.mutex); + free(msgEntry); + } + hashMap_destroy(entry->msgEntries, false, false); + free(entry); } - hashMap_destroy(entry->msgEntries, false, false); - free(entry); - } - celixThreadMutex_unlock(&sender->boundedServices.mutex); + celixThreadMutex_unlock(&sender->boundedServices.mutex); } static void *psa_tcp_sendThread(void *data) { - pubsub_tcp_topic_sender_t *sender = data; - - celixThreadMutex_lock(&sender->thread.mutex); - bool running = sender->thread.running; - celixThreadMutex_unlock(&sender->thread.mutex); - - while (running) { - pubsub_tcpHandler_handler(sender->socketHandler); + pubsub_tcp_topic_sender_t *sender = data; celixThreadMutex_lock(&sender->thread.mutex); - running = sender->thread.running; + bool running = sender->thread.running; celixThreadMutex_unlock(&sender->thread.mutex); - } // while - return NULL; + while (running) { + pubsub_tcpHandler_handler(sender->socketHandler); + + celixThreadMutex_lock(&sender->thread.mutex); + running = sender->thread.running; + celixThreadMutex_unlock(&sender->thread.mutex); + + } // while + return NULL; } pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) { - pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); - snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); - celixThreadMutex_lock(&sender->boundedServices.mutex); - size_t count = 0; - hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter2)) { - hashMapIterator_nextValue(&iter2); - count += 1; + pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); + snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); + celixThreadMutex_lock(&sender->boundedServices.mutex); + size_t count = 0; + hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter2)) { + hashMapIterator_nextValue(&iter2); + count += 1; + } } - } - - result->msgMetrics = calloc(count, sizeof(*result)); - - iter = hashMapIterator_construct(sender->boundedServices.map); - int i = 0; - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter2)) { - psa_tcp_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2); - celixThreadMutex_lock(&mEntry->metrics.mutex); - result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend; - result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed; - result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors; - result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds; - result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds; - result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend; - result->msgMetrics[i].bndId = entry->bndId; - result->msgMetrics[i].typeId = mEntry->header.type; - snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName); - i += 1; - celixThreadMutex_unlock(&mEntry->metrics.mutex); + + result->msgMetrics = calloc(count, sizeof(*result)); + + iter = hashMapIterator_construct(sender->boundedServices.map); + int i = 0; + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); + while (hashMapIterator_hasNext(&iter2)) { + psa_tcp_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2); + celixThreadMutex_lock(&mEntry->metrics.mutex); + result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend; + result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed; + result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors; + result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds; + result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds; + result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend; + result->msgMetrics[i].bndId = entry->bndId; + result->msgMetrics[i].typeId = mEntry->header.type; + snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName); + i += 1; + celixThreadMutex_unlock(&mEntry->metrics.mutex); + } } - } - celixThreadMutex_unlock(&sender->boundedServices.mutex); - result->nrOfmsgMetrics = (int) count; - return result; + celixThreadMutex_unlock(&sender->boundedServices.mutex); + result->nrOfmsgMetrics = (int) count; + return result; } static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg) { - int status = CELIX_SUCCESS; - psa_tcp_bounded_service_entry_t *bound = handle; - pubsub_tcp_topic_sender_t *sender = bound->parent; - bool monitor = sender->metricsEnabled; - - psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId)); - - //metrics updates - struct timespec sendTime; - struct timespec serializationStart; - struct timespec serializationEnd; - //int unknownMessageCountUpdate = 0; - int sendErrorUpdate = 0; - int serializationErrorUpdate = 0; - int sendCountUpdate = 0; - - if (entry != NULL) { - delay_first_send_for_late_joiners(sender); - if (monitor) { - clock_gettime(CLOCK_REALTIME, &serializationStart); - } + int status = CELIX_SUCCESS; + psa_tcp_bounded_service_entry_t *bound = handle; + pubsub_tcp_topic_sender_t *sender = bound->parent; + bool monitor = sender->metricsEnabled; + + psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t)(msgTypeId)); + + //metrics updates + struct timespec sendTime; + struct timespec serializationStart; + struct timespec serializationEnd; + //int unknownMessageCountUpdate = 0; + int sendErrorUpdate = 0; + int serializationErrorUpdate = 0; + int sendCountUpdate = 0; + + if (entry != NULL) { + delay_first_send_for_late_joiners(sender); + if (monitor) { + clock_gettime(CLOCK_REALTIME, &serializationStart); + } - void *serializedOutput = NULL; - size_t serializedOutputLen = 0; - status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen); + void *serializedOutput = NULL; + size_t serializedOutputLen = 0; + status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, &serializedOutput, &serializedOutputLen); - if (monitor) { - clock_gettime(CLOCK_REALTIME, &serializationEnd); - } + if (monitor) { + clock_gettime(CLOCK_REALTIME, &serializationEnd); + } - if (status == CELIX_SUCCESS /*ser ok*/) { - //TODO refactor, is the mutex really needed? - celixThreadMutex_lock(&entry->sendLock); - pubsub_tcp_msg_header_t msg_hdr = entry->header; - msg_hdr.seqNr = -1; - msg_hdr.sendtimeSeconds = 0; - msg_hdr.sendTimeNanoseconds = 0; - if (monitor) { - clock_gettime(CLOCK_REALTIME, &sendTime); - msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec; - msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec; - msg_hdr.seqNr = entry->seqNr++; - } - - errno = 0; - bool sendOk = true; - { - int rc = pubsub_tcpHandler_write(sender->socketHandler, &msg_hdr, serializedOutput, serializedOutputLen, 0); - if (rc < 0) { - status = -1; - sendOk = false; + if (status == CELIX_SUCCESS /*ser ok*/) { + //TODO refactor, is the mutex really needed? + celixThreadMutex_lock(&entry->sendLock); + pubsub_tcp_msg_header_t msg_hdr = entry->header; + msg_hdr.seqNr = -1; + msg_hdr.sendtimeSeconds = 0; + msg_hdr.sendTimeNanoseconds = 0; + if (monitor) { + clock_gettime(CLOCK_REALTIME, &sendTime); + msg_hdr.sendtimeSeconds = (int64_t) sendTime.tv_sec; + msg_hdr.sendTimeNanoseconds = (int64_t) sendTime.tv_nsec; + msg_hdr.seqNr = entry->seqNr++; + } + + errno = 0; + bool sendOk = true; + { + int rc = pubsub_tcpHandler_write(sender->socketHandler, &msg_hdr, serializedOutput, serializedOutputLen, 0); + if (rc < 0) { + status = -1; + sendOk = false; + } + free(serializedOutput); + } + + celixThreadMutex_unlock(&entry->sendLock); + if (sendOk) { + sendCountUpdate = 1; + } else { + sendErrorUpdate = 1; + L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno)); + } + } else { + serializationErrorUpdate = 1; + L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic %s/%s", entry->msgSer->msgName, + sender->scope, sender->topic); } - free(serializedOutput); - } - - celixThreadMutex_unlock(&entry->sendLock); - if (sendOk) { - sendCountUpdate = 1; - } else { - sendErrorUpdate = 1; - L_WARN("[PSA_TCP_TS] Error sending tcp. %s", strerror(errno)); - } } else { - serializationErrorUpdate = 1; - L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic %s/%s", entry->msgSer->msgName, - sender->scope, sender->topic); - } - } else { - //unknownMessageCountUpdate = 1; - status = CELIX_SERVICE_EXCEPTION; - L_WARN("[PSA_TCP_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, - sender->scope, sender->topic); - } - - - if (monitor && entry != NULL) { - celixThreadMutex_lock(&entry->metrics.mutex); - - long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed; - double diff = celix_difftime(&serializationStart, &serializationEnd); - double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n + 1); - entry->metrics.averageSerializationTimeInSeconds = average; - - if (entry->metrics.nrOfMessagesSend > 2) { - diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime); - n = entry->metrics.nrOfMessagesSend; - average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1); - entry->metrics.averageTimeBetweenMessagesInSeconds = average; + //unknownMessageCountUpdate = 1; + status = CELIX_SERVICE_EXCEPTION; + L_WARN("[PSA_TCP_TS] Error cannot serialize message with msg type id %i for scope/topic %s/%s", msgTypeId, + sender->scope, sender->topic); } - entry->metrics.lastMessageSend = sendTime; - entry->metrics.nrOfMessagesSend += sendCountUpdate; - entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate; - entry->metrics.nrOfSerializationErrors += serializationErrorUpdate; - celixThreadMutex_unlock(&entry->metrics.mutex); - } + if (monitor && entry != NULL) { + celixThreadMutex_lock(&entry->metrics.mutex); + + long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed; + double diff = celix_difftime(&serializationStart, &serializationEnd); + double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n + 1); + entry->metrics.averageSerializationTimeInSeconds = average; + + if (entry->metrics.nrOfMessagesSend > 2) { + diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime); + n = entry->metrics.nrOfMessagesSend; + average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1); + entry->metrics.averageTimeBetweenMessagesInSeconds = average; + } - return status; + entry->metrics.lastMessageSend = sendTime; + entry->metrics.nrOfMessagesSend += sendCountUpdate; + entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate; + entry->metrics.nrOfSerializationErrors += serializationErrorUpdate; + + celixThreadMutex_unlock(&entry->metrics.mutex); + } + + return status; } static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender) { - static bool firstSend = true; + static bool firstSend = true; - if (firstSend) { - L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n"); - sleep(FIRST_SEND_DELAY_IN_SECONDS); - firstSend = false; - } + if (firstSend) { + L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY_IN_SECONDS); + firstSend = false; + } } static unsigned int rand_range(unsigned int min, unsigned int max) { - double scaled = ((double) random()) / ((double) RAND_MAX); - return (unsigned int) ((max - min + 1) * scaled + min); + double scaled = ((double) random()) / ((double) RAND_MAX); + return (unsigned int) ((max - min + 1) * scaled + min); } diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c index a077efd..770ad23 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c @@ -111,9 +111,9 @@ pubsub_udpmc_admin_t* pubsub_udpmcAdmin_create(celix_bundle_context_t *ctx, log_ if (found_if_ip != NULL) if_ip = strndup(found_if_ip, 16); else - L_WARN("Could not find interface for requested subnet %s", mcIpProp); + L_WARN("[PSA_UDPMC] Could not find interface for requested subnet %s", mcIpProp); } else { - L_ERROR("Error while searching for available network interface for subnet %s", mcIpProp); + L_ERROR("[PSA_UDPMC] Error while searching for available network interface for subnet %s", mcIpProp); } free(found_if_ip); } else { diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c index bb0890f..8fa14ad 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c @@ -48,8 +48,8 @@ struct pubsub_zmq_admin { log_helper_t *log; const char *fwUUID; - char* ipAddress; - zactor_t* zmq_auth; + char *ipAddress; + zactor_t *zmq_auth; unsigned int basePort; unsigned int maxPort; @@ -111,9 +111,9 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, log_help if (found_if_ip != NULL) ip = strndup(found_if_ip, 16); else - L_WARN("Could not find interface for requested subnet %s", confIp); + L_WARN("[PSA_ZMQ] Could not find interface for requested subnet %s", confIp); } else { - L_ERROR("Error while searching for available network interface for subnet %s", confIp); + L_ERROR("[PSA_ZMQ] Error while searching for available network interface for subnet %s", confIp); } free(found_if_ip); } else {
