This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit 6bf1ef1e7e71b4fea6589f4639eb3b3b30e09de3 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jul 26 15:35:37 2021 +0200 Adds interceptor support to the pubsub websocket --- .../v2/src/pubsub_tcp_topic_receiver.c | 11 +- .../v2/src/pubsub_websocket_common.h | 2 +- .../v2/src/pubsub_websocket_topic_receiver.c | 211 ++++++++++----------- .../v2/src/pubsub_websocket_topic_sender.c | 11 ++ .../v2/src/pubsub_zmq_topic_receiver.c | 11 +- .../v2/src/pubsub_zmq_topic_sender.c | 2 +- 6 files changed, 126 insertions(+), 122 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c index 36816f1..e9ef6b4 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c @@ -446,7 +446,7 @@ static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msg message->header.msgMinorVersion, &deSerializeBuffer, 0, msg); if (status != CELIX_SUCCESS) { - L_WARN("[PSA_TCO_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, + L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); break; } @@ -484,17 +484,14 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes } if (status == CELIX_SUCCESS) { - uint32_t msgId = message->header.msgId; celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, - deSerializedMsg, &metadata); + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata); if (cont) { bool release; callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata); if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, - deSerializedMsg); + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg); } } } else { diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h index 4c22319..8a764d1 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_common.h @@ -27,7 +27,7 @@ struct pubsub_websocket_msg_header { - const char *id; //FQN + const char *fqn; uint8_t major; uint8_t minor; uint32_t seqNr; diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c index ea997e9..56f4008 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c @@ -37,12 +37,14 @@ #include <jansson.h> #include <pubsub_utils.h> #include <celix_api.h> +#include "pubsub_interceptors_handler.h" #ifndef UUID_STR_LEN #define UUID_STR_LEN 37 #endif - +#define L_TRACE(...) \ + celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) #define L_DEBUG(...) \ celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) #define L_INFO(...) \ @@ -72,6 +74,7 @@ struct pubsub_websocket_topic_receiver { char *uri; pubsub_serializer_handler_t* serializerHandler; + pubsub_interceptors_handler_t *interceptorsHandler; celix_websocket_service_t sockSvc; long svcId; @@ -93,7 +96,7 @@ struct pubsub_websocket_topic_receiver { long subscriberTrackerId; struct { celix_thread_mutex_t mutex; - hash_map_t *map; //key = bnd id, value = psa_websocket_subscriber_entry_t + hash_map_t *map; //key = long svc id, value = psa_websocket_subscriber_entry_t bool allInitialized; } subscribers; }; @@ -111,13 +114,13 @@ typedef struct psa_websocket_requested_connection_entry { } psa_websocket_requested_connection_entry_t; typedef struct psa_websocket_subscriber_entry { - hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t* + pubsub_subscriber_t* subscriberSvc; bool initialized; //true if the init function is called through the receive thread } psa_websocket_subscriber_entry_t; -static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); -static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner); +static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props); +static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props); static void* psa_websocket_recvThread(void * data); static void psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t *receiver); static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t *receiver); @@ -138,6 +141,8 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu receiver->ctx = ctx; receiver->logHelper = logHelper; receiver->serializerHandler = serializerHandler; + receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_WEBSOCKET_ADMIN_TYPE, + pubsub_serializerHandler_getSerializationType(serializerHandler)); receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); receiver->admin = admin; @@ -166,8 +171,8 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; opts.filter.filter = buf; opts.callbackHandle = receiver; - opts.addWithOwner = pubsub_websocketTopicReceiver_addSubscriber; - opts.removeWithOwner = pubsub_websocketTopicReceiver_removeSubscriber; + opts.addWithProperties = pubsub_websocketTopicReceiver_addSubscriber; + opts.removeWithProperties = pubsub_websocketTopicReceiver_removeSubscriber; receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); } @@ -262,22 +267,13 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re celix_bundleContext_unregisterService(receiver->ctx, receiver->svcId); celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - hashMap_destroy(entry->subscriberServices, false, false); - free(entry); - } - - } - hashMap_destroy(receiver->subscribers.map, false, false); + hashMap_destroy(receiver->subscribers.map, false, true); celixThreadMutex_unlock(&receiver->subscribers.mutex); celixThreadMutex_lock(&receiver->requestedConnections.mutex); - iter = hashMapIterator_construct(receiver->requestedConnections.map); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { psa_websocket_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry != NULL) { @@ -307,6 +303,8 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re } celix_arrayList_destroy(receiver->recvBuffer.list); + pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); + free(receiver->uri); free(receiver->scope); free(receiver->topic); @@ -394,10 +392,9 @@ void pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receive free(key); } -static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { +static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) { pubsub_websocket_topic_receiver_t *receiver = handle; - long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (receiver->scope == NULL){ @@ -414,96 +411,113 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, return; } + psa_websocket_subscriber_entry_t* entry = calloc(1, sizeof(*entry)); + entry->subscriberSvc = svc; + entry->initialized = false; + celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); - if (entry == NULL) { - //new create entry - entry = calloc(1, sizeof(*entry)); - entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL); - entry->initialized = false; - hashMap_put(receiver->subscribers.map, (void*)bndId, entry); - } - hashMap_put(entry->subscriberServices, (void*)svcId, svc); + hashMap_put(receiver->subscribers.map, (void*)svcId, entry); + receiver->subscribers.allInitialized = false; celixThreadMutex_unlock(&receiver->subscribers.mutex); } -static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props __attribute__((unused)), const celix_bundle_t *bnd) { +static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) { pubsub_websocket_topic_receiver_t *receiver = handle; - long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); + celixThreadMutex_lock(&receiver->subscribers.mutex); + psa_websocket_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId); + free(entry); + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} +static void callReceivers( + pubsub_websocket_topic_receiver_t *receiver, + uint32_t msgId, + const pubsub_websocket_msg_header_t* header, + const char *payload, + size_t payloadSize, + void** msg, + bool* release, + const celix_properties_t* metadata) { + *release = true; celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_websocket_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); - if (entry != NULL) { - hashMap_remove(entry->subscriberServices, (void*)svcId); - } - if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) { - //remove entry - hashMap_remove(receiver->subscribers.map, (void*)bndId); - hashMap_destroy(entry->subscriberServices, false, false); - free(entry); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_websocket_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->subscriberSvc->receive != NULL) { + entry->subscriberSvc->receive(entry->subscriberSvc->handle, header->fqn, msgId, *msg, metadata, release); + if (!(*release)) { + //receive function has taken ownership, deserialize again for new message + struct iovec deSerializeBuffer; + deSerializeBuffer.iov_base = (void*) payload; + deSerializeBuffer.iov_len = payloadSize; + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, + msgId, + header->major, + header->minor, + &deSerializeBuffer, 0, msg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn, + receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + break; + } + } + *release = true; + } } celixThreadMutex_unlock(&receiver->subscribers.mutex); } -static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) { - //NOTE receiver->subscribers.mutex locked - - uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id); - +static void processJsonMsg(pubsub_websocket_topic_receiver_t *receiver, const pubsub_websocket_msg_header_t* header, const char *payload, size_t payloadSize) { + uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, header->fqn); if (msgId == 0) { - L_WARN("Cannot find msg id for msg fqn %s", hdr->id); + L_WARN("Cannot find msg id for msg fqn %s", header->fqn); return; } - void *deSerializedMsg = NULL; - bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor); + void *deserializedMsg = NULL; + bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, header->major, header->minor); if (validVersion) { struct iovec deSerializeBuffer; - deSerializeBuffer.iov_base = (void *)payload; - deSerializeBuffer.iov_len = payloadSize; - celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg); + deSerializeBuffer.iov_base = (void*)payload; + deSerializeBuffer.iov_len = payloadSize; + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, + header->major, + header->minor, + &deSerializeBuffer, 0, &deserializedMsg); if (status == CELIX_SUCCESS) { - hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); - bool release = true; - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release); - if (!release && hashMapIterator_hasNext(&iter)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message - status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); - break; - } - release = true; + celix_properties_t *metadata = NULL; //NOTE metadata not supported for websocket + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, header->fqn, msgId, + deserializedMsg, &metadata); + if (cont) { + bool release; + callReceivers(receiver, msgId, header, payload, payloadSize, &deserializedMsg, &release, metadata); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, header->fqn, msgId, deserializedMsg, metadata); + if (release) { + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg); } } - if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg); - } } else { - L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); } } else { - L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x", - hdr->id, + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x", + header->fqn, pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), - (int)hdr->major, - (int)hdr->minor, + (int) header->major, + (int) header->minor, pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId), pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId)); } } -static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) { +static void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) { json_error_t error; json_t *jsMsg = json_loadb(msg, msgSize, 0, &error); - if(jsMsg != NULL) { - json_t *jsId = json_object_get(jsMsg, "id"); + if (jsMsg != NULL) { + json_t *jsId = json_object_get(jsMsg, "id"); //NOTE called id, but is the msgFqn json_t *jsMajor = json_object_get(jsMsg, "major"); json_t *jsMinor = json_object_get(jsMsg, "minor"); json_t *jsSeqNr = json_object_get(jsMsg, "seqNr"); @@ -511,24 +525,15 @@ static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const if (jsId && jsMajor && jsMinor && jsSeqNr && jsData) { pubsub_websocket_msg_header_t hdr; - hdr.id = json_string_value(jsId); + hdr.fqn = json_string_value(jsId); hdr.major = (uint8_t) json_integer_value(jsMajor); hdr.minor = (uint8_t) json_integer_value(jsMinor); hdr.seqNr = (uint32_t) json_integer_value(jsSeqNr); - const char *payload = json_dumps(jsData, 0); + char *payload = json_dumps(jsData, 0); size_t payloadSize = strlen(payload); - printf("Received msg: id %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.id, hdr.major, hdr.minor, hdr.seqNr, payload); - - celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - processMsgForSubscriberEntry(receiver, entry, &hdr, payload, payloadSize); - } - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); - free((void *) payload); + L_TRACE("Received msg: fqn %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.fqn, hdr.major, hdr.minor, hdr.seqNr, payload); + processJsonMsg(receiver, &hdr, payload, payloadSize); + free(payload); } else { L_WARN("[PSA_WEBSOCKET_TR] Received unsupported message: " "ID = %s, major = %d, minor = %d, seqNr = %d, data valid? %s", @@ -539,9 +544,7 @@ static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const json_decref(jsMsg); } else { L_WARN("[PSA_WEBSOCKET_TR] Failed to load websocket JSON message, error line: %d, error message: %s", error.line, error.text); - return; } - } static void* psa_websocket_recvThread(void * data) { @@ -728,20 +731,16 @@ static void psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiv while (hashMapIterator_hasNext(&iter)) { psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->initialized) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter2)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); - int rc = 0; - if (svc != NULL && svc->init != NULL) { - rc = svc->init(svc->handle); - } - if (rc == 0) { - //note now only initialized on first subscriber entries added. - entry->initialized = true; - } else { - L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); - allInitialized = false; - } + int rc = 0; + if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) { + rc = entry->subscriberSvc->init(entry->subscriberSvc->handle); + } + if (rc == 0) { + //note now only initialized on first subscriber entries added. + entry->initialized = true; + } else { + L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); + allInitialized = false; } } } diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c index adc5ffe..e435093 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c @@ -33,6 +33,7 @@ #include "http_admin/api.h" #include "civetweb.h" #include "pubsub_websocket_admin.h" +#include "pubsub_interceptors_handler.h" #define FIRST_SEND_DELAY_IN_SECONDS 2 @@ -56,6 +57,7 @@ struct pubsub_websocket_topic_sender { char *uri; pubsub_serializer_handler_t* serializerHandler; + pubsub_interceptors_handler_t *interceptorsHandler; int seqNr; //atomic @@ -102,6 +104,7 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create( sender->ctx = ctx; sender->logHelper = logHelper; sender->serializerHandler = serializerHandler; + sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_WEBSOCKET_ADMIN_TYPE, pubsub_serializerHandler_getSerializationType(serializerHandler)); psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter); sender->uri = psa_websocket_createURI(scope, topic); @@ -177,6 +180,7 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender if (sender->scope != NULL) { free(sender->scope); } + pubsubInterceptorsHandler_destroy(sender->interceptorsHandler); free(sender->topic); free(sender->uri); free(sender); @@ -263,6 +267,11 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType return status; } + bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata); + if (!cont) { + L_DEBUG("Cancel send based on pubsub interceptor cancel return"); + return status; + } if (sender->sockConnection != NULL) { delay_first_send_for_late_joiners(sender); @@ -306,6 +315,8 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType status = CELIX_SUCCESS; // Not an error, just nothing to do } + pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata); + return status; } diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c index d6c5805..f5e70b0 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c @@ -64,13 +64,13 @@ struct pubsub_zmq_topic_receiver { celix_bundle_context_t *ctx; celix_log_helper_t *logHelper; - pubsub_serializer_handler_t* serializerHandler; void *admin; long protocolSvcId; pubsub_protocol_service_t *protocol; char *scope; char *topic; + pubsub_serializer_handler_t* serializerHandler; pubsub_interceptors_handler_t *interceptorsHandler; void *zmqCtx; @@ -466,17 +466,14 @@ static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_prot message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg); if (status == CELIX_SUCCESS) { - uint32_t msgId = message->header.msgId; celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, - deserializedMsg, &metadata); + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, &metadata); if (cont) { bool release; callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, metadata); if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, - deserializedMsg); + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg); } } } else { diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c index d1f36a5..155cb19 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c @@ -52,13 +52,13 @@ struct pubsub_zmq_topic_sender { celix_bundle_context_t *ctx; celix_log_helper_t *logHelper; - pubsub_serializer_handler_t* serializerHandler; void *admin; long protocolSvcId; pubsub_protocol_service_t *protocol; uuid_t fwUUID; bool zeroCopyEnabled; + pubsub_serializer_handler_t* serializerHandler; pubsub_interceptors_handler_t *interceptorsHandler; char *scope;
