This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git
commit 050c3962ec60fbb797bc2265656fb7bab8c85537 Author: Roy Bulter <[email protected]> AuthorDate: Mon Jun 29 22:15:45 2020 +0200 Add interceptor --- .../src/pubsub_tcp_topic_receiver.c | 71 ++++++++++++++-------- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 8 ++- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 71890d9..a795f92 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -38,6 +38,7 @@ #include <uuid/uuid.h> #include <pubsub_admin_metrics.h> #include <pubsub_utils.h> +#include "pubsub_interceptors_handler.h" #include <celix_api.h> #define MAX_EPOLL_EVENTS 16 @@ -67,6 +68,7 @@ struct pubsub_tcp_topic_receiver { bool metricsEnabled; pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; + pubsub_interceptors_handler_t *interceptorsHandler; struct { celix_thread_t thread; @@ -164,6 +166,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context const char *staticServerEndPointUrls = NULL; const char *staticConnectUrls = NULL; + pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope); if (topicProperties != NULL) { @@ -344,6 +347,7 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { receiver->socketHandler = NULL; } + pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); if (receiver->scope != NULL) { free(receiver->scope); } @@ -553,31 +557,48 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs } if (status == CELIX_SUCCESS) { - hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); + const char *msgType = 
msgSer->msgName; + uint32_t msgId = message->header.msgId; + celix_properties_t *metadata = message->metadata.metadata; + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata); bool release = true; - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, - &release); - if (!release && hashMapIterator_hasNext(&iter)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message - status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, - receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); - break; + if (cont) { + hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); + while (hashMapIterator_hasNext(&iter)) { + pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); + svc->receive(svc->handle, + msgSer->msgName, + msgSer->msgId, + deSerializedMsg, + message->metadata.metadata, + &release); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, + msgType, + msgId, + deSerializedMsg, + metadata); + if (!release && hashMapIterator_hasNext(&iter)) { + //receive function has taken ownership and still more receive function to come .. + //deserialize again for new message + status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", + msgSer->msgName, + receiver->scope == NULL ? 
"(null)" : receiver->scope, + receiver->topic); + break; + } + release = true; } - release = true; } + if (release) { + msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); + } + if (message->metadata.metadata) { + celix_properties_destroy(message->metadata.metadata); + } + updateReceiveCount += 1; } - if (release) { - msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); - } - if (message->metadata.metadata) { - celix_properties_destroy(message->metadata.metadata); - } - updateReceiveCount += 1; } else { updateSerError += 1; L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, @@ -644,10 +665,7 @@ static void *psa_tcp_recvThread(void *data) { pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) { pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, - PUBSUB_AMDIN_METRICS_NAME_MAX, - "%s", - receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? 
PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); int msgTypesCount = 0; @@ -671,8 +689,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); while (hashMapIterator_hasNext(&iter2)) { hash_map_t *origins = hashMapIterator_nextValue(&iter2); - result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), - sizeof(*(result->msgTypes[i].origins))); + result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins))); result->msgTypes[i].nrOfOrigins = hashMap_size(origins); int k = 0; hash_map_iterator_t iter3 = hashMapIterator_construct(origins); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 29dc50e..754e769 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -35,6 +35,7 @@ #include "celix_constants.h" #include <signal.h> #include <pubsub_utils.h> +#include "pubsub_interceptors_handler.h" #define FIRST_SEND_DELAY_IN_SECONDS 2 #define TCP_BIND_MAX_RETRY 10 @@ -59,6 +60,7 @@ struct pubsub_tcp_topic_sender { bool metricsEnabled; pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; + pubsub_interceptors_handler_t *interceptorsHandler; char *scope; char *topic; @@ -144,6 +146,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( } sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); + pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); bool isEndpoint = false; char *urls = NULL; const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL); @@ -312,6 +315,7 @@ void 
pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { celixThreadMutex_unlock(&sender->boundedServices.mutex); celixThreadMutex_destroy(&sender->boundedServices.mutex); + pubsubInterceptorsHandler_destroy(sender->interceptorsHandler); if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) { pubsub_tcpHandler_destroy(sender->socketHandler); sender->socketHandler = NULL; @@ -533,7 +537,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i clock_gettime(CLOCK_REALTIME, &serializationEnd); } - if (status == CELIX_SUCCESS /*ser ok*/) { + bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata); + if (status == CELIX_SUCCESS /*ser ok*/ && cont) { pubsub_protocol_message_t message; message.metadata.metadata = NULL; message.payload.payload = NULL; @@ -561,6 +566,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i status = -1; sendOk = false; } + pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata); if (message.metadata.metadata) celix_properties_destroy(message.metadata.metadata); if (serializedIoVecOutput) {
