This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit f0cf24484d7fe0902f8bf49081576224048d100c Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jul 26 20:04:09 2021 +0200 Adds some additional pubsub interceptor integration tests --- bundles/pubsub/integration/CMakeLists.txt | 66 ++++-- .../gtest/PubSubInterceptorTestSuite.cc | 245 +++++++++++++++------ .../v2/src/pubsub_tcp_topic_receiver.c | 12 +- .../v2/src/pubsub_tcp_topic_sender.c | 1 + .../v2/src/pubsub_websocket_topic_receiver.c | 11 +- .../v2/src/pubsub_websocket_topic_sender.c | 2 + .../v2/src/pubsub_zmq_topic_receiver.c | 13 +- .../v2/src/pubsub_zmq_topic_sender.c | 5 +- 8 files changed, 264 insertions(+), 91 deletions(-) diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt index 8f21fda..eaefde9 100644 --- a/bundles/pubsub/integration/CMakeLists.txt +++ b/bundles/pubsub/integration/CMakeLists.txt @@ -760,6 +760,7 @@ if (BUILD_PUBSUB_PSA_ZMQ) gtest/PubSubTopicAndScopeIntegrationTestSuite.cc ) target_link_libraries(test_pubsub_topic_and_scope_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main) + add_test(NAME test_pubsub_topic_and_scope_integration COMMAND test_pubsub_topic_and_scope_integration) setup_target_for_coverage(test_pubsub_topic_and_scope_integration SCAN_DIR ..) #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles @@ -781,29 +782,68 @@ if (BUILD_PUBSUB_PSA_ZMQ) ) endif () -if (BUILD_PUBSUB_PSA_ZMQ) +#[[ +Add a integration test with use interceptors for a configurable PSA and wire protocol + +ARGV0 is test target name +ARGV1 is PSA target name +ARGV2 is wire protocol target name + +]] +function(add_celix_interceptors_test_for_psa_and_wire) + set(TEST_TARGET_NAME ${ARGV0}) + set(PSA ${ARGV1}) + set(WIRE ${ARGV2}) + #Test suite to test if pusbub interceptors - add_executable(test_pubsub_interceptors_integration - gtest/PubSubInterceptorTestSuite.cc - ) - target_link_libraries(test_pubsub_interceptors_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi) - target_include_directories(test_pubsub_interceptors_integration PRIVATE src) - setup_target_for_coverage(test_pubsub_interceptors_integration SCAN_DIR ..) + add_executable(${TEST_TARGET_NAME} + gtest/PubSubInterceptorTestSuite.cc + ) + target_link_libraries(${TEST_TARGET_NAME} PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi) + target_include_directories(${TEST_TARGET_NAME} PRIVATE src) + add_test(NAME ${TEST_TARGET_NAME} COMMAND ${TEST_TARGET_NAME}) + setup_target_for_coverage(${TEST_TARGET_NAME} SCAN_DIR ..) - #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles + #configure topology manager and pubsub admin, json serializer and wire protocol bundles celix_get_bundle_file(Celix::celix_pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE) celix_get_bundle_file(Celix::celix_pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE) - celix_get_bundle_file(Celix::celix_pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE) - celix_get_bundle_file(Celix::celix_pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE) + celix_get_bundle_file(${PSA} PUBSUB_PSA_BUNDLE_FILE) + celix_get_bundle_file(${WIRE} PUBSUB_WIRE_BUNDLE_FILE) + celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE) celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE) - add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1 pubsub_sut pubsub_tst) - target_compile_definitions(test_pubsub_interceptors_integration PRIVATE + add_celix_bundle_dependencies(${TEST_TARGET_NAME} Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager ${PSA} ${WIRE} pubsub_sut pubsub_tst) + target_compile_definitions(${TEST_TARGET_NAME} PRIVATE PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}" PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}" - PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}" + PUBSUB_PSA_BUNDLE_FILE="${PUBSUB_PSA_BUNDLE_FILE}" PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}" PUBSUB_PUBLISHER_BUNDLE_FILE="${PUBSUB_PUBLISHER_BUNDLE_FILE}" PUBSUB_SUBSCRIBER_BUNDLE_FILE="${PUBSUB_SUBSCRIBER_BUNDLE_FILE}" ) + + #if PSA websocket is enabled add http_admin bundle + if (BUILD_PUBSUB_PSA_WS) + target_link_libraries(${TEST_TARGET_NAME} PRIVATE Celix::http_admin_api) + celix_get_bundle_file(Celix::http_admin HTTP_ADMIN_BUNDLE_FILE) + add_celix_bundle_dependencies(${TEST_TARGET_NAME} Celix::http_admin) + target_compile_definitions(${TEST_TARGET_NAME} PRIVATE HTTP_ADMIN_BUNDLE_FILE="${HTTP_ADMIN_BUNDLE_FILE}") + endif () +endfunction() + + +if (BUILD_PUBSUB_PSA_WS) + add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_ws_and_wire_v1_integration Celix::celix_pubsub_admin_websocket_v2 Celix::celix_pubsub_protocol_wire_v1) + add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_ws_and_wire_v2_integration Celix::celix_pubsub_admin_websocket_v2 Celix::celix_pubsub_protocol_wire_v2) +endif () + +if (BUILD_PUBSUB_PSA_TCP) + message(STATUS "TODO enable tcp and interceptors. Currently has a memleak") + #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1) + #add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2) +endif () + +if (BUILD_PUBSUB_PSA_ZMQ) + add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_zmq_and_wire_v1_integration Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1) + add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_zmq_and_wire_v2_integration Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v2) endif () \ No newline at end of file diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc index 8f500f5..21cbdc3 100644 --- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc +++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc @@ -19,96 +19,217 @@ #include <gtest/gtest.h> +#include <memory> +#include <mutex> +#include <condition_variable> +#include <memory> + #include "pubsub_serializer_handler.h" #include "celix/FrameworkFactory.h" #include "msg.h" #include "pubsub_interceptor.h" +struct TestData { + TestData(const std::shared_ptr<celix::BundleContext>& ctx) { + serHandler = std::shared_ptr<pubsub_serializer_handler>{pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true), [](pubsub_serializer_handler_t* h) { + pubsub_serializerHandler_destroy(h); + }}; + } + + std::shared_ptr<pubsub_serializer_handler_t> serHandler{}; + + std::mutex mutex{}; //protects below + int preSendCount{0}; + int postSendCount{0}; + int preReceiveCount{0}; + int postReceiveCount{0}; + std::condition_variable cond{}; +}; + +static void serializeAndPrint(TestData* testData, uint32_t msgId, const void *msg) { + struct iovec* vec = nullptr; + size_t vecLen = 0; + pubsub_serializerHandler_serialize(testData->serHandler.get(), msgId, msg, &vec, &vecLen); + if (vecLen > 0) { + for (size_t i = 0; i < vecLen; ++i) { + fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout); + } + } + fputc('\n', stdout); + pubsub_serializerHandler_freeSerializedMsg(testData->serHandler.get(), msgId, vec, vecLen); +} + class PubSubInterceptorTestSuite : public ::testing::Test { public: PubSubInterceptorTestSuite() { fw = celix::createFramework({ - {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"} /*TODO memleak in pubsub zmq v2 when metadata is empty*/ + {"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "info"}, + {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"} }); ctx = fw->getFrameworkBundleContext(); + testData = std::make_shared<TestData>(ctx); EXPECT_GE(ctx->installBundle(PUBSUB_JSON_BUNDLE_FILE), 0); EXPECT_GE(ctx->installBundle(PUBSUB_TOPMAN_BUNDLE_FILE), 0); - EXPECT_GE(ctx->installBundle(PUBSUB_ZMQ_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_PSA_BUNDLE_FILE), 0); EXPECT_GE(ctx->installBundle(PUBSUB_WIRE_BUNDLE_FILE), 0); +#ifdef HTTP_ADMIN_BUNDLE_FILE + EXPECT_GE(ctx->installBundle(HTTP_ADMIN_BUNDLE_FILE), 0); +#endif } - std::shared_ptr<celix::Framework> fw{}; - std::shared_ptr<celix::BundleContext> ctx{}; -}; + std::shared_ptr<celix::ServiceRegistration> createInterceptor(bool cancelSend, bool cancelReceive) { + auto interceptor = std::make_shared<pubsub_interceptor>(pubsub_interceptor{}); + interceptor->handle = (void*)testData.get(); + interceptor->preSend = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t, + const void *, celix_properties_t* metadata) { + auto* td = (TestData*)handle; + celix_properties_set(metadata, "test", "preSend"); -static void serializeAndPrint(pubsub_serializer_handler_t* ser, uint32_t msgId, const void *msg) { - struct iovec* vec = nullptr; - size_t vecLen = 0; - pubsub_serializerHandler_serialize(ser, msgId, msg, &vec, &vecLen); - if (vecLen > 0) { - for (size_t i = 0; i < vecLen; ++i) { - fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout); + std::lock_guard<std::mutex> lck{td->mutex}; + td->preSendCount += 1; + td->cond.notify_all(); + return true; + }; + if (cancelSend) { + interceptor->preSend = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t, + const void *, celix_properties_t*) { + auto* td = (TestData*)handle; + std::lock_guard<std::mutex> lck{td->mutex}; + td->preSendCount += 1; + td->cond.notify_all(); + return false; + }; + } + interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg, + const celix_properties_t* metadata) { + auto* td = (TestData*)handle; + serializeAndPrint(td, msgId, rawMsg); + EXPECT_STREQ(msgType, "msg"); + const auto *msg = static_cast<const msg_t*>(rawMsg); + EXPECT_GE(msg->seqNr, 0); + EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend"); + const char *key; + CELIX_PROPERTIES_FOR_EACH(metadata, key) { + printf("got property %s=%s\n", key, celix_properties_get(metadata, key, nullptr)); + } + fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr); + + std::lock_guard<std::mutex> lck{td->mutex}; + td->postSendCount += 1; + td->cond.notify_all(); + }; + interceptor->preReceive = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t, + const void *, celix_properties_t* metadata) { + auto* td = (TestData*)handle; + celix_properties_set(metadata, "test", "preReceive"); + + std::lock_guard<std::mutex> lck{td->mutex}; + td->preReceiveCount += 1; + td->cond.notify_all(); + return true; + }; + if (cancelReceive) { + interceptor->preReceive = [](void* handle, const pubsub_interceptor_properties_t *, const char *, const uint32_t, + const void *, celix_properties_t*) { + auto* td = (TestData*)handle; + std::lock_guard<std::mutex> lck{td->mutex}; + td->preReceiveCount += 1; + td->cond.notify_all(); + return false; + }; } + interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg, + const celix_properties_t* metadata) { + auto* td = (TestData*)handle; + serializeAndPrint(td, msgId, rawMsg); + EXPECT_STREQ(msgType, "msg"); + const auto *msg = static_cast<const msg_t*>(rawMsg); + EXPECT_GE(msg->seqNr, 0); + EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive"); + fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr); + + std::lock_guard<std::mutex> lck{td->mutex}; + td->postReceiveCount += 1; + td->cond.notify_all(); + }; + //note registering identical services to validate multiple interceptors + return ctx->registerService<pubsub_interceptor>(std::move(interceptor), PUBSUB_INTERCEPTOR_SERVICE_NAME) + .setUnregisterAsync(false) //note to ensure test data is still valid when service is registered + .build(); } - fputc('\n', stdout); - pubsub_serializerHandler_freeSerializedMsg(ser, msgId, vec, vecLen); -} -std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<celix::BundleContext>& ctx) { - auto interceptor = std::shared_ptr<pubsub_interceptor>{new pubsub_interceptor{}, [](pubsub_interceptor* inter) { - auto* handler = (pubsub_serializer_handler_t*)inter->handle; - pubsub_serializerHandler_destroy(handler); - delete inter; - }}; - interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true); - interceptor->preSend = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t, - const void *, celix_properties_t* metadata) { - celix_properties_set(metadata, "test", "preSend"); - return true; - }; - interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg, - const celix_properties_t* metadata) { - auto* ser = (pubsub_serializer_handler_t*)handle; - serializeAndPrint(ser, msgId, rawMsg); - EXPECT_STREQ(msgType, "msg"); - const auto *msg = static_cast<const msg_t*>(rawMsg); - EXPECT_GE(msg->seqNr, 0); - EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend"); - fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr); - }; - interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t, - const void *, celix_properties_t* metadata) { - celix_properties_set(metadata, "test", "preReceive"); - return true; - }; - interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg, - const celix_properties_t* metadata) { - auto* ser = (pubsub_serializer_handler_t*)handle; - serializeAndPrint(ser, msgId, rawMsg); - EXPECT_STREQ(msgType, "msg"); - const auto *msg = static_cast<const msg_t*>(rawMsg); - EXPECT_GE(msg->seqNr, 0); - EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive"); - fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr); - }; - //note registering identical services to validate multiple interceptors - return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build(); -} + std::shared_ptr<celix::Framework> fw{}; + std::shared_ptr<celix::BundleContext> ctx{}; + std::shared_ptr<TestData> testData{}; +}; TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleReceivers) { //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE) - //And a registered interceptor + //And several registered interceptors //Then the interceptor receives a correct msg type. + auto reg1 = createInterceptor(false, false); + auto reg2 = createInterceptor(false, false); + auto reg3 = createInterceptor(false, false); + ctx->waitForEvents(); + EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0); EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0); - auto reg1 = createInterceptor(ctx); - auto reg2 = createInterceptor(ctx); - auto reg3 = createInterceptor(ctx); + std::unique_lock<std::mutex> lck{testData->mutex}; + auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, [this]{ + return testData->preSendCount > 10 && + testData->postSendCount > 10 && + testData->preReceiveCount > 10 && + testData->postReceiveCount > 10; + }); - //TODO stop after a certain amount of messages send - //TODO also test with tcp v2. - sleep(5); + EXPECT_TRUE(isTestDone); } + +TEST_F(PubSubInterceptorTestSuite, InterceptorWithPreSendCancelWillPreventSends) { + //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE) + //And a interceptor which cancel a send + //Then only the preSend count will be increased, but the rest of the count will be 0 + + auto reg1 = createInterceptor(true, false); + ctx->waitForEvents(); + + EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0); + + std::unique_lock<std::mutex> lck{testData->mutex}; + auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, [this]{ + return testData->preSendCount > 10 ; + }); + + EXPECT_EQ(testData->postSendCount, 0); + EXPECT_EQ(testData->preReceiveCount, 0); + EXPECT_EQ(testData->postReceiveCount, 0); + + EXPECT_TRUE(isTestDone); +} + +TEST_F(PubSubInterceptorTestSuite, InterceptorWithPreRedeiveCancelWillPreventPostReceive) { + //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers (PUBSUB_SUBSCRIBER_BUNDLE_FILE) + //And a interceptor which cancel a receive + //Then the preSend, postSend and preReceive count will be increased, but the postReceive count will be 0 + + auto reg1 = createInterceptor(false, true); + ctx->waitForEvents(); + + EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0); + EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0); + + std::unique_lock<std::mutex> lck{testData->mutex}; + auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, [this]{ + return testData->preSendCount > 10 && + testData->postSendCount > 10 && + testData->preReceiveCount > 10; + }); + + EXPECT_EQ(testData->postReceiveCount, 0); + + EXPECT_TRUE(isTestDone); +} \ No newline at end of file diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c index e9ef6b4..fe6fd53 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c @@ -40,6 +40,8 @@ #define UUID_STR_LEN 37 #endif +#define L_TRACE(...) \ + celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_TRACE, __VA_ARGS__) #define L_DEBUG(...) \ celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) #define L_INFO(...) \ @@ -486,13 +488,15 @@ static inline void processMsg(void* handle, const pubsub_protocol_message_t *mes if (status == CELIX_SUCCESS) { celix_properties_t *metadata = message->metadata.metadata; bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, &metadata); + bool release = true; if (cont) { - bool release; callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata); pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deSerializedMsg, metadata); - if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg); - } + } else { + L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", msgFqn); + } + if (release) { + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg); } } else { L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c index 3c58e84..a817a3e 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c @@ -374,6 +374,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata); if (!cont) { L_DEBUG("Cancel send based on pubsub interceptor cancel return"); + celix_properties_destroy(metadata); return status; } diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c index 56f4008..7421b09 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c @@ -491,14 +491,17 @@ static void processJsonMsg(pubsub_websocket_topic_receiver_t *receiver, const pu celix_properties_t *metadata = NULL; //NOTE metadata not supported for websocket bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, header->fqn, msgId, deserializedMsg, &metadata); + bool release = true; if (cont) { - bool release; callReceivers(receiver, msgId, header, payload, payloadSize, &deserializedMsg, &release, metadata); pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, header->fqn, msgId, deserializedMsg, metadata); - if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg); - } + } else { + L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", header->fqn); + } + if (release) { + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deserializedMsg); } + celix_properties_destroy(metadata); } else { L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", header->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); } diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c index e435093..7ecf546 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c @@ -270,6 +270,7 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata); if (!cont) { L_DEBUG("Cancel send based on pubsub interceptor cancel return"); + celix_properties_destroy(metadata); return status; } @@ -316,6 +317,7 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType } pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata); + celix_properties_destroy(metadata); return status; } diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c index f5e70b0..90d93cb 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c @@ -51,7 +51,8 @@ #define UUID_STR_LEN 37 #endif - +#define L_TRACE(...) \ + celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_TRACE, __VA_ARGS__) #define L_DEBUG(...) \ celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) #define L_INFO(...) \ @@ -468,13 +469,15 @@ static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_prot if (status == CELIX_SUCCESS) { celix_properties_t *metadata = message->metadata.metadata; bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, &metadata); + bool release = true; if (cont) { - bool release; callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata); pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, message->header.msgId, deserializedMsg, metadata); - if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg); - } + } else { + L_TRACE("Skipping receive for msg type %s, based on pre receive interceptor result", msgFqn); + } + if (release) { + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg); } } else { L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c index 155cb19..64c3b41 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c @@ -410,6 +410,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata); if (!cont) { L_DEBUG("Cancel send based on pubsub interceptor cancel return"); + celix_properties_destroy(metadata); return status; } @@ -541,10 +542,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co } __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE); pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata); + celix_properties_destroy(metadata); - if (message.metadata.metadata) { - celix_properties_destroy(message.metadata.metadata); - } if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) { pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen); }
