This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit 39979064bc42cedac9f533b860403ae4eb532dcc Author: Pepijn Noltes <[email protected]> AuthorDate: Sun Jun 27 16:39:42 2021 +0200 Adds missing ifdef C++ checks and updates the interceptor handler to not create empty metadata if none is present on the wire. --- bundles/pubsub/integration/CMakeLists.txt | 28 ++++++++- bundles/pubsub/integration/src/sut_activator.c | 11 +++- .../pubsub/pubsub_api/include/pubsub/publisher.h | 14 ++--- .../pubsub/pubsub_api/include/pubsub/subscriber.h | 19 +++--- bundles/pubsub/pubsub_spi/include/pubsub_admin.h | 14 ++--- .../pubsub_spi/include/pubsub_admin_metrics.h | 7 +++ .../pubsub/pubsub_spi/include/pubsub_interceptor.h | 67 ++++++++++++++++++++++ .../include/pubsub_interceptors_handler.h | 15 +++-- .../pubsub/pubsub_spi/include/pubsub_listeners.h | 9 ++- .../include/pubsub_message_serialization_marker.h | 7 +++ .../include/pubsub_message_serialization_service.h | 7 +++ .../pubsub/pubsub_spi/include/pubsub_protocol.h | 7 +++ .../pubsub/pubsub_spi/include/pubsub_serializer.h | 7 +++ .../pubsub_spi/src/pubsub_interceptors_handler.c | 32 +++++------ 14 files changed, 197 insertions(+), 47 deletions(-) diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt index 3cda520..a134b74 100644 --- a/bundles/pubsub/integration/CMakeLists.txt +++ b/bundles/pubsub/integration/CMakeLists.txt @@ -754,7 +754,6 @@ if (BUILD_PUBSUB_PSA_ZMQ) setup_target_for_coverage(pstm_deadlock_zmq_v2_test SCAN_DIR ..) 
endif () - if (BUILD_PUBSUB_PSA_ZMQ) #Test suite to test if component with same topic and different scope combinations work add_executable(test_pubsub_topic_and_scope_integration @@ -780,4 +779,31 @@ if (BUILD_PUBSUB_PSA_ZMQ) PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}" PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}" ) +endif () + +if (BUILD_PUBSUB_PSA_ZMQ) + #Test suite to test pubsub interceptors + add_executable(test_pubsub_interceptors_integration + gtest/PubSubInterceptorTestSuite.cc + ) + target_link_libraries(test_pubsub_interceptors_integration PRIVATE Celix::framework Celix::pubsub_api GTest::gtest GTest::gtest_main Celix::pubsub_spi) + target_include_directories(test_pubsub_interceptors_integration PRIVATE src) + setup_target_for_coverage(test_pubsub_interceptors_integration SCAN_DIR ..) + + #configure topology manager and pubsub zmq, json serializer and wire protocol v1 bundles + celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE) + celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE) + celix_get_bundle_file(Celix::pubsub_admin_zmq PUBSUB_ZMQ_BUNDLE_FILE) + celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE) + celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE) + celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE) + add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst) + target_compile_definitions(test_pubsub_interceptors_integration PRIVATE + PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}" + PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}" + PUBSUB_ZMQ_BUNDLE_FILE="${PUBSUB_ZMQ_BUNDLE_FILE}" + PUBSUB_WIRE_BUNDLE_FILE="${PUBSUB_WIRE_BUNDLE_FILE}" + PUBSUB_PUBLISHER_BUNDLE_FILE="${PUBSUB_PUBLISHER_BUNDLE_FILE}" + PUBSUB_SUBSCRIBER_BUNDLE_FILE="${PUBSUB_SUBSCRIBER_BUNDLE_FILE}" + ) 
endif () \ No newline at end of file diff --git a/bundles/pubsub/integration/src/sut_activator.c b/bundles/pubsub/integration/src/sut_activator.c index 019ef7c..5a829e7 100644 --- a/bundles/pubsub/integration/src/sut_activator.c +++ b/bundles/pubsub/integration/src/sut_activator.c @@ -30,6 +30,8 @@ static void sut_pubSet(void *handle, void *service); static void* sut_sendThread(void *data); struct activator { + bool addMetadata; + long pubTrkId; pthread_t sendThread; @@ -41,6 +43,8 @@ struct activator { celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) { + act->addMetadata = celix_bundleContext_getPropertyAsBool(ctx, "CELIX_PUBSUB_TEST_ADD_METADATA", false); + char filter[512]; bool useNegativeScopeFilter = celix_bundleContext_getPropertyAsBool(ctx, "CELIX_PUBSUB_TEST_USE_NEGATIVE_SCOPE_FILTER", true); if (useNegativeScopeFilter) { @@ -91,7 +95,12 @@ static void* sut_sendThread(void *data) { if (msgId == 0) { act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &msgId); } - act->pubSvc->send(act->pubSvc->handle, msgId, &msg, NULL); + celix_properties_t* metadata = NULL; + if (act->addMetadata) { + metadata = celix_properties_create(); + celix_properties_setLong(metadata, "seqNr", (long)msgId); + } + act->pubSvc->send(act->pubSvc->handle, msgId, &msg, metadata); if (msg.seqNr % 1000 == 0) { printf("Send %i messages\n", msg.seqNr); } diff --git a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h index 72cafbe..68eb839 100644 --- a/bundles/pubsub/pubsub_api/include/pubsub/publisher.h +++ b/bundles/pubsub/pubsub_api/include/pubsub/publisher.h @@ -16,17 +16,14 @@ * specific language governing permissions and limitations * under the License. 
*/ -/** - * publisher.h - * - * \date Jan 7, 2016 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef __PUBSUB_PUBLISHER_H_ #define __PUBSUB_PUBLISHER_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include <stdlib.h> #include "celix_properties.h" @@ -75,4 +72,7 @@ struct pubsub_publisher { }; typedef struct pubsub_publisher pubsub_publisher_t; +#ifdef __cplusplus +} +#endif #endif // __PUBSUB_PUBLISHER_H_ diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h index 1170f2c..ccc62e4 100644 --- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h +++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h @@ -16,17 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -/** - * subscriber.h - * - * \date Jan 7, 2016 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef __PUBSUB_SUBSCRIBER_H_ #define __PUBSUB_SUBSCRIBER_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include <stdbool.h> #include "celix_properties.h" @@ -45,6 +42,8 @@ struct pubsub_subscriber_struct { /** * Called to initialize the subscriber with the receiver thread. * Can be used to tweak the receiver thread attributes + * + * this method can be NULL. */ int (*init)(void *handle); @@ -66,7 +65,7 @@ struct pubsub_subscriber_struct { * @param msgType The fully qualified type name * @param msgTypeId The local type id of the type, how this is calculated/created is up to the pubsub admin. (can be cached for improved performance) * @param msg The pointer to the message - * @param metadata The meta data provided with the data. Can be NULL. + * @param metadata The meta data provided with the data. Can be NULL and is only valid during the callback. * @param release Pointer to the release boolean, default is release is true. 
* @return Return 0 implies a successful handling. If return is not 0, the msg will always be released by the pubsubadmin. */ @@ -75,5 +74,7 @@ struct pubsub_subscriber_struct { }; typedef struct pubsub_subscriber_struct pubsub_subscriber_t; - +#ifdef __cplusplus +} +#endif #endif // __PUBSUB_SUBSCRIBER_H_ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h index 15d5d16..e142883 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_admin.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin.h @@ -16,17 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -/** - * pubsub_admin.h - * - * \date Sep 30, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ #ifndef PUBSUB_ADMIN_H_ #define PUBSUB_ADMIN_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "celix_properties.h" #include "celix_bundle.h" #include "celix_filter.h" @@ -62,6 +59,9 @@ struct pubsub_admin_service { typedef struct pubsub_admin_service pubsub_admin_service_t; +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_ADMIN_H_ */ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h b/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h index 449ec63..628eda0 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_admin_metrics.h @@ -20,6 +20,10 @@ #ifndef PUBSUB_ADMIN_METRICS_H_ #define PUBSUB_ADMIN_METRICS_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include <uuid/uuid.h> #include <sys/time.h> #include "celix_array_list.h" @@ -101,6 +105,9 @@ void pubsub_freePubSubAdminMetrics(pubsub_admin_metrics_t *metrics); typedef struct pubsub_admin_metrics_service pubsub_admin_metrics_service_t; +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_ADMIN_METRICS_H_ */ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h 
b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h index 75bd3d6..eff1e78 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptor.h @@ -20,6 +20,10 @@ #ifndef __PUBSUB_INTERCEPTOR_H #define __PUBSUB_INTERCEPTOR_H +#ifdef __cplusplus +extern "C" { +#endif + #include <stdlib.h> #include <stdint.h> @@ -33,15 +37,78 @@ typedef struct pubsub_interceptor_properties { const char *topic; } pubsub_interceptor_properties_t; +/** + * @brief PubSub Interceptor which can be used to intercept pubsub publish/receive callbacks + * + */ struct pubsub_interceptor { + /** + * Service handle. + */ void *handle; + /** + * @brief preSend will be called when a user called send on a pubsub/publisher, but before the message is "handed over" to the actual pubsub technology (i.e. TCP stack, shared memory, etc) + * + * This function can be NULL. + * + * @param handle The service handle + * @param properties The scope and topic of the sending publisher + * @param messageType The fqn of the message + * @param msgTypeId The (local) type id of the message + * @param message The actual message pointer + * @param metadata The metadata of the message + * @return True if the send should continue. + */ bool (*preSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + + /** + * @brief postSend will be called when a user called send on a pubsub/publisher, but after the message is "handed over" to the actual pubsub technology (i.e. TCP stack, shared memory, etc) + * + * This function can be NULL. 
+ * + * @param handle The service handle + * @param properties The scope and topic of the sending publisher + * @param messageType The fqn of the message + * @param msgTypeId The (local) type id of the message + * @param message The actual message pointer + * @param metadata The metadata of the message + */ void (*postSend)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + + /** + * @brief preReceive will be called when a message is received in a pubsub admin, but before the pubsub/subscriber callback is called. + * + * This function can be NULL. + * + * @param handle The service handle + * @param properties The scope and topic of the sending publisher + * @param messageType The fqn of the message + * @param msgTypeId The (local) type id of the message + * @param message The actual message pointer + * @param metadata The metadata of the message + * @return True if the pubsub/subscriber callback should be called. + */ bool (*preReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); + + /** + * @brief postReceive will be called when a message is received in a pubsub admin and is called after the pubsub/subscriber callback is called. + * + * This function can be NULL. 
+ * + * @param handle The service handle + * @param properties The scope and topic of the sending publisher + * @param messageType The fqn of the message + * @param msgTypeId The (local) type id of the message + * @param message The actual message pointer + * @param metadata The metadata of the message + */ void (*postReceive)(void *handle, pubsub_interceptor_properties_t *properties, const char *messageType, const uint32_t msgTypeId, const void *message, const celix_properties_t *metadata); }; typedef struct pubsub_interceptor pubsub_interceptor_t; +#ifdef __cplusplus +} +#endif #endif //__PUBSUB_INTERCEPTOR_H diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h index 2334fb8..801fd35 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h @@ -19,6 +19,10 @@ #ifndef PUBSUB_INTERCEPTORS_HANDLER_H #define PUBSUB_INTERCEPTORS_HANDLER_H +#ifdef __cplusplus +extern "C" { +#endif + #include <stdint.h> #include "celix_errno.h" @@ -31,9 +35,12 @@ typedef struct pubsub_interceptors_handler pubsub_interceptors_handler_t; celix_status_t pubsubInterceptorsHandler_create(celix_bundle_context_t *ctx, const char *scope, const char *topic, pubsub_interceptors_handler_t **handler); celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t *handler); -bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata); -void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata); -bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, 
celix_properties_t **metadata); -void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata); +bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata); +void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); +void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); +#ifdef __cplusplus +} +#endif #endif //PUBSUB_INTERCEPTORS_HANDLER_H diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h index 0982a0f..67d149f 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_listeners.h @@ -20,8 +20,11 @@ #ifndef PUBSUB_LISTENERS_H_ #define PUBSUB_LISTENERS_H_ -#include "celix_properties.h" +#ifdef __cplusplus +extern "C" { +#endif +#include "celix_properties.h" #define PUBSUB_DISCOVERED_ENDPOINT_LISTENER_SERVICE "pubsub_discovered_endpoint_listener" @@ -48,5 +51,7 @@ struct pubsub_announce_endpoint_listener { typedef struct pubsub_announce_endpoint_listener pubsub_announce_endpoint_listener_t; - +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_LISTENERS_H_ */ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h index a4ff06b..919b8f4 100644 --- 
a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h @@ -20,6 +20,10 @@ #ifndef PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ #define PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "hash_map.h" #include "version.h" #include "celix_bundle.h" @@ -57,4 +61,7 @@ typedef struct pubsub_message_serialization_marker { void* handle; } pubsub_message_serialization_marker_t; +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ */ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h index 09df5a5..c47d9eb 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h @@ -20,6 +20,10 @@ #ifndef PUBSUB_MESSAGE_SERIALIZATION_SERVICE_H_ #define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "hash_map.h" #include "version.h" #include "celix_bundle.h" @@ -92,4 +96,7 @@ typedef struct pubsub_message_serialization_service { } pubsub_message_serialization_service_t; +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_MESSAGE_SERIALIZATION_SERVICE_H_ */ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h index ad1a387..5fcf205 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h @@ -20,6 +20,10 @@ #ifndef PUBSUB_PROTOCOL_SERVICE_H_ #define PUBSUB_PROTOCOL_SERVICE_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include <stdint.h> #include "celix_properties.h" @@ -234,4 +238,7 @@ typedef struct pubsub_protocol_service { celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); } 
pubsub_protocol_service_t; +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_PROTOCOL_SERVICE_H_ */ diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h index ed7208b..6e251b2 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_serializer.h @@ -20,6 +20,10 @@ #ifndef PUBSUB_SERIALIZER_SERVICE_H_ #define PUBSUB_SERIALIZER_SERVICE_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "hash_map.h" #include "version.h" #include "celix_bundle.h" @@ -61,4 +65,7 @@ typedef struct pubsub_serializer_service { } pubsub_serializer_service_t; +#ifdef __cplusplus +} +#endif #endif /* PUBSUB_SERIALIZER_SERVICE_H_ */ diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c index 7fd885f..331a41d 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c @@ -126,7 +126,7 @@ void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attr celixThreadMutex_unlock(&handler->lock); } -bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata) { +bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata) { bool cont = true; celixThreadMutex_lock(&handler->lock); @@ -137,8 +137,9 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) { entry_t *entry = arrayList_get(handler->interceptors, i - 1); - - cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata); + if 
(entry->interceptor->preSend != NULL) { + cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata); + } if (!cont) { break; } @@ -149,31 +150,29 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl return cont; } -void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) { +void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) { celixThreadMutex_lock(&handler->lock); for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) { entry_t *entry = arrayList_get(handler->interceptors, i - 1); - - entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); + if (entry->interceptor->postSend != NULL) { + entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); + } } celixThreadMutex_unlock(&handler->lock); } -bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t **metadata) { +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) { bool cont = true; celixThreadMutex_lock(&handler->lock); - if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) { - *metadata = celix_properties_create(); - } - for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { entry_t *entry = arrayList_get(handler->interceptors, i); - - cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, 
messageId, message, *metadata); + if (entry->interceptor->preReceive != NULL) { + cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); + } if (!cont) { break; } @@ -184,13 +183,14 @@ bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *ha return cont; } -void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, const uint32_t messageId, const void *message, celix_properties_t *metadata) { +void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) { celixThreadMutex_lock(&handler->lock); for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { entry_t *entry = arrayList_get(handler->interceptors, i); - - entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); + if (entry->interceptor->postReceive != NULL) { + entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); + } } celixThreadMutex_unlock(&handler->lock);
