This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/proposal_protocol_footer in repository https://gitbox.apache.org/repos/asf/celix.git
commit 864ad47c009bde48bb3747fef9b6a858b7b1edd0 Author: Roy Bulter <[email protected]> AuthorDate: Mon Jun 8 21:30:46 2020 +0200 Add Footer to ZMQ --- .../src/pubsub_zmq_topic_receiver.c | 7 ++- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 35 ++++++++++----- .../gtest/src/PS_WP_tests.cc | 52 ++++++++++++++++++++++ .../src/pubsub_wire_protocol_common.h | 2 +- .../src/pubsub_wire_protocol_impl.c | 18 ++------ 5 files changed, 85 insertions(+), 29 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c index 0ec9d7a..bac104d 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c @@ -628,8 +628,8 @@ static void* psa_zmq_recvThread(void * data) { zmsg_t *zmsg = zmsg_recv(receiver->zmqSock); if (zmsg != NULL) { - if (zmsg_size(zmsg) < 2) { - L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata)), got %i frames", (int)zmsg_size(zmsg)); + if (zmsg_size(zmsg) < 3) { + L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) + footer), got %i frames", (int)zmsg_size(zmsg)); } else { zframe_t *header = zmsg_pop(zmsg); // header zframe_t *payload = NULL; @@ -650,6 +650,8 @@ static void* psa_zmq_recvThread(void * data) { } else { message.metadata.metadata = NULL; } + zframe_t *footer = zmsg_pop(zmsg); // footer + receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message); if (header != NULL && payload != NULL) { struct timespec receiveTime; clock_gettime(CLOCK_REALTIME, &receiveTime); @@ -659,6 +661,7 @@ static void* psa_zmq_recvThread(void * data) { zframe_destroy(&header); zframe_destroy(&payload); zframe_destroy(&metadata); + zframe_destroy(&footer); } zmsg_destroy(&zmsg); } else { diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c index 91fab55..fe7e171 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c @@ -578,6 +578,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co if(metadataLength > 1000000) { L_WARN("ERR LARGE METADATA DETECTED\n"); } + void *footerData = NULL; + size_t footerLength = 0; + entry->protSer->encodeFooter(entry->protSer->handle, &message, &footerData, &footerLength); message.header.msgId = msgTypeId; message.header.msgMajorVersion = 0; @@ -601,6 +604,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co zmq_msg_t msg1; // Header zmq_msg_t msg2; // Payload zmq_msg_t msg3; // Metadata + zmq_msg_t msg4; // Footer void *socket = zsock_resolve(sender->zmq.socket); psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); freeMsgEntry->msgSer = entry->msgSer; @@ -617,16 +621,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co //send Payload if (rc > 0) { - if(metadataLength > 0) { - zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, NULL); - } else { - zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry); - } - int flags = 0; - if (metadataLength > 0) { - flags = ZMQ_SNDMORE; - } - rc = zmq_msg_send(&msg2, socket, flags); + zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry); + rc = zmq_msg_send(&msg2, socket, ZMQ_SNDMORE); if (rc == -1) { L_WARN("Error sending payload msg. %s", strerror(errno)); zmq_msg_close(&msg2); @@ -635,13 +631,24 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co //send MetaData if (rc > 0 && metadataLength > 0) { - zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, freeMsgEntry); - rc = zmq_msg_send(&msg3, socket, 0); + zmq_msg_init_data(&msg3, metadataData, metadataLength, psa_zmq_freeMsg, NULL); + rc = zmq_msg_send(&msg3, socket, ZMQ_SNDMORE); if (rc == -1) { L_WARN("Error sending metadata msg. %s", strerror(errno)); zmq_msg_close(&msg3); } } + + //send Footer + if (rc > 0) { + zmq_msg_init_data(&msg4, footerData, footerLength, psa_zmq_freeMsg, NULL); + rc = zmq_msg_send(&msg4, socket, 0); + if (rc == -1) { + L_WARN("Error sending footer msg. %s", strerror(errno)); + zmq_msg_close(&msg4); + } + } + celixThreadMutex_unlock(&sender->zmq.mutex); sendOk = rc > 0; @@ -653,6 +660,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co if (metadataLength > 0) { zmsg_addmem(msg, metadataData, metadataLength); } + zmsg_addmem(msg, footerData, footerLength); celixThreadMutex_lock(&sender->zmq.mutex); int rc = zmsg_send(&msg, sender->zmq.socket); celixThreadMutex_unlock(&sender->zmq.mutex); @@ -672,6 +680,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co if (metadataData) { free(metadataData); } + if (footerData) { + free(footerData); + } } pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata); diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc index 5b9f8a0..5444d47 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc +++ b/bundles/pubsub/pubsub_protocol_wire_v1/gtest/src/PS_WP_tests.cc @@ -253,3 +253,55 @@ TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeMetadata_SpecialChars_Test) pubsubProtocol_destroy(wireprotocol); } +TEST_F(WireProtocolV1Test, WireProtocolV1Test_EncodeFooter_Test) { // NOLINT(cert-err58-cpp) + pubsub_protocol_wire_v1_t *wireprotocol; + pubsubProtocol_create(&wireprotocol); + + pubsub_protocol_message_t message; + + void *footerData = nullptr; + size_t footerLength = 0; + celix_status_t status = pubsubProtocol_encodeFooter(nullptr, &message, &footerData, &footerLength); + + unsigned char exp[4]; + uint32_t s = 0xBAABABBA; + memcpy(exp, &s, sizeof(uint32_t)); + ASSERT_EQ(status, CELIX_SUCCESS); + ASSERT_EQ(4, footerLength); + for (int i = 0; i < 4; i++) { + ASSERT_EQ(((unsigned char*) footerData)[i], exp[i]); + } + + pubsubProtocol_destroy(wireprotocol); + free(footerData); +} + +TEST_F(WireProtocolV1Test, WireProtocolV1Test_DecodeFooter_Test) { // NOLINT(cert-err58-cpp) + pubsub_protocol_wire_v1_t *wireprotocol; + pubsubProtocol_create(&wireprotocol); + + unsigned char exp[4]; + uint32_t s = 0xBAABABBA; + memcpy(exp, &s, sizeof(uint32_t)); + pubsub_protocol_message_t message; + + celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message); + + ASSERT_EQ(CELIX_SUCCESS, status); + pubsubProtocol_destroy(wireprotocol); +} + +TEST_F(WireProtocolV1Test, WireProtocolV1Test_WireProtocolV1Test_DecodeFooter_IncorrectSync_Test) { // NOLINT(cert-err58-cpp) + pubsub_protocol_wire_v1_t *wireprotocol; + pubsubProtocol_create(&wireprotocol); + + unsigned char exp[24]; + uint32_t s = 0xABBABAAB; + memcpy(exp, &s, sizeof(uint32_t)); + pubsub_protocol_message_t message; + + celix_status_t status = pubsubProtocol_decodeFooter(nullptr, exp, 4, &message); + ASSERT_EQ(CELIX_ILLEGAL_ARGUMENT, status); + + pubsubProtocol_destroy(wireprotocol); +} \ No newline at end of file diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h index ec946a8..c9bc70e 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h +++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h @@ -26,7 +26,7 @@ extern "C" { #endif -static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB; +static const unsigned int PROTOCOL_WIRE_SYNC_HEADER = 0xABBABAAB; static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA; static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1; diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c index 97d7fdd..3da2094 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c +++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c @@ -78,7 +78,7 @@ celix_status_t pubsubProtocol_getSyncHeaderSize(void* handle, size_t *length) { } celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) { - writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC); + writeInt(syncHeader, 0, PROTOCOL_WIRE_SYNC_HEADER); return CELIX_SUCCESS; } @@ -97,10 +97,6 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message size_t headerSize = 0; pubsubProtocol_getHeaderSize(handle, &headerSize); - // Get HeaderSize - size_t footerSize = 0; - pubsubProtocol_getFooterSize(handle, &footerSize); - if (*outBuffer == NULL) { *outBuffer = calloc(1, headerSize); *outLength = headerSize; @@ -109,14 +105,13 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message status = CELIX_ENOMEM; } else { int idx = 0; - idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC); + idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_HEADER); idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_ENVELOPE_VERSION); idx = writeInt(*outBuffer, idx, message->header.msgId); idx = writeShort(*outBuffer, idx, message->header.msgMajorVersion); idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion); idx = writeInt(*outBuffer, idx, message->header.payloadSize); idx = writeInt(*outBuffer, idx, message->header.metadataSize); - idx = writeInt(*outBuffer, idx, footerSize); *outLength = idx; } @@ -211,7 +206,7 @@ celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t leng if (length == headerSize) { unsigned int sync; idx = readInt(data, idx, &sync); - if (sync != PROTOCOL_WIRE_SYNC) { + if (sync != PROTOCOL_WIRE_SYNC_HEADER) { status = CELIX_ILLEGAL_ARGUMENT; } else { unsigned int envelopeVersion; @@ -288,13 +283,8 @@ celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t leng pubsubProtocol_getFooterSize(handle, &footerSize); if (length == footerSize) { unsigned int footerSync; - unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER; - unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC; idx = readInt(data, idx, &footerSync); - if (footerSync != footerSyncValue) { - status = CELIX_ILLEGAL_ARGUMENT; - } - if (footerSync != headerSyncValue) { + if (footerSync != PROTOCOL_WIRE_SYNC_FOOTER) { status = CELIX_ILLEGAL_ARGUMENT; } } else {
