This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/pubsub-interceptor-fix
in repository https://gitbox.apache.org/repos/asf/celix.git

commit f0cf24484d7fe0902f8bf49081576224048d100c
Author: Pepijn Noltes <[email protected]>
AuthorDate: Mon Jul 26 20:04:09 2021 +0200

    Adds some additional pubsub interceptor integration tests
---
 bundles/pubsub/integration/CMakeLists.txt          |  66 ++++--
 .../gtest/PubSubInterceptorTestSuite.cc            | 245 +++++++++++++++------
 .../v2/src/pubsub_tcp_topic_receiver.c             |  12 +-
 .../v2/src/pubsub_tcp_topic_sender.c               |   1 +
 .../v2/src/pubsub_websocket_topic_receiver.c       |  11 +-
 .../v2/src/pubsub_websocket_topic_sender.c         |   2 +
 .../v2/src/pubsub_zmq_topic_receiver.c             |  13 +-
 .../v2/src/pubsub_zmq_topic_sender.c               |   5 +-
 8 files changed, 264 insertions(+), 91 deletions(-)

diff --git a/bundles/pubsub/integration/CMakeLists.txt 
b/bundles/pubsub/integration/CMakeLists.txt
index 8f21fda..eaefde9 100644
--- a/bundles/pubsub/integration/CMakeLists.txt
+++ b/bundles/pubsub/integration/CMakeLists.txt
@@ -760,6 +760,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             gtest/PubSubTopicAndScopeIntegrationTestSuite.cc
     )
     target_link_libraries(test_pubsub_topic_and_scope_integration PRIVATE 
Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main)
+    add_test(NAME test_pubsub_topic_and_scope_integration COMMAND 
test_pubsub_topic_and_scope_integration)
     setup_target_for_coverage(test_pubsub_topic_and_scope_integration SCAN_DIR 
..)
 
     #configure topology manager and pubsub zmq, json serializer and wire 
protocol v2 bundles
@@ -781,29 +782,68 @@ if (BUILD_PUBSUB_PSA_ZMQ)
     )
 endif ()
 
-if (BUILD_PUBSUB_PSA_ZMQ)
+#[[
+Add an integration test which uses interceptors for a configurable PSA and wire 
protocol
+
+ARGV0 is test target name
+ARGV1 is PSA target name
+ARGV2 is wire protocol target name
+
+]]
+function(add_celix_interceptors_test_for_psa_and_wire)
+    set(TEST_TARGET_NAME ${ARGV0})
+    set(PSA ${ARGV1})
+    set(WIRE ${ARGV2})
+
     #Test suite to test if pusbub interceptors
-    add_executable(test_pubsub_interceptors_integration
-            gtest/PubSubInterceptorTestSuite.cc
-        )
-    target_link_libraries(test_pubsub_interceptors_integration PRIVATE 
Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main 
Celix::pubsub_spi)
-    target_include_directories(test_pubsub_interceptors_integration PRIVATE 
src)
-    setup_target_for_coverage(test_pubsub_interceptors_integration SCAN_DIR ..)
+    add_executable(${TEST_TARGET_NAME}
+        gtest/PubSubInterceptorTestSuite.cc
+    )
+    target_link_libraries(${TEST_TARGET_NAME} PRIVATE Celix::framework 
Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi)
+    target_include_directories(${TEST_TARGET_NAME} PRIVATE src)
+    add_test(NAME ${TEST_TARGET_NAME} COMMAND ${TEST_TARGET_NAME})
+    setup_target_for_coverage(${TEST_TARGET_NAME} SCAN_DIR ..)
 
