This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit 6229f9e0317ae370b11ed3a99b2b6907763abbd0 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jun 28 21:42:18 2021 +0200 Updates the interceptor api so that metadata can be extended in the preSend/Receive callbacks. --- .../include/first_interceptor_private.h | 8 +-- .../include/second_interceptor_private.h | 8 +-- .../pubsub/interceptors/src/first_interceptor.c | 8 +-- .../pubsub/interceptors/src/second_interceptor.c | 8 +-- .../gtest/PubSubInterceptorTestSuite.cc | 26 +++++++-- .../v1/src/pubsub_tcp_topic_receiver.c | 2 +- .../v1/src/pubsub_tcp_topic_sender.c | 2 +- .../v2/src/pubsub_tcp_topic_receiver.c | 3 +- .../v2/src/pubsub_tcp_topic_sender.c | 3 +- .../v1/src/pubsub_zmq_topic_receiver.c | 2 +- .../v1/src/pubsub_zmq_topic_sender.c | 2 +- .../v2/src/pubsub_zmq_topic_receiver.c | 3 +- .../v2/src/pubsub_zmq_topic_sender.c | 3 +- .../pubsub/pubsub_spi/include/pubsub_interceptor.h | 16 ++--- .../include/pubsub_interceptors_handler.h | 12 ++-- .../pubsub_spi/src/pubsub_interceptors_handler.c | 68 ++++++++++------------ 16 files changed, 94 insertions(+), 80 deletions(-) diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h index 941c43b..3071a25 100644 --- a/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h +++ b/bundles/pubsub/examples/pubsub/interceptors/include/first_interceptor_private.h @@ -35,9 +35,9 @@ static const char *const SEQUENCE_NUMBER = "sequence.number"; celix_status_t firstInterceptor_create(first_interceptor_t **interceptor); celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor); -bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); -void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); -bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); -void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); +bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); +bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); #endif //CELIX_FIRST_INTERCEPTOR_PRIVATE_H diff --git a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h index dc8ebaa..b37abad 100644 --- a/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h +++ b/bundles/pubsub/examples/pubsub/interceptors/include/second_interceptor_private.h @@ -28,9 +28,9 @@ typedef struct second_interceptor { celix_status_t secondInterceptor_create(second_interceptor_t **interceptor); celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor); -bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); -void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); -bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); -void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); +bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); +bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); +void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); #endif //CELIX_SECOND_INTERCEPTOR_PRIVATE_H diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c index e15a007..8414d8d 100644 --- a/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c +++ b/bundles/pubsub/examples/pubsub/interceptors/src/first_interceptor.c @@ -41,7 +41,7 @@ celix_status_t firstInterceptor_destroy(first_interceptor_t *interceptor) { } -bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +bool firstInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { first_interceptor_t *interceptor = handle; celixThreadMutex_lock(&interceptor->mutex); @@ -54,19 +54,19 @@ bool firstInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *pro return true; } -void firstInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +void firstInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); printf("Invoked postSend on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); } -bool firstInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +bool firstInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); printf("Invoked preReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); return true; } -void firstInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +void firstInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { uint64_t sequence = celix_properties_getAsLong(metadata, SEQUENCE_NUMBER, 0); printf("Invoked postReceive on first interceptor, for message with sequenceNumber [%"PRIu64"]\n", sequence); } diff --git a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c index 3e18b9c..d89c553 100644 --- a/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c +++ b/bundles/pubsub/examples/pubsub/interceptors/src/second_interceptor.c @@ -36,23 +36,23 @@ celix_status_t secondInterceptor_destroy(second_interceptor_t *interceptor) { } -bool secondInterceptor_preSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +bool secondInterceptor_preSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { printf("Invoked preSend on second interceptor\n"); return true; } -void secondInterceptor_postSend(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +void secondInterceptor_postSend(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { printf("Invoked postSend on second interceptor\n"); } -bool secondInterceptor_preReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +bool secondInterceptor_preReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata) { printf("Invoked preReceive on second interceptor\n"); return true; } -void secondInterceptor_postReceive(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { +void secondInterceptor_postReceive(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata) { printf("Invoked postReceive on second interceptor\n"); } diff --git a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc index 181eff3..8f500f5 100644 --- a/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc +++ b/bundles/pubsub/integration/gtest/PubSubInterceptorTestSuite.cc @@ -62,23 +62,35 @@ std::shared_ptr<celix::ServiceRegistration> createInterceptor(std::shared_ptr<ce delete inter; }}; interceptor->handle = pubsub_serializerHandler_create(ctx->getCBundleContext(), "json", true); - interceptor->postSend = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg, - const celix_properties_t *) { + interceptor->preSend = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t, + const void *, celix_properties_t* metadata) { + celix_properties_set(metadata, "test", "preSend"); + return true; + }; + interceptor->postSend = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg, + const celix_properties_t* metadata) { auto* ser = (pubsub_serializer_handler_t*)handle; serializeAndPrint(ser, msgId, rawMsg); EXPECT_STREQ(msgType, "msg"); const auto *msg = static_cast<const msg_t*>(rawMsg); EXPECT_GE(msg->seqNr, 0); - fprintf(stdout, "Got message in postSend interceptor %p with seq nr %i\n", handle, msg->seqNr); + EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preSend"); + fprintf(stdout, "Got message in postSend interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps->serializationType, msg->seqNr); + }; + interceptor->preReceive = [](void *, const pubsub_interceptor_properties_t *, const char *, const uint32_t, + const void *, celix_properties_t* metadata) { + celix_properties_set(metadata, "test", "preReceive"); + return true; }; - interceptor->postReceive = [](void *handle, pubsub_interceptor_properties_t *, const char *msgType, uint32_t msgId, const void *rawMsg, - const celix_properties_t *) { + interceptor->postReceive = [](void *handle, const pubsub_interceptor_properties_t* intProps, const char *msgType, uint32_t msgId, const void *rawMsg, + const celix_properties_t* metadata) { auto* ser = (pubsub_serializer_handler_t*)handle; serializeAndPrint(ser, msgId, rawMsg); EXPECT_STREQ(msgType, "msg"); const auto *msg = static_cast<const msg_t*>(rawMsg); EXPECT_GE(msg->seqNr, 0); - fprintf(stdout, "Got message in postReceive interceptor %p with seq nr %i\n", handle, msg->seqNr); + EXPECT_STREQ(celix_properties_get(metadata, "test", nullptr), "preReceive"); + fprintf(stdout, "Got message in postReceive interceptor %s/%s for type %s and ser %s with seq nr %i\n", intProps->scope, intProps->topic, intProps->psaType, intProps-> serializationType, msg->seqNr); }; //note registering identical services to validate multiple interceptors return ctx->registerService<pubsub_interceptor>(interceptor, PUBSUB_INTERCEPTOR_SERVICE_NAME).build(); @@ -96,5 +108,7 @@ TEST_F(PubSubInterceptorTestSuite, InterceptorWithSinglePublishersAndMultipleRec auto reg2 = createInterceptor(ctx); auto reg3 = createInterceptor(ctx); + //TODO stop after a certain amount of messages send + //TODO also test with tcp v2. sleep(5); } diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c index 8acb8e2..4178b50 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c @@ -144,7 +144,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context receiver->protocol = protocol; receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); - pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); + receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*"); const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope); const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope); const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope); diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c index b287ebd..32a3328 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c @@ -145,7 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( if (uuid != NULL) { uuid_parse(uuid, sender->fwUUID); } - pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); + sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*"); sender->isPassive = false; sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); char *urls = NULL; diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c index ad321e8..36816f1 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c @@ -124,7 +124,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context receiver->protocol = protocol; receiver->scope = celix_utils_strdup(scope); receiver->topic = celix_utils_strdup(topic); - pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); + receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, + pubsub_serializerHandler_getSerializationType(serializerHandler)); const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope); const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope); const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope); diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c index 2c8daf4..3c58e84 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c @@ -122,7 +122,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( if (uuid != NULL) { uuid_parse(uuid, sender->fwUUID); } - pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); + sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, + pubsub_serializerHandler_getSerializationType(serializerHandler)); sender->isPassive = false; char *urls = NULL; const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL); diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c index 9cd99f4..62b2fbf 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c @@ -157,7 +157,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context receiver->topic = strndup(topic, 1024 * 1024); receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); - pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); + receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*"); #ifdef BUILD_WITH_ZMQ_SECURITY char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c index 1e95c0b..aba8893 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c @@ -157,7 +157,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED); - pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); + sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*"); //setting up zmq socket for ZMQ TopicSender { diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c index 0fc2bc2..d6c5805 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c @@ -138,7 +138,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope); receiver->topic = celix_utils_strdup(topic); - pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); + receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, + pubsub_serializerHandler_getSerializationType(serHandler)); #ifdef BUILD_WITH_ZMQ_SECURITY char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c index 7d9e750..d1f36a5 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c @@ -142,7 +142,8 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( } sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED); - pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); + sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, + pubsub_serializerHandler_getSerializationType(serializerHandler)); //setting up zmq socket for ZMQ TopicSender { diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h index eff1e78..69bffa9 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h @@ -30,11 +30,13 @@ extern "C" { #include "celix_properties.h" #define PUBSUB_INTERCEPTOR_SERVICE_NAME "pubsub.interceptor" -#define PUBSUB_INTERCEPTOR_SERVICE_VERSION "1.0.0" +#define PUBSUB_INTERCEPTOR_SERVICE_VERSION "2.0.0" typedef struct pubsub_interceptor_properties { - const char *scope; - const char *topic; + const char* psaType; //i.e. zmq, tcp, etc + const char* serializationType; //i.e. json, avrobin + const char* scope; + const char* topic; } pubsub_interceptor_properties_t; /** @@ -60,7 +62,7 @@ struct pubsub_interceptor { * @param metadata The metadata of the message * @return True if the send should continue. */ - bool (*preSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + bool (*preSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); /** * @brief postSend will be called when a user called send on a pubsub/publisher, but after the message is "handed over" to the actual pubsub technology (i.e. TCP stack, shared memory, etc) @@ -74,7 +76,7 @@ struct pubsub_interceptor { * @param message The actual message pointer * @param metadata The metadata of the message */ - void (*postSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + void (*postSend)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); /** * @brief preReceive will be called when is message is received in a pubsub admin, but before the pubsub/subscriber callback is called. @@ -89,7 +91,7 @@ struct pubsub_interceptor { * @param metadata The metadata of the message * @return True if the pubsub/subsciber callback should be called. */ - bool (*preReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + bool (*preReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, celix_properties_t *metadata); /** * @brief postReceive will be called when is message is received in a pubsub admin and is called after the pubsub/subscriber callback is called. @@ -103,7 +105,7 @@ struct pubsub_interceptor { * @param message The actual message pointer * @param metadata The metadata of the message */ - void (*postReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + void (*postReceive)(void *handle, const pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); }; typedef struct pubsub_interceptor pubsub_interceptor_t; diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h index 6595853..a12a865 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h @@ -32,13 +32,13 @@ typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t; -celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler); -celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler); +pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t* ctx, const char* scope, const char* topic, const char* psaType, const char* serializationType); +void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler); -bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata); -void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); -bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata); -void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); +bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata); +void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata); +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t** metadata); +void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char* messageType, uint32_t messageId, const void* message, celix_properties_t* metadata); #ifdef __cplusplus } diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c index 6118fbe..8191f94 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c @@ -45,45 +45,39 @@ static int referenceCompare(const void *a, const void *b); static void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props); static void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, const celix_properties_t *props); -celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler) { - celix_status_t status = CELIX_SUCCESS; - - *handler = calloc(1, sizeof(**handler)); - if (!*handler) { - status = CELIX_ENOMEM; - } else { - (*handler)->ctx = ctx; - - (*handler)->properties.scope = scope; - (*handler)->properties.topic = topic; - - (*handler)->interceptors = celix_arrayList_create(); - - status = celixThreadMutex_create(&(*handler)->lock, NULL); - - if (status == CELIX_SUCCESS) { - // Create service tracker here, and not in the activator - celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; - opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME; - opts.filter.ignoreServiceLanguage = true; - opts.callbackHandle = *handler; - opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor; - opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor; - (*handler)->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - } - } - - return status; +pubsub_interceptors_handler_t* pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, const char* psaType, const char* serType) { + pubsub_interceptors_handler_t* handler = calloc(1, sizeof(*handler)); + handler->ctx = ctx; + handler->properties.scope = celix_utils_strdup(scope); + handler->properties.topic = celix_utils_strdup(topic); + handler->properties.psaType = celix_utils_strdup(psaType); + handler->properties.serializationType = celix_utils_strdup(serType); + handler->interceptors = celix_arrayList_create(); + celixThreadMutex_create(&handler->lock, NULL); + + // Create service tracker here, and not in the activator + celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; + opts.filter.serviceName = PUBSUB_INTERCEPTOR_SERVICE_NAME; + opts.filter.ignoreServiceLanguage = true; + opts.callbackHandle = handler; + opts.addWithProperties = pubsubInterceptorsHandler_addInterceptor; + opts.removeWithProperties = pubsubInterceptorsHandler_removeInterceptor; + handler->interceptorsTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); + + return handler; } -celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) { - celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId); - - celix_arrayList_destroy(handler->interceptors); - celixThreadMutex_destroy(&handler->lock); - free(handler); - - return CELIX_SUCCESS; +void pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler) { + if (handler != NULL) { + celix_bundleContext_stopTracker(handler->ctx, handler->interceptorsTrackerId); + celix_arrayList_destroy(handler->interceptors); + celixThreadMutex_destroy(&handler->lock); + free((char*)handler->properties.scope); + free((char*)handler->properties.topic); + free((char*)handler->properties.psaType); + free((char*)handler->properties.serializationType); + free(handler); + } } void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const celix_properties_t *props) {
