This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/pubsub-interceptor-fix in repository https://gitbox.apache.org/repos/asf/celix.git
commit 3f0014b778b9abb3a2df04e9eca50eee5eca0fb0 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jul 26 19:04:19 2021 +0200 Removes interceptor support from psa zmq/tcp v1 (the behavior is different from v2) --- .../v1/src/pubsub_tcp_topic_receiver.c | 54 ++--- .../v1/src/pubsub_tcp_topic_sender.c | 87 ++++---- .../v1/src/pubsub_zmq_topic_receiver.c | 45 ++-- .../v1/src/pubsub_zmq_topic_sender.c | 232 ++++++++++----------- 4 files changed, 182 insertions(+), 236 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c index 4178b50..aeda1c3 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_receiver.c @@ -34,7 +34,6 @@ #include <uuid/uuid.h> #include <pubsub_admin_metrics.h> #include <pubsub_utils.h> -#include "pubsub_interceptors_handler.h" #include <celix_api.h> #ifndef UUID_STR_LEN @@ -64,7 +63,6 @@ struct pubsub_tcp_topic_receiver { bool isPassive; pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; - pubsub_interceptors_handler_t *interceptorsHandler; struct { celix_thread_t thread; @@ -144,7 +142,6 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context receiver->protocol = protocol; receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); - receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*"); const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope); const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope); const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope); @@ -322,7 +319,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { pubsub_tcpHandler_destroy(receiver->socketHandler); receiver->socketHandler = NULL; } - pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); if (receiver->scope != NULL) { free(receiver->scope); } @@ -549,39 +545,31 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs } if (status == CELIX_SUCCESS) { - const char *msgType = msgSer->msgName; - uint32_t msgId = message->header.msgId; - celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata); bool release = true; - if (cont) { - hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata); - if (!release) { - //receive function has taken ownership, deserialize again for new message - status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", - msgSer->msgName, - receiver->scope == NULL ? "(null)" : receiver->scope, - receiver->topic); - break; - } - release = true; + hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); + while (hashMapIterator_hasNext(&iter)) { + pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); + svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release); + if (!release) { + //receive function has taken ownership, deserialize again for new message + status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", + msgSer->msgName, + receiver->scope == NULL ? "(null)" : receiver->scope, + receiver->topic); + break; } + release = true; } - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata); - if (release) { - msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); - } - if (message->metadata.metadata) { - celix_properties_destroy(message->metadata.metadata); - } - updateReceiveCount += 1; } + if (release) { + msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); + } + if (message->metadata.metadata) { + celix_properties_destroy(message->metadata.metadata); + } + updateReceiveCount += 1; } else { updateSerError += 1; L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, diff --git a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c index 32a3328..7bea628 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/v1/src/pubsub_tcp_topic_sender.c @@ -34,7 +34,6 @@ #include <uuid/uuid.h> #include "celix_constants.h" #include <pubsub_utils.h> -#include "pubsub_interceptors_handler.h" #define TCP_BIND_MAX_RETRY 10 @@ -58,7 +57,6 @@ struct pubsub_tcp_topic_sender { bool metricsEnabled; pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; - pubsub_interceptors_handler_t *interceptorsHandler; char *scope; char *topic; @@ -145,7 +143,6 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( if (uuid != NULL) { uuid_parse(uuid, sender->fwUUID); } - sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_TCP_ADMIN_TYPE, "*unknown*"); sender->isPassive = false; sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); char *urls = NULL; @@ -303,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { celixThreadMutex_unlock(&sender->boundedServices.mutex); celixThreadMutex_destroy(&sender->boundedServices.mutex); - pubsubInterceptorsHandler_destroy(sender->interceptorsHandler); if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) { pubsub_tcpHandler_destroy(sender->socketHandler); sender->socketHandler = NULL; @@ -531,58 +527,47 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i clock_gettime(CLOCK_REALTIME, &serializationEnd); } - bool cont = false; - if (status == CELIX_SUCCESS) /*ser ok*/ { - cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata); + pubsub_protocol_message_t message; + message.metadata.metadata = NULL; + message.payload.payload = NULL; + message.payload.length = 0; + if (serializedIoVecOutput) { + message.payload.payload = serializedIoVecOutput->iov_base; + message.payload.length = serializedIoVecOutput->iov_len; } - if (cont) { - pubsub_protocol_message_t message; - message.metadata.metadata = NULL; - message.payload.payload = NULL; - message.payload.length = 0; - if (serializedIoVecOutput) { - message.payload.payload = serializedIoVecOutput->iov_base; - message.payload.length = serializedIoVecOutput->iov_len; + message.header.msgId = msgTypeId; + message.header.seqNr = entry->seqNr; + message.header.msgMajorVersion = entry->major; + message.header.msgMinorVersion = entry->minor; + message.header.payloadSize = 0; + message.header.payloadPartSize = 0; + message.header.payloadOffset = 0; + message.header.metadataSize = 0; + if (metadata != NULL) + message.metadata.metadata = metadata; + entry->seqNr++; + bool sendOk = true; + { + int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0); + if (rc < 0) { + status = -1; + sendOk = false; } - message.header.msgId = msgTypeId; - message.header.seqNr = entry->seqNr; - message.header.msgMajorVersion = entry->major; - message.header.msgMinorVersion = entry->minor; - message.header.payloadSize = 0; - message.header.payloadPartSize = 0; - message.header.payloadOffset = 0; - message.header.metadataSize = 0; - if (metadata != NULL) - message.metadata.metadata = metadata; - entry->seqNr++; - bool sendOk = true; - { - int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0); - if (rc < 0) { - status = -1; - sendOk = false; - } - pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata); - if (message.metadata.metadata) - celix_properties_destroy(message.metadata.metadata); - if (serializedIoVecOutput) { - entry->msgSer->freeSerializeMsg(entry->msgSer->handle, - serializedIoVecOutput, - serializedIoVecOutputLen); - serializedIoVecOutput = NULL; - } + if (message.metadata.metadata) + celix_properties_destroy(message.metadata.metadata); + if (serializedIoVecOutput) { + entry->msgSer->freeSerializeMsg(entry->msgSer->handle, + serializedIoVecOutput, + serializedIoVecOutputLen); + serializedIoVecOutput = NULL; } + } - if (sendOk) { - sendCountUpdate = 1; - } else { - sendErrorUpdate = 1; - L_WARN("[PSA_TCP_TS] Error sending msg. %s", strerror(errno)); - } + if (sendOk) { + sendCountUpdate = 1; } else { - serializationErrorUpdate = 1; - L_WARN("[PSA_TCP_TS] Error serialize message of type %s for scope/topic %s/%s", entry->msgSer->msgName, - sender->scope == NULL ? "(null)" : sender->scope, sender->topic); + sendErrorUpdate = 1; + L_WARN("[PSA_TCP_TS] Error sending msg. %s", strerror(errno)); } } else { //unknownMessageCountUpdate = 1; diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c index 62b2fbf..28146af 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_receiver.c @@ -39,8 +39,6 @@ #include <pubsub_utils.h> #include <celix_api.h> -#include "pubsub_interceptors_handler.h" - #include "celix_utils_api.h" #define PSA_ZMQ_RECV_TIMEOUT 1000 @@ -70,8 +68,6 @@ struct pubsub_zmq_topic_receiver { char *topic; bool metricsEnabled; - pubsub_interceptors_handler_t *interceptorsHandler; - void *zmqCtx; void *zmqSock; @@ -157,8 +153,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context receiver->topic = strndup(topic, 1024 * 1024); receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); - receiver->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*"); - #ifdef BUILD_WITH_ZMQ_SECURITY char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); if (keys_bundle_dir == NULL) { @@ -334,8 +328,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) { zmq_close(receiver->zmqSock); zmq_ctx_term(receiver->zmqCtx); - pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); - free(receiver->scope); free(receiver->topic); } @@ -522,33 +514,26 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec clock_gettime(CLOCK_REALTIME, &endSer); } if (status == CELIX_SUCCESS) { - - const char *msgType = msgSer->msgName; - uint32_t msgId = message->header.msgId; celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, &metadata); bool release = true; - if (cont) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); - while (hashMapIterator_hasNext(&iter2)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release); - if (!release) { - //receive function has taken ownership deserialize again for new message - status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); - break; - } - release = true; + hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); + while (hashMapIterator_hasNext(&iter2)) { + pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); + svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deserializedMsg, metadata, &release); + if (!release) { + //receive function has taken ownership deserialize again for new message + status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 0, &deserializedMsg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + break; } - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deserializedMsg, metadata); + release = true; } - if (release) { - msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg); - } - updateReceiveCount += 1; } + if (release) { + msgSer->freeDeserializeMsg(msgSer->handle, deserializedMsg); + } + updateReceiveCount += 1; } else { updateSerError += 1; L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c index aba8893..19c4660 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_topic_sender.c @@ -32,7 +32,6 @@ #include "pubsub_psa_zmq_constants.h" #include <uuid/uuid.h> #include "celix_constants.h" -#include "pubsub_interceptors_handler.h" #define FIRST_SEND_DELAY_IN_SECONDS 2 #define ZMQ_BIND_MAX_RETRY 10 @@ -57,8 +56,6 @@ struct pubsub_zmq_topic_sender { bool metricsEnabled; bool zeroCopyEnabled; - pubsub_interceptors_handler_t *interceptorsHandler; - char *scope; char *topic; char *url; @@ -157,8 +154,6 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED); - sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, topic, PUBSUB_ZMQ_ADMIN_TYPE, "*unknown*"); - //setting up zmq socket for ZMQ TopicSender { #ifdef BUILD_WITH_ZMQ_SECURITY @@ -347,8 +342,6 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) { celixThreadMutex_destroy(&sender->boundedServices.mutex); - pubsubInterceptorsHandler_destroy(sender->interceptorsHandler); - if (sender->scope != NULL) { free(sender->scope); } @@ -573,138 +566,133 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co usleep(500); } - bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata); - if (cont) { + pubsub_protocol_message_t message; + message.payload.payload = serializedOutput->iov_base; + message.payload.length = serializedOutput->iov_len; - pubsub_protocol_message_t message; - message.payload.payload = serializedOutput->iov_base; - message.payload.length = serializedOutput->iov_len; + void *payloadData = NULL; + size_t payloadLength = 0; + entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength); - void *payloadData = NULL; - size_t payloadLength = 0; - entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength); + if (metadata != NULL) { + message.metadata.metadata = metadata; + entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize); + } else { + message.metadata.metadata = NULL; + } - if (metadata != NULL) { - message.metadata.metadata = metadata; - entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize); - } else { - message.metadata.metadata = NULL; + entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize); + + message.header.msgId = msgTypeId; + message.header.seqNr = entry->seqNr; + message.header.msgMajorVersion = 0; + message.header.msgMinorVersion = 0; + message.header.payloadSize = payloadLength; + message.header.metadataSize = entry->metadataBufferSize; + message.header.payloadPartSize = payloadLength; + message.header.payloadOffset = 0; + message.header.isLastSegment = 1; + message.header.convertEndianess = 0; + + // increase seqNr + entry->seqNr++; + + entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize); + + errno = 0; + bool sendOk; + + if (bound->parent->zeroCopyEnabled) { + + zmq_msg_t msg1; // Header + zmq_msg_t msg2; // Payload + zmq_msg_t msg3; // Metadata + zmq_msg_t msg4; // Footer + void *socket = zsock_resolve(sender->zmq.socket); + psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); + freeMsgEntry->msgSer = entry->msgSer; + freeMsgEntry->serializedOutput = serializedOutput; + freeMsgEntry->serializedOutputLen = serializedOutputLen; + + zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry); + //send header + int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE); + if (rc == -1) { + L_WARN("Error sending header msg. %s", strerror(errno)); + zmq_msg_close(&msg1); } - entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize); - - message.header.msgId = msgTypeId; - message.header.seqNr = entry->seqNr; - message.header.msgMajorVersion = 0; - message.header.msgMinorVersion = 0; - message.header.payloadSize = payloadLength; - message.header.metadataSize = entry->metadataBufferSize; - message.header.payloadPartSize = payloadLength; - message.header.payloadOffset = 0; - message.header.isLastSegment = 1; - message.header.convertEndianess = 0; - - // increase seqNr - entry->seqNr++; - - entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize); - - errno = 0; - bool sendOk; - - if (bound->parent->zeroCopyEnabled) { - - zmq_msg_t msg1; // Header - zmq_msg_t msg2; // Payload - zmq_msg_t msg3; // Metadata - zmq_msg_t msg4; // Footer - void *socket = zsock_resolve(sender->zmq.socket); - psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); - freeMsgEntry->msgSer = entry->msgSer; - freeMsgEntry->serializedOutput = serializedOutput; - freeMsgEntry->serializedOutputLen = serializedOutputLen; - - zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry); - //send header - int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE); + //send Payload + if (rc > 0) { + int flag = ((entry->metadataBufferSize > 0) || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0; + zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry); + rc = zmq_msg_send(&msg2, socket, flag); if (rc == -1) { - L_WARN("Error sending header msg. %s", strerror(errno)); - zmq_msg_close(&msg1); - } - - //send Payload - if (rc > 0) { - int flag = ((entry->metadataBufferSize > 0) || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0; - zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry); - rc = zmq_msg_send(&msg2, socket, flag); - if (rc == -1) { - L_WARN("Error sending payload msg. %s", strerror(errno)); - zmq_msg_close(&msg2); - } - } - - //send MetaData - if (rc > 0 && entry->metadataBufferSize > 0) { - int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0; - zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL); - rc = zmq_msg_send(&msg3, socket, flag); - if (rc == -1) { - L_WARN("Error sending metadata msg. %s", strerror(errno)); - zmq_msg_close(&msg3); - } - } - - //send Footer - if (rc > 0 && entry->footerBufferSize > 0) { - zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL); - rc = zmq_msg_send(&msg4, socket, 0); - if (rc == -1) { - L_WARN("Error sending footer msg. %s", strerror(errno)); - zmq_msg_close(&msg4); - } - } - - sendOk = rc > 0; - } else { - //no zero copy - zmsg_t *msg = zmsg_new(); - zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize); - zmsg_addmem(msg, payloadData, payloadLength); - if (entry->metadataBufferSize > 0) { - zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize); - } - if (entry->footerBufferSize > 0) { - zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize); + L_WARN("Error sending payload msg. %s", strerror(errno)); + zmq_msg_close(&msg2); } - int rc = zmsg_send(&msg, sender->zmq.socket); - sendOk = rc == 0; + } - if (!sendOk) { - zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg + //send MetaData + if (rc > 0 && entry->metadataBufferSize > 0) { + int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0; + zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL); + rc = zmq_msg_send(&msg3, socket, flag); + if (rc == -1) { + L_WARN("Error sending metadata msg. %s", strerror(errno)); + zmq_msg_close(&msg3); } + } - // Note: serialized Payload is deleted by serializer - if (payloadData && (payloadData != message.payload.payload)) { - free(payloadData); + //send Footer + if (rc > 0 && entry->footerBufferSize > 0) { + zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL); + rc = zmq_msg_send(&msg4, socket, 0); + if (rc == -1) { + L_WARN("Error sending footer msg. %s", strerror(errno)); + zmq_msg_close(&msg4); } - - __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE); } - pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata); - if (message.metadata.metadata) { - celix_properties_destroy(message.metadata.metadata); + sendOk = rc > 0; + } else { + //no zero copy + zmsg_t *msg = zmsg_new(); + zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize); + zmsg_addmem(msg, payloadData, payloadLength); + if (entry->metadataBufferSize > 0) { + zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize); } - if (!bound->parent->zeroCopyEnabled && serializedOutput) { - entry->msgSer->freeSerializeMsg(entry->msgSer->handle, serializedOutput, serializedOutputLen); + if (entry->footerBufferSize > 0) { + zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize); } + int rc = zmsg_send(&msg, sender->zmq.socket); + sendOk = rc == 0; - if (sendOk) { - sendCountUpdate = 1; - } else { - sendErrorUpdate = 1; - L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno)); + if (!sendOk) { + zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg + } + + // Note: serialized Payload is deleted by serializer + if (payloadData && (payloadData != message.payload.payload)) { + free(payloadData); } + + __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE); + } + + if (message.metadata.metadata) { + celix_properties_destroy(message.metadata.metadata); + } + if (!bound->parent->zeroCopyEnabled && serializedOutput) { + entry->msgSer->freeSerializeMsg(entry->msgSer->handle, serializedOutput, serializedOutputLen); + } + + if (sendOk) { + sendCountUpdate = 1; + } else { + sendErrorUpdate = 1; + L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno)); } } else { serializationErrorUpdate = 1;