-    #configure topology manager and pubsub zmq, json serializer and wire 
protocol v2 bundles
+    #configure topology manager and pubsub admin, json serializer and wire 
protocol bundles
     celix_get_bundle_file(Celix::celix_pubsub_serializer_json 
PUBSUB_JSON_BUNDLE_FILE)
     celix_get_bundle_file(Celix::celix_pubsub_topology_manager 
PUBSUB_TOPMAN_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::celix_pubsub_admin_zmq_v2 
PUBSUB_ZMQ_BUNDLE_FILE)
-    celix_get_bundle_file(Celix::celix_pubsub_protocol_wire_v1 
PUBSUB_WIRE_BUNDLE_FILE)
+    celix_get_bundle_file(${PSA} PUBSUB_PSA_BUNDLE_FILE)
+    celix_get_bundle_file(${WIRE} PUBSUB_WIRE_BUNDLE_FILE)
+
     celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE)
     celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    add_celix_bundle_dependencies(test_pubsub_interceptors_integration 
Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager 
Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1 
pubsub_sut pubsub_tst)
-    target_compile_definitions(test_pubsub_interceptors_integration PRIVATE
+    add_celix_bundle_dependencies(${TEST_TARGET_NAME} 
Celix::celix_pubsub_serializer_json Celix::celix_pubsub_topology_manager ${PSA} 
${WIRE} pubsub_sut pubsub_tst)
+    target_compile_definitions(${TEST_TARGET_NAME} PRIVATE
             PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}"
             PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}"
-            PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}"
+            PUBSUB_PSA_BUNDLE_FILE="${PUBSUB_PSA_BUNDLE_FILE}"
             PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}"
             PUBSUB_PUBLISHER_BUNDLE_FILE="${PUBSUB_PUBLISHER_BUNDLE_FILE}"
             PUBSUB_SUBSCRIBER_BUNDLE_FILE="${PUBSUB_SUBSCRIBER_BUNDLE_FILE}"
     )
+
+    #if PSA websocket is enabled add http_admin bundle
+    if (BUILD_PUBSUB_PSA_WS)
+        target_link_libraries(${TEST_TARGET_NAME} PRIVATE 
Celix::http_admin_api)
+        celix_get_bundle_file(Celix::http_admin HTTP_ADMIN_BUNDLE_FILE)
+        add_celix_bundle_dependencies(${TEST_TARGET_NAME} Celix::http_admin)
+        target_compile_definitions(${TEST_TARGET_NAME} PRIVATE 
HTTP_ADMIN_BUNDLE_FILE="${HTTP_ADMIN_BUNDLE_FILE}")
+    endif ()
+endfunction()
+
+
+if (BUILD_PUBSUB_PSA_WS)
+    
add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_ws_and_wire_v1_integration
 Celix::celix_pubsub_admin_websocket_v2 Celix::celix_pubsub_protocol_wire_v1)
+    
add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_ws_and_wire_v2_integration
 Celix::celix_pubsub_admin_websocket_v2 Celix::celix_pubsub_protocol_wire_v2)
+endif ()
+
+if (BUILD_PUBSUB_PSA_TCP)
+    message(STATUS "TODO enable tcp and interceptors. Currently has a memleak")
+    
#add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v1_integration
 Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v1)
+    
#add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_tcp_and_wire_v2_integration
 Celix::celix_pubsub_admin_tcp_v2 Celix::celix_pubsub_protocol_wire_v2)
+endif ()
+
+if (BUILD_PUBSUB_PSA_ZMQ)
+    
add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_zmq_and_wire_v1_integration
 Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v1)
+    
add_celix_interceptors_test_for_psa_and_wire(test_pubsub_interceptors_zmq_and_wire_v2_integration
 Celix::celix_pubsub_admin_zmq_v2 Celix::celix_pubsub_protocol_wire_v2)
 endif ()
\ No newline at end of file
diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc 
b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
index 8f500f5..21cbdc3 100644
--- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
+++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc
@@ -19,96 +19,217 @@
 
 #include <gtest/gtest.h>
 
+#include <memory>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+
 #include "pubsub_serializer_handler.h"
 #include "celix/FrameworkFactory.h"
 #include "msg.h"
 #include "pubsub_interceptor.h"
 
