This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/protect_zmq_socket
in repository https://gitbox.apache.org/repos/asf/celix.git
commit c0fd8d05a59b1f8d7258ce8804db9658a7e9796c
Author: Pepijn Noltes <[email protected]>
AuthorDate: Tue May 5 14:55:31 2020 +0200

    Adds thread protection to the use of zmq socket.
---
 bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
index 1b5abcd..9923a14 100644
--- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c
@@ -508,7 +508,7 @@ pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_se
 }

 static void psa_zmq_freeMsg(void *msg, void *hint) {
-    if(hint) {
+    if (hint) {
         psa_zmq_zerocopy_free_entry *entry = hint;
         entry->msgSer->freeSerializeMsg(entry->msgSer->handle, entry->serializedOutput, entry->serializedOutputLen);
         free(entry);
@@ -597,6 +597,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co

                 bool sendOk;
                 if (bound->parent->zeroCopyEnabled) {
+                    celixThreadMutex_lock(&sender->zmq.mutex);
                     zmq_msg_t msg1; // Header
                     zmq_msg_t msg2; // Payload
                     zmq_msg_t msg3; // Metadata
@@ -641,16 +642,20 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co
                             zmq_msg_close(&msg3);
                         }
                     }
+                    celixThreadMutex_unlock(&sender->zmq.mutex);

                     sendOk = rc > 0;
                 } else {
+                    //no zero copy
                     zmsg_t *msg = zmsg_new();
                     zmsg_addmem(msg, headerData, headerLength);
                     zmsg_addmem(msg, payloadData, payloadLength);
                     if (metadataLength > 0) {
                         zmsg_addmem(msg, metadataData, metadataLength);
                     }
+                    celixThreadMutex_lock(&sender->zmq.mutex);
                     int rc = zmsg_send(&msg, sender->zmq.socket);
+                    celixThreadMutex_unlock(&sender->zmq.mutex);
                     sendOk = rc == 0;

                     if (!sendOk) {
