This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit 461a2cea194c0bc203e03af38ce21a9a15d48182 Author: Pepijn Noltes <[email protected]> AuthorDate: Sun Jun 27 22:36:30 2021 +0200 Refactors v2 of pubsub tcp and zmq to only call interceptors callback once per receive. renames old psa to _v1. --- bundles/pubsub/CMakeLists.txt | 9 +- bundles/pubsub/integration/CMakeLists.txt | 4 +- bundles/pubsub/integration/src/tst_activator.c | 6 +- bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt | 22 +-- .../v1/src/pubsub_tcp_topic_receiver.c | 6 +- .../v2/src/pubsub_tcp_topic_receiver.c | 183 +++++++++------------ .../pubsub_admin_websocket/v1/CMakeLists.txt | 20 ++- .../v1/src/pubsub_websocket_topic_receiver.c | 5 +- bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt | 20 ++- .../v1/src/pubsub_zmq_topic_receiver.c | 59 +------ .../v2/src/pubsub_zmq_topic_receiver.c | 172 +++++++++---------- .../include/pubsub_interceptors_handler.h | 2 +- .../pubsub_spi/src/pubsub_interceptors_handler.c | 41 ++--- 13 files changed, 232 insertions(+), 317 deletions(-) diff --git a/bundles/pubsub/CMakeLists.txt b/bundles/pubsub/CMakeLists.txt index 9285195..482ee3a 100644 --- a/bundles/pubsub/CMakeLists.txt +++ b/bundles/pubsub/CMakeLists.txt @@ -21,14 +21,16 @@ if (PUBSUB) option(BUILD_PUBSUB_PSA_ZMQ "Build ZeroMQ PubSub Admin (LGPL License)" ON) if (BUILD_PUBSUB_PSA_ZMQ) option(BUILD_ZMQ_SECURITY "Build with security for ZeroMQ." OFF) - add_subdirectory(pubsub_admin_zmq/v1) + add_subdirectory(pubsub_admin_zmq/v1) #TODO option for v1 admins add_subdirectory(pubsub_admin_zmq/v2) + add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq_v1) #TODO move to config and set to v2 endif (BUILD_PUBSUB_PSA_ZMQ) option(BUILD_PUBSUB_PSA_TCP "Build TCP PubSub Admin" ON) if (BUILD_PUBSUB_PSA_TCP) - add_subdirectory(pubsub_admin_tcp/v1) + add_subdirectory(pubsub_admin_tcp/v1) #TODO option for v1 admins add_subdirectory(pubsub_admin_tcp/v2) + add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp_v1) #TODO move to config and set to v2 endif (BUILD_PUBSUB_PSA_TCP) option(BUILD_PUBSUB_PSA_UDP_MC "Build UDP MC PubSub Admin" ON) @@ -38,8 +40,9 @@ if (PUBSUB) option(BUILD_PUBSUB_PSA_WS "Build WebSocket PubSub Admin" ON) if (BUILD_PUBSUB_PSA_WS) - add_subdirectory(pubsub_admin_websocket/v1) + add_subdirectory(pubsub_admin_websocket/v1) #TODO option for v1 admins add_subdirectory(pubsub_admin_websocket/v2) + add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket_v1) #TODO move to config and set to v2 endif (BUILD_PUBSUB_PSA_WS) add_subdirectory(pubsub_api) diff --git a/bundles/pubsub/integration/CMakeLists.txt b/bundles/pubsub/integration/CMakeLists.txt index a134b74..9c9ce54 100644 --- a/bundles/pubsub/integration/CMakeLists.txt +++ b/bundles/pubsub/integration/CMakeLists.txt @@ -793,11 +793,11 @@ if (BUILD_PUBSUB_PSA_ZMQ) #configure topology manager and pubsub zmq, json serializer and wire protocol v2 bundles celix_get_bundle_file(Celix::pubsub_serializer_json PUBSUB_JSON_BUNDLE_FILE) celix_get_bundle_file(Celix::pubsub_topology_manager PUBSUB_TOPMAN_BUNDLE_FILE) - celix_get_bundle_file(Celix::pubsub_admin_zmq PUBSUB_ZMQ_BUNDLE_FILE) + celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PUBSUB_ZMQ_BUNDLE_FILE) celix_get_bundle_file(Celix::pubsub_protocol_wire_v1 PUBSUB_WIRE_BUNDLE_FILE) celix_get_bundle_file(pubsub_sut PUBSUB_PUBLISHER_BUNDLE_FILE) celix_get_bundle_file(pubsub_tst PUBSUB_SUBSCRIBER_BUNDLE_FILE) - add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst) + add_celix_bundle_dependencies(test_pubsub_interceptors_integration Celix::pubsub_serializer_json Celix::pubsub_topology_manager Celix::pubsub_admin_zmq_v2 Celix::pubsub_protocol_wire_v1 pubsub_sut pubsub_tst) target_compile_definitions(test_pubsub_interceptors_integration PRIVATE PUBSUB_JSON_BUNDLE_FILE="${PUBSUB_JSON_BUNDLE_FILE}" PUBSUB_TOPMAN_BUNDLE_FILE="${PUBSUB_TOPMAN_BUNDLE_FILE}" diff --git a/bundles/pubsub/integration/src/tst_activator.c b/bundles/pubsub/integration/src/tst_activator.c index 8ce13ac..4017326 100644 --- a/bundles/pubsub/integration/src/tst_activator.c +++ b/bundles/pubsub/integration/src/tst_activator.c @@ -86,7 +86,7 @@ celix_status_t bnd_stop(struct activator *act, celix_bundle_context_t *ctx) { CELIX_GEN_BUNDLE_ACTIVATOR(struct activator, bnd_start, bnd_stop) ; -static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata __attribute__((unused)), bool *release __attribute__((unused))) { +static int tst_receive(void *handle, const char * msgType __attribute__((unused)), unsigned int msgTypeId __attribute__((unused)), void * voidMsg, const celix_properties_t *metadata __attribute__((unused)), bool *release) { struct activator *act = handle; msg_t *msg = voidMsg; @@ -100,6 +100,10 @@ static int tst_receive(void *handle, const char * msgType __attribute__((unused) pthread_mutex_lock(&act->mutex); act->count1 += 1; pthread_mutex_unlock(&act->mutex); + + *release = false; + free(voidMsg); + return CELIX_SUCCESS; } diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt index 0314cb6..2f1bca8 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt +++ b/bundles/pubsub/pubsub_admin_tcp/v1/CMakeLists.txt @@ -15,10 +15,12 @@ # specific language governing permissions and limitations # under the License. +message(WARNING "PubSub TCP Admin V1 is deprecated, use PubSub TCP Websocket v2 instead") + find_package(UUID REQUIRED) -add_celix_bundle(celix_pubsub_admin_tcp - BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp" +add_celix_bundle(celix_pubsub_admin_tcp_v1 + BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_tcp_v1" VERSION "1.0.0" GROUP "Celix/PubSub" SOURCES @@ -30,15 +32,15 @@ add_celix_bundle(celix_pubsub_admin_tcp src/pubsub_tcp_common.c ) -set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN") -target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::pubsub_spi Celix::pubsub_utils) -target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::framework Celix::dfi Celix::log_helper) -target_include_directories(celix_pubsub_admin_tcp PRIVATE src) +set_target_properties(celix_pubsub_admin_tcp_v1 PROPERTIES INSTALL_RPATH "$ORIGIN") +target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils) +target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::framework Celix::dfi Celix::log_helper) +target_include_directories(celix_pubsub_admin_tcp_v1 PRIVATE src) # cmake find package UUID set the wrong include dir for OSX if (NOT APPLE) - target_link_libraries(celix_pubsub_admin_tcp PRIVATE UUID::lib) + target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE UUID::lib) endif() -install_celix_bundle(celix_pubsub_admin_tcp EXPORT celix COMPONENT pubsub) -target_link_libraries(celix_pubsub_admin_tcp PRIVATE Celix::shell_api) -add_library(Celix::pubsub_admin_tcp ALIAS celix_pubsub_admin_tcp) +install_celix_bundle(celix_pubsub_admin_tcp_v1 EXPORT celix COMPONENT pubsub) +target_link_libraries(celix_pubsub_admin_tcp_v1 PRIVATE Celix::shell_api) +add_library(Celix::pubsub_admin_tcp_v1 ALIAS celix_pubsub_admin_tcp_v1) diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c index b1b2c58..8acb8e2 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c @@ -560,9 +560,8 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release); pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata); - if (!release && hashMapIterator_hasNext(&iter)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message + if (!release) { + //receive function has taken ownership, deserialize again for new message status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); if (status != CELIX_SUCCESS) { L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", @@ -574,6 +573,7 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs release = true; } } + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata); if (release) { msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); } diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c index a49ff23..ad321e8 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c @@ -79,7 +79,7 @@ struct pubsub_tcp_topic_receiver { long subscriberTrackerId; struct { celix_thread_mutex_t mutex; - hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t + hash_map_t *map; //key = long svc id, value = psa_tcp_subscriber_entry_t bool allInitialized; } subscribers; }; @@ -92,12 +92,12 @@ typedef struct psa_tcp_requested_connection_entry { } psa_tcp_requested_connection_entry_t; typedef struct psa_tcp_subscriber_entry { - hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t* + pubsub_subscriber_t* subscriberSvc; bool initialized; //true if the init function is called through the receive thread } psa_tcp_subscriber_entry_t; -static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); -static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); +static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props); +static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props); static void *psa_tcp_recvThread(void *data); static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver); static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver); @@ -229,8 +229,8 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; opts.filter.filter = buf; opts.callbackHandle = receiver; - opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber; - opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber; + opts.addWithProperties = pubsub_tcpTopicReceiver_addSubscriber; + opts.removeWithProperties = pubsub_tcpTopicReceiver_removeSubscriber; receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); } @@ -259,19 +259,11 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - hashMap_destroy(entry->subscriberServices, false, false); - free(entry); - } - } - hashMap_destroy(receiver->subscribers.map, false, false); + hashMap_destroy(receiver->subscribers.map, false, true); celixThreadMutex_unlock(&receiver->subscribers.mutex); celixThreadMutex_lock(&receiver->requestedConnections.mutex); - iter = hashMapIterator_construct(receiver->requestedConnections.map); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry != NULL) { @@ -394,11 +386,9 @@ void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receive celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } -static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, - const celix_bundle_t *bnd) { +static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) { pubsub_tcp_topic_receiver_t *receiver = handle; - long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (receiver->scope == NULL) { @@ -415,62 +405,78 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const return; } - celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *)bndId); - if (entry != NULL) { - hashMap_put(entry->subscriberServices, (void*)svcId, svc); - } else { - //new create entry - entry = calloc(1, sizeof(*entry)); - entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL); - entry->initialized = false; - receiver->subscribers.allInitialized = false; + psa_tcp_subscriber_entry_t *entry = calloc(1, sizeof(*entry)); + entry->subscriberSvc = svc; + entry->initialized = false; - hashMap_put(entry->subscriberServices, (void*)svcId, svc); - hashMap_put(receiver->subscribers.map, (void *)bndId, entry); - } + celixThreadMutex_lock(&receiver->subscribers.mutex); + hashMap_put(receiver->subscribers.map, (void*)svcId, entry); + receiver->subscribers.allInitialized = false; celixThreadMutex_unlock(&receiver->subscribers.mutex); } -static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props, - const celix_bundle_t *bnd) { +static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props) { pubsub_tcp_topic_receiver_t *receiver = handle; - long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId); - if (entry != NULL) { - hashMap_remove(entry->subscriberServices, (void*)svcId); - } - if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) { - //remove entry - hashMap_remove(receiver->subscribers.map, (void *) bndId); - hashMap_destroy(entry->subscriberServices, false, false); - free(entry); + psa_tcp_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void *)svcId); + free(entry); + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static void callReceivers(pubsub_tcp_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) { + *release = true; + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->subscriberSvc->receive != NULL) { + entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release); + if (!(*release)) { + //receive function has taken ownership, deserialize again for new message + struct iovec deSerializeBuffer; + deSerializeBuffer.iov_base = message->payload.payload; + deSerializeBuffer.iov_len = message->payload.length; + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, + message->header.msgId, + message->header.msgMajorVersion, + message->header.msgMinorVersion, + &deSerializeBuffer, 0, msg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_TCO_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, + receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + break; + } + } + *release = true; + } } celixThreadMutex_unlock(&receiver->subscribers.mutex); } -static inline void -processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry, - const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) { - //NOTE receiver->subscribers.mutex locked +static inline void processMsg(void* handle, const pubsub_protocol_message_t *message, bool* releaseMsg, struct timespec *receiveTime) { + pubsub_tcp_topic_receiver_t *receiver = handle; - const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); + const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); if (msgFqn == NULL) { L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId); return; } - void *deSerializedMsg = NULL; - bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); + bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, + message->header.msgMajorVersion, + message->header.msgMinorVersion); if (validVersion) { struct iovec deSerializeBuffer; deSerializeBuffer.iov_base = message->payload.payload; deSerializeBuffer.iov_len = message->payload.length; - celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg); + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, + message->header.msgMajorVersion, + message->header.msgMinorVersion, + &deSerializeBuffer, 0, &deSerializedMsg); + // When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver if (message->payload.payload == deSerializedMsg) { *releaseMsg = true; @@ -479,33 +485,15 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs if (status == CELIX_SUCCESS) { uint32_t msgId = message->header.msgId; celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata); - bool release = true; + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, + deSerializedMsg, &metadata); if (cont) { - hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata); - if (!release && hashMapIterator_hasNext(&iter)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message - status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", - msgFqn, - receiver->scope == NULL ? "(null)" : receiver->scope, - receiver->topic); - break; - } - release = true; - } - } + bool release; + callReceivers(receiver, msgFqn, message, &deSerializedMsg, &release, metadata); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata); if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg); - } - if (message->metadata.metadata) { - celix_properties_destroy(message->metadata.metadata); + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, + deSerializedMsg); } } } else { @@ -516,27 +504,13 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x", msgFqn, pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), - (int)message->header.msgMajorVersion, - (int)message->header.msgMinorVersion, + (int) message->header.msgMajorVersion, + (int) message->header.msgMinorVersion, pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId), pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId)); } } -static void -processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime) { - pubsub_tcp_topic_receiver_t *receiver = handle; - celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - processMsgForSubscriberEntry(receiver, entry, message, release, receiveTime); - } - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); -} - static void *psa_tcp_recvThread(void *data) { pubsub_tcp_topic_receiver_t *receiver = data; @@ -642,20 +616,15 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv while (hashMapIterator_hasNext(&iter)) { psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->initialized) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter2)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); - int rc = 0; - if (svc != NULL && svc->init != NULL) { - rc = svc->init(svc->handle); - } - if (rc == 0) { - //note now only initialized on first subscriber entries added. - entry->initialized = true; - } else { - L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); - allInitialized = false; - } + int rc = 0; + if (entry->subscriberSvc->init != NULL) { + rc = entry->subscriberSvc->init(entry->subscriberSvc->handle); + } + if (rc == 0) { + entry->initialized = true; + } else { + L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); + allInitialized = false; } } } diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt index f6397e0..021310f 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt +++ b/bundles/pubsub/pubsub_admin_websocket/v1/CMakeLists.txt @@ -15,11 +15,13 @@ # specific language governing permissions and limitations # under the License. +message(WARNING "PubSub Websocket Admin V1 is deprecated, use PubSub ZMQ Websocket v2 instead") + find_package(Jansson REQUIRED) find_package(UUID REQUIRED) -add_celix_bundle(celix_pubsub_admin_websocket - BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket" +add_celix_bundle(celix_pubsub_admin_websocket_v1 + BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_websocket_v1" VERSION "1.0.0" GROUP "Celix/PubSub" SOURCES @@ -30,16 +32,16 @@ add_celix_bundle(celix_pubsub_admin_websocket src/pubsub_websocket_common.c ) -set_target_properties(celix_pubsub_admin_websocket PROPERTIES INSTALL_RPATH "$ORIGIN") -target_link_libraries(celix_pubsub_admin_websocket PRIVATE +set_target_properties(celix_pubsub_admin_websocket_v1 PROPERTIES INSTALL_RPATH "$ORIGIN") +target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::framework Celix::dfi Celix::log_helper Celix::utils Celix::http_admin_api ) -target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::pubsub_spi Celix::pubsub_utils ) -target_include_directories(celix_pubsub_admin_websocket PRIVATE +target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils ) +target_include_directories(celix_pubsub_admin_websocket_v1 PRIVATE src ) -install_celix_bundle(celix_pubsub_admin_websocket EXPORT celix COMPONENT pubsub) -target_link_libraries(celix_pubsub_admin_websocket PRIVATE Celix::shell_api) -add_library(Celix::pubsub_admin_websocket ALIAS celix_pubsub_admin_websocket) +install_celix_bundle(celix_pubsub_admin_websocket_v1 EXPORT celix COMPONENT pubsub) +target_link_libraries(celix_pubsub_admin_websocket_v1 PRIVATE Celix::shell_api) +add_library(Celix::pubsub_admin_websocket_v1 ALIAS celix_pubsub_admin_websocket_v1) diff --git a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c index 7d8cd00..7b2cfcc 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_websocket/v1/src/pubsub_websocket_topic_receiver.c @@ -500,9 +500,8 @@ static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_ while (hashMapIterator_hasNext(&iter)) { pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, NULL, &release); - if (!release && hashMapIterator_hasNext(&iter)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message + if (!release) { + //receive function has taken ownership, deserialize again for new message status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deSerializedMsg); if (status != CELIX_SUCCESS) { L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt index 52723b9..bfd8eef 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt +++ b/bundles/pubsub/pubsub_admin_zmq/v1/CMakeLists.txt @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +message(WARNING "PubSub ZMQ Admin V1 is deprecated, use PubSub ZMQ Admin v2 instead") + find_package(ZMQ REQUIRED) find_package(CZMQ REQUIRED) find_package(Jansson REQUIRED) @@ -31,8 +33,8 @@ if (BUILD_ZMQ_SECURITY) set (ZMQ_CRYPTO_C "src/zmq_crypto.c") endif() -add_celix_bundle(celix_pubsub_admin_zmq - BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq" +add_celix_bundle(celix_pubsub_admin_zmq_v1 + BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq_v1" VERSION "1.1.0" GROUP "Celix/PubSub" SOURCES @@ -43,17 +45,17 @@ add_celix_bundle(celix_pubsub_admin_zmq ${ZMQ_CRYPTO_C} ) -set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN") -target_link_libraries(celix_pubsub_admin_zmq PRIVATE +set_target_properties(celix_pubsub_admin_zmq_v1 PROPERTIES INSTALL_RPATH "$ORIGIN") +target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::framework Celix::dfi Celix::log_helper Celix::utils ZMQ::lib CZMQ::lib ${OPTIONAL_OPENSSL_LIB} ) -target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::pubsub_spi Celix::pubsub_utils ) -target_include_directories(celix_pubsub_admin_zmq PRIVATE +target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::pubsub_spi Celix::pubsub_utils ) +target_include_directories(celix_pubsub_admin_zmq_v1 PRIVATE src ) -install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT pubsub) -target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api) -add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq) +install_celix_bundle(celix_pubsub_admin_zmq_v1 EXPORT celix COMPONENT pubsub) +target_link_libraries(celix_pubsub_admin_zmq_v1 PRIVATE Celix::shell_api) +add_library(Celix::celix_pubsub_admin_zmq_v1 ALIAS celix_pubsub_admin_zmq_v1) diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c index 11f2b8f..9cd99f4 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c @@ -533,10 +533,8 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec while (hashMapIterator_hasNext(&iter2)) { pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata); - if (!release && hashMapIterator_hasNext(&iter2)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message + if (!release) { + //receive function has taken ownership deserialize again for new message status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg); if (status != CELIX_SUCCESS) { L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); @@ -544,6 +542,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec } release = true; } + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata); } if (release) { msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg); @@ -558,58 +557,6 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec } else { L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId); } - - if (msgSer != NULL && monitor) { - // TODO disabled for now, should move to an interceptor? -// hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t )message->header.msgId); -// char uuidStr[UUID_STR_LEN+1]; -// uuid_unparse(hdr->originUUID, uuidStr); -// psa_zmq_subscriber_metrics_entry_t *metrics = hashMap_get(origins, uuidStr); -// -// if (metrics == NULL) { -// metrics = calloc(1, sizeof(*metrics)); -// hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics); -// uuid_copy(metrics->origin, hdr->originUUID); -// metrics->msgTypeId = hdr->type; -// metrics->maxDelayInSeconds = -INFINITY; -// metrics->minDelayInSeconds = INFINITY; -// metrics->lastSeqNr = 0; -// } -// -// double diff = celix_difftime(&beginSer, &endSer); -// long n = metrics->nrOfMessagesReceived; -// metrics->averageSerializationTimeInSeconds = (metrics->averageSerializationTimeInSeconds * n + diff) / (n+1); -// -// diff = celix_difftime(&metrics->lastMessageReceived, receiveTime); -// n = metrics->nrOfMessagesReceived; -// if (metrics->nrOfMessagesReceived >= 1) { -// metrics->averageTimeBetweenMessagesInSeconds = (metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1); -// } -// metrics->lastMessageReceived = *receiveTime; -// -// -// int incr = hdr->seqNr - metrics->lastSeqNr; -// if (metrics->lastSeqNr >0 && incr > 1) { -// metrics->nrOfMissingSeqNumbers += (incr - 1); -// L_WARN("Missing message seq nr went from %i to %i", metrics->lastSeqNr, hdr->seqNr); -// } -// metrics->lastSeqNr = hdr->seqNr; -// -// struct timespec sendTime; -// sendTime.tv_sec = (time_t)hdr->sendtimeSeconds; -// sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the tv_nsec is not correct -// diff = celix_difftime(&sendTime, receiveTime); -// metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + diff) / (n+1); -// if (diff < metrics->minDelayInSeconds) { -// metrics->minDelayInSeconds = diff; -// } -// if (diff > metrics->maxDelayInSeconds) { -// metrics->maxDelayInSeconds = diff; -// } -// -// metrics->nrOfMessagesReceived += updateReceiveCount; -// metrics->nrOfSerializationErrors += updateSerError; - } } static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) { diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c index 90e9510..0fc2bc2 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c @@ -93,7 +93,7 @@ struct pubsub_zmq_topic_receiver { long subscriberTrackerId; struct { celix_thread_mutex_t mutex; - hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t + hash_map_t *map; //key = long svc id, value = psa_zmq_subscriber_entry_t bool allInitialized; } subscribers; }; @@ -105,13 +105,13 @@ typedef struct psa_zmq_requested_connection_entry { } psa_zmq_requested_connection_entry_t; typedef struct psa_zmq_subscriber_entry { - hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t* + pubsub_subscriber_t* subscriberSvc; bool initialized; //true if the init function is called through the receive thread } psa_zmq_subscriber_entry_t; -static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); -static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); +static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props); +static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props); static void* psa_zmq_recvThread(void * data); static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver); static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver); @@ -237,8 +237,8 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; opts.filter.filter = buf; opts.callbackHandle = receiver; - opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber; - opts.removeWithOwner = pubsub_zmqTopicReceiver_removeSubscriber; + opts.addWithProperties = pubsub_zmqTopicReceiver_addSubscriber; + opts.removeWithProperties = pubsub_zmqTopicReceiver_removeSubscriber; receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); } @@ -275,19 +275,11 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) { celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - hashMap_destroy(entry->subscriberServices, false, false); - free(entry); - } - } - hashMap_destroy(receiver->subscribers.map, false, false); + hashMap_destroy(receiver->subscribers.map, false, true); celixThreadMutex_unlock(&receiver->subscribers.mutex); celixThreadMutex_lock(&receiver->requestedConnections.mutex); - iter = hashMapIterator_construct(receiver->requestedConnections.map); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry != NULL) { @@ -384,10 +376,9 @@ void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receive celixThreadMutex_unlock(&receiver->requestedConnections.mutex); } -static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { +static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props) { pubsub_zmq_topic_receiver_t *receiver = handle; - long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (receiver->scope == NULL) { @@ -404,107 +395,104 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const return; } + psa_zmq_subscriber_entry_t *entry = calloc(1, sizeof(*entry)); + entry->subscriberSvc = svc; + entry->initialized = false; + celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); - if (entry != NULL) { - hashMap_put(entry->subscriberServices, (void*)svcId, svc); - } else { - //new create entry - entry = calloc(1, sizeof(*entry)); - entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL); - entry->initialized = false; - hashMap_put(entry->subscriberServices, (void*)svcId, svc); - hashMap_put(receiver->subscribers.map, (void*)bndId, entry); - } + hashMap_put(receiver->subscribers.map, (void*)svcId, entry); + receiver->subscribers.allInitialized = false; celixThreadMutex_unlock(&receiver->subscribers.mutex); } -static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { +static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props) { pubsub_zmq_topic_receiver_t *receiver = handle; - long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); celixThreadMutex_lock(&receiver->subscribers.mutex); - psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); - if (entry != NULL) { - hashMap_remove(entry->subscriberServices, (void*)svcId); - } - if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) { - //remove entry - hashMap_remove(receiver->subscribers.map, (void*)bndId); - hashMap_destroy(entry->subscriberServices, false, false); - free(entry); + psa_zmq_subscriber_entry_t *entry = hashMap_remove(receiver->subscribers.map, (void*)svcId); + free(entry); + celixThreadMutex_unlock(&receiver->subscribers.mutex); +} + +static void callReceivers(pubsub_zmq_topic_receiver_t *receiver, const char* msgFqn, const pubsub_protocol_message_t *message, void** msg, bool* release, const celix_properties_t* metadata) { + *release = true; + celixThreadMutex_lock(&receiver->subscribers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); + while (hashMapIterator_hasNext(&iter)) { + psa_zmq_subscriber_entry_t* entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->subscriberSvc->receive != NULL) { + entry->subscriberSvc->receive(entry->subscriberSvc->handle, msgFqn, message->header.msgId, *msg, metadata, release); + if (!(*release)) { + //receive function has taken ownership, deserialize again for new message + struct iovec deSerializeBuffer; + deSerializeBuffer.iov_base = message->payload.payload; + deSerializeBuffer.iov_len = message->payload.length; + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, + message->header.msgId, + message->header.msgMajorVersion, + message->header.msgMinorVersion, + &deSerializeBuffer, 0, msg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, + receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + break; + } + } + *release = true; + } } celixThreadMutex_unlock(&receiver->subscribers.mutex); } -static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) { - const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); +static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) { + const char *msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); if (msgFqn == NULL) { L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId); return; } - void *deserializedMsg = NULL; - bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); + bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, + message->header.msgMajorVersion, + message->header.msgMinorVersion); if (validVersion) { struct iovec deSerializeBuffer; deSerializeBuffer.iov_base = message->payload.payload; - deSerializeBuffer.iov_len = message->payload.length; - celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg); + deSerializeBuffer.iov_len = message->payload.length; + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, + message->header.msgMajorVersion, + message->header.msgMinorVersion, + &deSerializeBuffer, 0, &deserializedMsg); if (status == CELIX_SUCCESS) { uint32_t msgId = message->header.msgId; celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata); - bool release = true; + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, + deserializedMsg, &metadata); if (cont) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter2)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); - svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata); - if (!release && hashMapIterator_hasNext(&iter2)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message - status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); - break; - } - release = true; - } - } + bool release; + callReceivers(receiver, msgFqn, message, &deserializedMsg, &release, metadata); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata); if (release) { - pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg); + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, + deserializedMsg); } } } else { - L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, + receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); } } else { L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x", msgFqn, pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), - (int)message->header.msgMajorVersion, - (int)message->header.msgMinorVersion, + (int) message->header.msgMajorVersion, + (int) message->header.msgMinorVersion, pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId), pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId)); } } -static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) { - celixThreadMutex_lock(&receiver->subscribers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); - while (hashMapIterator_hasNext(&iter)) { - psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - processMsgForSubscriberEntry(receiver, entry, message, receiveTime); - } - } - celixThreadMutex_unlock(&receiver->subscribers.mutex); -} - static void* psa_zmq_recvThread(void * data) { pubsub_zmq_topic_receiver_t *receiver = data; @@ -627,20 +615,16 @@ static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiv while (hashMapIterator_hasNext(&iter)) { psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->initialized) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter2)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); - int rc = 0; - if (svc != NULL && svc->init != NULL) { - rc = svc->init(svc->handle); - } - if (rc == 0) { - //note now only initialized on first subscriber entries added. - entry->initialized = true; - } else { - L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); - allInitialized = false; - } + int rc = 0; + if (entry->subscriberSvc != NULL && entry->subscriberSvc->init != NULL) { + rc = entry->subscriberSvc->init(entry->subscriberSvc->handle); + } + if (rc == 0) { + //note now only initialized on first subscriber entries added. + entry->initialized = true; + } else { + L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); + allInitialized = false; } } } diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h index 801fd35..6595853 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_interceptors_handler.h @@ -37,7 +37,7 @@ celix_status_t pubsubInterceptorsHandler_destroy(pubsub_interceptors_handler_t * bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata); void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); -bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata); void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata); #ifdef __cplusplus diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c index 331a41d..6118fbe 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_interceptors_handler.c @@ -18,7 +18,8 @@ */ #include "celix_bundle_context.h" #include "celix_constants.h" -#include "utils.h" +#include "celix_array_list.h" +#include "celix_utils.h" #include "pubsub_interceptors_handler.h" @@ -91,8 +92,8 @@ void pubsubInterceptorsHandler_addInterceptor(void *handle, void *svc, const cel celixThreadMutex_lock(&handler->lock); bool exists = false; - for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { - entry_t *entry = arrayList_get(handler->interceptors, i); + for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) { + entry_t *entry = celix_arrayList_get(handler->interceptors, i); if (entry->interceptor == svc) { exists = true; } @@ -114,11 +115,11 @@ void pubsubInterceptorsHandler_removeInterceptor(void *handle, void *svc, __attr celixThreadMutex_lock(&handler->lock); - for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { - entry_t *entry = arrayList_get(handler->interceptors, i); + for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) { + entry_t *entry = celix_arrayList_get(handler->interceptors, i); if (entry->interceptor == svc) { - void *old = arrayList_remove(handler->interceptors, i); - free(old); + celix_arrayList_removeAt(handler->interceptors, i); + free(entry); break; } } @@ -131,12 +132,12 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl celixThreadMutex_lock(&handler->lock); - if (*metadata == NULL && arrayList_size(handler->interceptors) > 0) { + if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) { *metadata = celix_properties_create(); } - for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) { - entry_t *entry = arrayList_get(handler->interceptors, i - 1); + for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) { + entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1); if (entry->interceptor->preSend != NULL) { cont = entry->interceptor->preSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata); } @@ -153,8 +154,8 @@ bool pubsubInterceptorHandler_invokePreSend(pubsub_interceptors_handler_t *handl void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) { celixThreadMutex_lock(&handler->lock); - for (uint32_t i = arrayList_size(handler->interceptors); i > 0; i--) { - entry_t *entry = arrayList_get(handler->interceptors, i - 1); + for (uint32_t i = celix_arrayList_size(handler->interceptors); i > 0; i--) { + entry_t *entry = celix_arrayList_get(handler->interceptors, i - 1); if (entry->interceptor->postSend != NULL) { entry->interceptor->postSend(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); } @@ -163,15 +164,17 @@ void pubsubInterceptorHandler_invokePostSend(pubsub_interceptors_handler_t *hand celixThreadMutex_unlock(&handler->lock); } -bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) { +bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t **metadata) { bool cont = true; celixThreadMutex_lock(&handler->lock); - - for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { - entry_t *entry = arrayList_get(handler->interceptors, i); + if (*metadata == NULL && celix_arrayList_size(handler->interceptors) > 0) { + *metadata = celix_properties_create(); + } + for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) { + entry_t *entry = celix_arrayList_get(handler->interceptors, i); if (entry->interceptor->preReceive != NULL) { - cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); + cont = entry->interceptor->preReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, *metadata); } if (!cont) { break; @@ -186,8 +189,8 @@ bool pubsubInterceptorHandler_invokePreReceive(pubsub_interceptors_handler_t *ha void pubsubInterceptorHandler_invokePostReceive(pubsub_interceptors_handler_t *handler, const char *messageType, uint32_t messageId, const void *message, celix_properties_t *metadata) { celixThreadMutex_lock(&handler->lock); - for (uint32_t i = 0; i < arrayList_size(handler->interceptors); i++) { - entry_t *entry = arrayList_get(handler->interceptors, i); + for (uint32_t i = 0; i < celix_arrayList_size(handler->interceptors); i++) { + entry_t *entry = celix_arrayList_get(handler->interceptors, i); if (entry->interceptor->postReceive != NULL) { entry->interceptor->postReceive(entry->interceptor->handle, &handler->properties, messageType, messageId, message, metadata); }
