This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/tcp_admin_msg_segmentation in repository https://gitbox.apache.org/repos/asf/celix.git
commit 954d64d02f167413ad8e68d1b9e2874afd084f53 Author: Roy Bulter <[email protected]> AuthorDate: Sat Aug 29 09:06:33 2020 +0200 Fix tests --- CMakeLists.txt | 20 +++---- .../pubsub/pubsub_admin_tcp/src/psa_activator.c | 4 +- .../src/pubsub_psa_tcp_constants.h | 3 ++ .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 30 ++++++----- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 29 ++++++++-- .../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 1 + .../src/pubsub_tcp_topic_receiver.c | 61 ++++++---------------- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 6 +-- bundles/pubsub/test/CMakeLists.txt | 4 +- bundles/pubsub/test/test/sut_endpoint_activator.c | 2 - 10 files changed, 79 insertions(+), 81 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a298fdd..8b7a406 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,16 @@ IF (${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION} EQUAL 3.3 AND ${CMAKE_GENERATO message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not supported due to a bug in the Makefile Generator (see Bug 15696). Please change the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use a different generator (e.g. Ninja)." ) ENDIF() +# Options +option(ENABLE_TESTING "Enables unit/bundle testing" TRUE) +if (ENABLE_TESTING) + find_package(GTest CONFIG QUIET) + if (NOT GTest_FOUND) + include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake) + endif() + enable_testing() +endif () + set(ENABLE_MORE_WARNINGS OFF) # Set C specific flags @@ -113,16 +123,6 @@ set(CELIX_MICRO "1") # Default bundle version set(DEFAULT_VERSION 1.0.0) -# Options -option(ENABLE_TESTING "Enables unit/bundle testing" FALSE) -if (ENABLE_TESTING) - find_package(GTest CONFIG QUIET) - if (NOT GTest_FOUND) - include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake) - endif() - enable_testing() -endif() - option(CELIX_INSTALL_DEPRECATED_API "whether to install (and use) deprecated apis (i.e. header without a celix_ prefix." ON) option(CELIX_ADD_DEPRECATED_ATTRIBUTES "If enabled add deprecated attributes to deprecated services/functions." ON) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c index a5ae576..d93c478 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c @@ -121,9 +121,7 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) { celix_properties_t *props = celix_properties_create(); celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp"); celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp"); - celix_properties_set(props, - CELIX_SHELL_COMMAND_DESCRIPTION, - "Print the information about the TopicSender and TopicReceivers for the TCP PSA"); + celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA"); act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props); } diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h index 9f03d13..dee36e6 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h @@ -65,6 +65,9 @@ #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT" #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0 +#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY "PUBSUB_TCP_SUBSCRIBER_BLOCKING" +#define PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT false + #define PUBSUB_TCP_PSA_IP_KEY "PSA_IP" #define PUBSUB_TCP_ADMIN_TYPE "tcp" diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c index e7ad284..b1c1ea9 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c @@ -636,24 +636,13 @@ pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t //note can be called with discoveredEndpoint.mutex lock celix_status_t status = CELIX_SUCCESS; - const char *scope = pubsub_tcpTopicReceiver_scope(receiver); - const char *topic = pubsub_tcpTopicReceiver_topic(receiver); - - const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL); if (url == NULL) { L_WARN("[PSA TCP] Error got endpoint without tcp url"); status = CELIX_BUNDLE_EXCEPTION; } else { - if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) { - if (scope == NULL && eScope == NULL) { - pubsub_tcpTopicReceiver_disconnectFrom(receiver, url); - } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) { - pubsub_tcpTopicReceiver_disconnectFrom(receiver, url); - } - } + pubsub_tcpTopicReceiver_disconnectFrom(receiver, url); } return status; @@ -690,6 +679,23 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr pubsub_tcp_admin_t *psa = handle; celix_status_t status = CELIX_SUCCESS; + + char *line = celix_utils_strdup(commandLine); + char *token = line; + strtok_r(line, " ", &token); //first token is command name + strtok_r(NULL, " ", &token); //second token is sub command + + if (celix_utils_stringEquals(token, "nr_of_receivers")) { + celixThreadMutex_lock(&psa->topicReceivers.mutex); + fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map)); + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + } + if (celix_utils_stringEquals(token, "nr_of_senders")) { + celixThreadMutex_lock(&psa->topicSenders.mutex); + fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map)); + celixThreadMutex_unlock(&psa->topicSenders.mutex); + } + fprintf(out, "\n"); fprintf(out, "Topic Senders:\n"); celixThreadMutex_lock(&psa->serializers.mutex); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index be5a694..daf36dc 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -97,6 +97,8 @@ typedef struct psa_tcp_connection_entry { size_t writeMetaBufferSize; void *writeMetaBuffer; unsigned int retryCount; + celix_thread_mutex_t writeMutex; + celix_thread_mutex_t readMutex; } psa_tcp_connection_entry_t; // @@ -129,6 +131,7 @@ struct pubsub_tcpHandler { celix_thread_t thread; bool running; bool isEndPoint; + bool isBlocking; }; static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); @@ -304,6 +307,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch if (fd >= 0) { entry = calloc(sizeof(psa_tcp_connection_entry_t), 1); entry->fd = fd; + celixThreadMutex_create(&entry->writeMutex, NULL); + celixThreadMutex_create(&entry->readMutex, NULL); if (url) entry->url = strndup(url, 1024 * 1024); if (interface_url) { @@ -359,6 +364,8 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) { if (entry->writeFooterBuffer) free(entry->writeFooterBuffer); if (entry->readMetaBuffer) free(entry->readMetaBuffer); if (entry->writeMetaBuffer) free(entry->writeMetaBuffer); + celixThreadMutex_destroy(&entry->writeMutex); + celixThreadMutex_destroy(&entry->readMutex); free(entry); } } @@ -421,7 +428,9 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } - rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + if (!handle->isBlocking) { + rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + } if (rc < 0) { pubsub_tcpHandler_freeEntry(entry); L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); @@ -750,6 +759,14 @@ void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool isEndPoint) } } +void pubsub_tcpHandler_setBlocking(pubsub_tcpHandler_t *handle,bool isBlocking) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->isBlocking = isBlocking; + celixThreadRwlock_unlock(&handle->dbLock); + } +} + static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { @@ -774,7 +791,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec // If the message is completely reassembled true is returned and the index and size have valid values // int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { - celixThreadRwlock_writeLock(&handle->dbLock); + celixThreadRwlock_readLock(&handle->dbLock); psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); if (entry == NULL) entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); @@ -788,7 +805,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { celixThreadRwlock_unlock(&handle->dbLock); return -1; } - + celixThreadMutex_lock(&entry->readMutex); if (entry->readHeaderBufferSize && entry->readHeaderBuffer) { entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize); } @@ -936,6 +953,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { nbytes = 0; //Return 0 as indicator to close the connection } } + celixThreadMutex_unlock(&entry->readMutex); celixThreadRwlock_unlock(&handle->dbLock); return (int)nbytes; } @@ -984,12 +1002,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message int connFdCloseQueue[hashMap_size(handle->connection_fd_map)]; int nofConnToClose = 0; if (handle) { - celixThreadRwlock_writeLock(&handle->dbLock); + celixThreadRwlock_readLock(&handle->dbLock); hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->connected) continue; + celixThreadMutex_lock(&entry->writeMutex); void *payloadData = NULL; size_t payloadSize = 0; if (msg_iov_len == 1) { @@ -1029,6 +1048,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message ((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) { L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n", entry->fd); + celixThreadMutex_unlock(&entry->writeMutex); continue; } @@ -1174,6 +1194,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message free(payloadData); } } + celixThreadMutex_unlock(&entry->writeMutex); } celixThreadRwlock_unlock(&handle->dbLock); } diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h index a08911c..70eb29a 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h @@ -66,6 +66,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout); void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout); void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool isEndPoint); +void pubsub_tcpHandler_setBlocking(pubsub_tcpHandler_t *handle, bool isBlocking); int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd); int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 533e773..ccfbb57 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -24,17 +24,13 @@ #include <pubsub/subscriber.h> #include <memory.h> #include <pubsub_constants.h> -#include <assert.h> -#include <pubsub_endpoint.h> #include <arpa/inet.h> #include <celix_log_helper.h> -#include <math.h> #include "pubsub_tcp_handler.h" #include "pubsub_tcp_topic_receiver.h" #include "pubsub_psa_tcp_constants.h" #include "pubsub_tcp_common.h" -#include "celix_utils_api.h" #include <uuid/uuid.h> #include <pubsub_admin_metrics.h> #include <pubsub_utils.h> @@ -120,24 +116,14 @@ typedef struct psa_tcp_subscriber_entry { bool initialized; //true if the init function is called through the receive thread } psa_tcp_subscriber_entry_t; -static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, - const celix_bundle_t *owner); - -static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, - const celix_bundle_t *owner); - +static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); static void *psa_tcp_recvThread(void *data); - static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver); - static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver); - static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime); - static void psa_tcp_connectHandler(void *handle, const char *url, bool lock); - static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock); - static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor); pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx, @@ -219,6 +205,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE); long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT); + bool blocking = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_SUBSCRIBER_BLOCKING_KEY, PUBSUB_TCP_SUBSCRIBER_BLOCKING_DEFAULT); + pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope); pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize); pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout); @@ -228,10 +216,10 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context pubsub_tcpHandler_setThreadPriority(receiver->socketHandler, prio, sched); pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt); pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout); + pubsub_tcpHandler_setBlocking(receiver->socketHandler, blocking); } receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, - PSA_TCP_DEFAULT_METRICS_ENABLED); - + PSA_TCP_DEFAULT_METRICS_ENABLED); celixThreadMutex_create(&receiver->subscribers.mutex, NULL); celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL); celixThreadMutex_create(&receiver->thread.mutex, NULL); @@ -346,7 +334,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { pubsub_tcpHandler_destroy(receiver->socketHandler); receiver->socketHandler = NULL; } - + pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); if (receiver->scope != NULL) { free(receiver->scope); } @@ -371,8 +359,7 @@ long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver return receiver->protocolSvcId; } -void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, - celix_array_list_t *unconnectedUrls) { +void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) { celixThreadMutex_lock(&receiver->requestedConnections.mutex); hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { @@ -466,8 +453,7 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const hashMap_put(entry->subscriberServices, (void*)svcId, svc); - int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, - &entry->msgTypes); + int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes); if (rc == 0) { entry->metrics = hashMap_create(NULL, NULL, NULL, NULL); @@ -498,7 +484,6 @@ static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, co long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); - celixThreadMutex_lock(&receiver->subscribers.mutex); psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId); if (entry != NULL) { @@ -568,17 +553,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); while (hashMapIterator_hasNext(&iter)) { pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, - msgSer->msgName, - msgSer->msgId, - deSerializedMsg, - message->metadata.metadata, - &release); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, - msgType, - msgId, - deSerializedMsg, - metadata); + svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata); if (!release && hashMapIterator_hasNext(&iter)) { //receive function has taken ownership and still more receive function to come .. //deserialize again for new message @@ -707,10 +683,8 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds; result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds; result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds; - result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = - metrics->averageTimeBetweenMessagesInSeconds; - result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = - metrics->averageSerializationTimeInSeconds; + result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds; + result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds; result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived; result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers; @@ -720,7 +694,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi metrics->msgTypeId); } } - i += 1; + i +=1 ; } } celixThreadMutex_unlock(&receiver->subscribers.mutex); @@ -825,12 +799,11 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t int versionMajor; int versionMinor; - if (msgVersion != NULL) { + if (msgVersion!=NULL) { version_getMajor(msgVersion, &versionMajor); version_getMinor(msgVersion, &versionMinor); - if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */ - check = (minor >= - ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ + if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */ + check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ } } diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index f7598f9..c49f642 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -490,8 +490,7 @@ pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_se result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed; result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors; result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds; - result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = - mEntry->metrics.averageTimeBetweenMessagesInSeconds; + result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds; result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend; result->msgMetrics[i].bndId = entry->bndId; result->msgMetrics[i].typeId = mEntry->type; @@ -563,8 +562,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i entry->seqNr++; bool sendOk = true; { - int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, - serializedIoVecOutputLen, 0); + int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0); if (rc < 0) { status = -1; sendOk = false; diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt index a78958a..b18f252 100644 --- a/bundles/pubsub/test/CMakeLists.txt +++ b/bundles/pubsub/test/CMakeLists.txt @@ -38,7 +38,7 @@ celix_bundle_files(pubsub_endpoint_sut add_celix_bundle(pubsub_endpoint_tst #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher SOURCES - test/tst_activator.c + test/tst_endpoint_activator.c VERSION 1.0.0 ) target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api) @@ -213,7 +213,7 @@ if (BUILD_PUBSUB_PSA_TCP) Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_tcp - Celix::pubsub_protocol_wire_v1 + Celix::pubsub_protocol_wire_v2 pubsub_loopback pubsub_endpoint_sut pubsub_endpoint_tst diff --git a/bundles/pubsub/test/test/sut_endpoint_activator.c b/bundles/pubsub/test/test/sut_endpoint_activator.c index c52ebf7..f3d8fa8 100644 --- a/bundles/pubsub/test/test/sut_endpoint_activator.c +++ b/bundles/pubsub/test/test/sut_endpoint_activator.c @@ -97,9 +97,7 @@ static void* sut_sendThread(void *data) { if (msg.seqNr % 1000 == 0) { printf("Send %i messages\n", msg.seqNr); } - msg.seqNr += 1; - } pthread_mutex_unlock(&act->mutex);