+struct TestData {
+    TestData(const std::shared_ptr<celix::BundleContext>& ctx) {
+        serHandler = 
std::shared_ptr<pubsub_serializer_handler>{pubsub_serializerHandler_create(ctx->getCBundleContext(),
 "json", true), [](pubsub_serializer_handler_t* h) {
+            pubsub_serializerHandler_destroy(h);
+        }};
+    }
+
+    std::shared_ptr<pubsub_serializer_handler_t> serHandler{};
+
+    std::mutex mutex{}; //protects below
+    int preSendCount{0};
+    int postSendCount{0};
+    int preReceiveCount{0};
+    int postReceiveCount{0};
+    std::condition_variable cond{};
+};
+
+static void serializeAndPrint(TestData* testData, uint32_t msgId, const void 
*msg) {
+    struct iovec* vec = nullptr;
+    size_t vecLen = 0;
+    pubsub_serializerHandler_serialize(testData->serHandler.get(), msgId, msg, 
&vec, &vecLen);
+    if (vecLen > 0) {
+        for (size_t i = 0; i < vecLen; ++i) {
+            fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout);
+        }
+    }
+    fputc('\n', stdout);
+    pubsub_serializerHandler_freeSerializedMsg(testData->serHandler.get(), 
msgId, vec, vecLen);
+}
+
 class PubSubInterceptorTestSuite : public ::testing::Test {
 public:
     PubSubInterceptorTestSuite() {
         fw = celix::createFramework({
-            {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"} /*TODO memleak in 
pubsub zmq v2 when metadata is empty*/
+            {"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "info"},
+            {"CELIX_PUBSUB_TEST_ADD_METADATA", "true"}
         });
         ctx = fw->getFrameworkBundleContext();
+        testData = std::make_shared<TestData>(ctx);
 
         EXPECT_GE(ctx->installBundle(PUBSUB_JSON_BUNDLE_FILE), 0);
         EXPECT_GE(ctx->installBundle(PUBSUB_TOPMAN_BUNDLE_FILE), 0);
-        EXPECT_GE(ctx->installBundle(PUBSUB_ZMQ_BUNDLE_FILE), 0);
+        EXPECT_GE(ctx->installBundle(PUBSUB_PSA_BUNDLE_FILE), 0);
         EXPECT_GE(ctx->installBundle(PUBSUB_WIRE_BUNDLE_FILE), 0);
+#ifdef HTTP_ADMIN_BUNDLE_FILE
+        EXPECT_GE(ctx->installBundle(HTTP_ADMIN_BUNDLE_FILE), 0);
+#endif
     }
 
-    std::shared_ptr<celix::Framework> fw{};
-    std::shared_ptr<celix::BundleContext> ctx{};
-};
+    std::shared_ptr<celix::ServiceRegistration> createInterceptor(bool 
cancelSend, bool cancelReceive) {
+        auto interceptor = 
std::make_shared<pubsub_interceptor>(pubsub_interceptor{});
+        interceptor->handle = (void*)testData.get();
+        interceptor->preSend  = [](void* handle, const 
pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                   const void *, celix_properties_t* metadata) 
{
+            auto* td = (TestData*)handle;
+            celix_properties_set(metadata, "test", "preSend");
 
-static void serializeAndPrint(pubsub_serializer_handler_t* ser, uint32_t 
msgId, const void *msg) {
-    struct iovec* vec = nullptr;
-    size_t vecLen = 0;
-    pubsub_serializerHandler_serialize(ser, msgId, msg, &vec, &vecLen);
-    if (vecLen > 0) {
-        for (size_t i = 0; i < vecLen; ++i) {
-            fwrite(vec[i].iov_base, sizeof(char), vec[i].iov_len, stdout);
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->preSendCount += 1;
+            td->cond.notify_all();
+            return true;
+        };
+        if (cancelSend) {
+            interceptor->preSend = [](void* handle, const 
pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                      const void *, celix_properties_t*) {
+                auto* td = (TestData*)handle;
+                std::lock_guard<std::mutex> lck{td->mutex};
+                td->preSendCount += 1;
+                td->cond.notify_all();
+                return false;
+            };
+        }
+        interceptor->postSend = [](void *handle, const 
pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, 
const void *rawMsg,
+                                   const celix_properties_t* metadata) {
+            auto* td = (TestData*)handle;
+            serializeAndPrint(td, msgId, rawMsg);
+            EXPECT_STREQ(msgType, "msg");
+            const auto *msg = static_cast<const msg_t*>(rawMsg);
+            EXPECT_GE(msg->seqNr, 0);
+            EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), 
"preSend");
+            const char *key;
+            CELIX_PROPERTIES_FOR_EACH(metadata, key) {
+                printf("got property %s=%s\n", key, 
celix_properties_get(metadata, key, nullptr));
+            }
+            fprintf(stdout, "Got message in postSend interceptor %s/%s for 
type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, 
intProps->psaType, intProps->serializationType, msg->seqNr);
+
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->postSendCount += 1;
+            td->cond.notify_all();
+        };
+        interceptor->preReceive = [](void* handle, const 
pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                     const void *, celix_properties_t* 
metadata) {
+            auto* td = (TestData*)handle;
+            celix_properties_set(metadata, "test", "preReceive");
+
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->preReceiveCount += 1;
+            td->cond.notify_all();
+            return true;
+        };
+        if (cancelReceive) {
+            interceptor->preReceive = [](void* handle, const 
pubsub_interceptor_properties_t *, const char *, const uint32_t,
+                                         const void *, celix_properties_t*) {
+                auto* td = (TestData*)handle;
+                std::lock_guard<std::mutex> lck{td->mutex};
+                td->preReceiveCount += 1;
+                td->cond.notify_all();
+                return false;
+            };
         }
+        interceptor->postReceive = [](void *handle, const 
pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, 
const void *rawMsg,
+                                      const celix_properties_t* metadata) {
+            auto* td = (TestData*)handle;
+            serializeAndPrint(td, msgId, rawMsg);
+            EXPECT_STREQ(msgType, "msg");
+            const auto *msg = static_cast<const msg_t*>(rawMsg);
+            EXPECT_GE(msg->seqNr, 0);
+            EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), 
"preReceive");
+            fprintf(stdout, "Got message in postReceive interceptor %s/%s for 
type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, 
intProps->psaType, intProps-> serializationType, msg->seqNr);
+
+            std::lock_guard<std::mutex> lck{td->mutex};
+            td->postReceiveCount += 1;
+            td->cond.notify_all();
+        };
+        //note registering identical services to validate multiple interceptors
+        return 
ctx->registerService<pubsub_interceptor>(std::move(interceptor), 
PUBSUB_INTERCEPTOR_SERVICE_NAME)
+                .setUnregisterAsync(false) //note to ensure test data is still 
valid when service is registered
+                .build();
     }
-    fputc('\n', stdout);
-    pubsub_serializerHandler_freeSerializedMsg(ser, msgId, vec, vecLen);
-}
 
-std::shared_ptr<celix::ServiceRegistration> 
createInterceptor(std::shared_ptr<celix::BundleContext>& ctx) {
-    auto interceptor = std::shared_ptr<pubsub_interceptor>{new 
pubsub_interceptor{}, [](pubsub_interceptor* inter) {
-        auto* handler = (pubsub_serializer_handler_t*)inter->handle;
-        pubsub_serializerHandler_destroy(handler);
-        delete inter;
-    }};
-    interceptor->handle = 
pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true);
-    interceptor->preSend  = [](void *, const pubsub_interceptor_properties_t 
*, const char *, const uint32_t,
-                               const void *, celix_properties_t* metadata) {
-        celix_properties_set(metadata, "test", "preSend");
-        return true;
-    };
-    interceptor->postSend = [](void *handle, const 
pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, 
const void *rawMsg,
-                               const celix_properties_t* metadata) {
-        auto* ser = (pubsub_serializer_handler_t*)handle;
-        serializeAndPrint(ser, msgId, rawMsg);
-        EXPECT_STREQ(msgType, "msg");
-        const auto *msg = static_cast<const msg_t*>(rawMsg);
-        EXPECT_GE(msg->seqNr, 0);
-        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), 
"preSend");
-        fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s 
and ser %s with seq nr %i\n", intProps->scope, intProps->topic, 
intProps->psaType, intProps->serializationType, msg->seqNr);
-    };
-    interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t 
*, const char *, const uint32_t,
-                                 const void *, celix_properties_t* metadata) {
-        celix_properties_set(metadata, "test", "preReceive");
-        return true;
-    };
-    interceptor->postReceive = [](void *handle, const 
pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, 
const void *rawMsg,
-                                  const celix_properties_t* metadata) {
-        auto* ser = (pubsub_serializer_handler_t*)handle;
-        serializeAndPrint(ser, msgId, rawMsg);
-        EXPECT_STREQ(msgType, "msg");
-        const auto *msg = static_cast<const msg_t*>(rawMsg);
-        EXPECT_GE(msg->seqNr, 0);
-        EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), 
"preReceive");
-        fprintf(stdout, "Got message in postReceive interceptor %s/%s for type 
%s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, 
intProps->psaType, intProps-> serializationType, msg->seqNr);
-    };
-    //note registering identical services to validate multiple interceptors
-    return ctx->registerService<pubsub_interceptor>(interceptor, 
PUBSUB_INTERCEPTOR_SERVICE_NAME).build();
-}
+    std::shared_ptr<celix::Framework> fw{};
+    std::shared_ptr<celix::BundleContext> ctx{};
+    std::shared_ptr<TestData> testData{};
+};
 
 TEST_F(PubSubInterceptorTestSuite, 
InterceptorWithSinglePublishersAndMultipleReceivers) {
     //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers 
(PUBSUB_SUBSCRIBER_BUNDLE_FILE)
-    //And a registered interceptor
+    //And several registered interceptors
     //Then the interceptor receives a correct msg type.
 
+    auto reg1 = createInterceptor(false, false);
+    auto reg2 = createInterceptor(false, false);
+    auto reg3 = createInterceptor(false, false);
+    ctx->waitForEvents();
+
     EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
     EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
 
-    auto reg1 = createInterceptor(ctx);
-    auto reg2 = createInterceptor(ctx);
-    auto reg3 = createInterceptor(ctx);
+    std::unique_lock<std::mutex> lck{testData->mutex};
+    auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, 
[this]{
+        return  testData->preSendCount > 10 &&
+                testData->postSendCount > 10 &&
+                testData->preReceiveCount > 10 &&
+                testData->postReceiveCount > 10;
+    });
 
-    //TODO stop after a certain amount of messages send
-    //TODO also test with tcp v2.
-    sleep(5);
+    EXPECT_TRUE(isTestDone);
 }
+
+TEST_F(PubSubInterceptorTestSuite, 
InterceptorWithPreSendCancelWillPreventSends) {
+    //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers 
(PUBSUB_SUBSCRIBER_BUNDLE_FILE)
+    //And an interceptor which cancels a send
+    //Then only the preSend count will be increased, but the rest of the counts 
will be 0
+
+    auto reg1 = createInterceptor(true, false);
+    ctx->waitForEvents();
+
+    EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
+    EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
+
+    std::unique_lock<std::mutex> lck{testData->mutex};
+    auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, 
[this]{
+        return  testData->preSendCount > 10 ;
+    });
+
+    EXPECT_EQ(testData->postSendCount, 0);
+    EXPECT_EQ(testData->preReceiveCount, 0);
+    EXPECT_EQ(testData->postReceiveCount, 0);
+
+    EXPECT_TRUE(isTestDone);
+}
+
+TEST_F(PubSubInterceptorTestSuite, 
InterceptorWithPreRedeiveCancelWillPreventPostReceive) {
+    //Given a publisher (PUBSUB_PUBLISHER_BUNDLE_FILE) and 2 receivers 
(PUBSUB_SUBSCRIBER_BUNDLE_FILE)
+    //And an interceptor which cancels a receive
+    //Then the preSend, postSend and preReceive count will be increased, but 
the postReceive count will be 0
+
+    auto reg1 = createInterceptor(false, true);
+    ctx->waitForEvents();
+
+    EXPECT_GE(ctx->installBundle(PUBSUB_PUBLISHER_BUNDLE_FILE), 0);
+    EXPECT_GE(ctx->installBundle(PUBSUB_SUBSCRIBER_BUNDLE_FILE), 0);
+
+    std::unique_lock<std::mutex> lck{testData->mutex};
+    auto isTestDone = testData->cond.wait_for(lck, std::chrono::seconds{5}, 
[this]{
+        return  testData->preSendCount > 10 &&
+                testData->postSendCount > 10 &&
+                testData->preReceiveCount > 10;
+    });
+
+    EXPECT_EQ(testData->postReceiveCount, 0);
+
+    EXPECT_TRUE(isTestDone);
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index e9ef6b4..fe6fd53 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -40,6 +40,8 @@
 #define UUID_STR_LEN  37
 #endif
 
+#define L_TRACE(...) \
+    celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_TRACE, 
__VA_ARGS__)
 #define L_DEBUG(...) \
     celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, 
__VA_ARGS__)
 #define L_INFO(...) \
@@ -486,13 +488,15 @@ static inline void processMsg(void* handle, const 
pubsub_protocol_message_t *mes
         if (status == CELIX_SUCCESS) {
             celix_properties_t *metadata = message->metadata.metadata;
             bool cont = 
pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, 
msgFqn, message->header.msgId, deSerializedMsg, &metadata);
+            bool release = true;
             if (cont) {
-                bool release;
                 callReceivers(receiver, msgFqn, message, &deSerializedMsg, 
&release, metadata);
                 
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, 
msgFqn, message->header.msgId, deSerializedMsg, metadata);
-                if (release) {
-                    
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
message->header.msgId, deSerializedMsg);
-                }
+            } else {
+                L_TRACE("Skipping receive for msg type %s, based on pre 
receive interceptor result", msgFqn);
+            }
+            if (release) {
+                
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
message->header.msgId, deSerializedMsg);
             }
         } else {
             L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgFqn,
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c 
b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 3c58e84..a817a3e 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -374,6 +374,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
     bool cont = 
pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, 
msgTypeId, inMsg, &metadata);
     if (!cont) {
         L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        celix_properties_destroy(metadata);
         return status;
     }
 
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index 56f4008..7421b09 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -491,14 +491,17 @@ static void 
processJsonMsg(pubsub_websocket_topic_receiver_t *receiver, const pu
             celix_properties_t *metadata = NULL; //NOTE metadata not supported 
for websocket
             bool cont = 
pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, 
header->fqn, msgId,
                                                                   
deserializedMsg, &metadata);
+            bool release = true;
             if (cont) {
-                bool release;
                 callReceivers(receiver, msgId, header, payload, payloadSize, 
&deserializedMsg, &release, metadata);
                 
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, 
header->fqn, msgId, deserializedMsg, metadata);
-                if (release) {
-                    
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
msgId, deserializedMsg);
-                }
+            } else {
+                L_TRACE("Skipping receive for msg type %s, based on pre 
receive interceptor result", header->fqn);
+            }
+            if (release) {
+                
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
msgId, deserializedMsg);
             }
+            celix_properties_destroy(metadata);
         } else {
             L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", header->fqn, receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
         }
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index e435093..7ecf546 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -270,6 +270,7 @@ static int psa_websocket_topicPublicationSend(void* handle, 
unsigned int msgType
     bool cont = 
pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, 
msgTypeId, inMsg, &metadata);
     if (!cont) {
         L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        celix_properties_destroy(metadata);
         return status;
     }
 
@@ -316,6 +317,7 @@ static int psa_websocket_topicPublicationSend(void* handle, 
unsigned int msgType
     }
 
     pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, 
msgFqn, msgTypeId, inMsg, metadata);
+    celix_properties_destroy(metadata);
 
     return status;
 }
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index f5e70b0..90d93cb 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -51,7 +51,8 @@
 #define UUID_STR_LEN 37
 #endif
 
-
+#define L_TRACE(...) \
+    celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_TRACE, 
__VA_ARGS__)
 #define L_DEBUG(...) \
     celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, 
__VA_ARGS__)
 #define L_INFO(...) \
@@ -468,13 +469,15 @@ static inline void processMsg(pubsub_zmq_topic_receiver_t 
*receiver, pubsub_prot
         if (status == CELIX_SUCCESS) {
             celix_properties_t *metadata = message->metadata.metadata;
             bool cont = 
pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, 
msgFqn, message->header.msgId, deserializedMsg, &metadata);
+            bool release = true;
             if (cont) {
-                bool release;
                 callReceivers(receiver, msgFqn, message, &deserializedMsg, 
&release, metadata);
                 
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, 
msgFqn, message->header.msgId, deserializedMsg, metadata);
-                if (release) {
-                    
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
message->header.msgId, deserializedMsg);
-                }
+            } else {
+                L_TRACE("Skipping receive for msg type %s, based on pre 
receive interceptor result", msgFqn);
+            }
+            if (release) {
+                
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
message->header.msgId, deserializedMsg);
             }
         } else {
             L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgFqn,
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c 
b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 155cb19..64c3b41 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -410,6 +410,7 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
     bool cont = 
pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, 
msgTypeId, inMsg, &metadata);
     if (!cont) {
         L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+        celix_properties_destroy(metadata);
         return status;
     }
 
@@ -541,10 +542,8 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
     }
     __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE);
     pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, 
msgFqn, msgTypeId, inMsg, metadata);
+    celix_properties_destroy(metadata);
 
-    if (message.metadata.metadata) {
-        celix_properties_destroy(message.metadata.metadata);
-    }
     if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) {
         pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, 
msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
     }

Reply via email to