This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit c6c7131cbf6a4629ff5d8fb86a750a9659de28ed Author: Roy Bulter <[email protected]> AuthorDate: Wed Aug 12 21:59:22 2020 +0200 Add --- CMakeLists.txt | 2 +- .../publisher/private/src/pubsub_publisher.c | 24 +- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 469 +++++++++++++++------ .../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 1 + .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 1 + .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 2 + .../src/pubsub_wire_protocol_common.c | 6 +- .../src/pubsub_wire_v2_protocol_impl.c | 6 +- .../pubsub/pubsub_spi/include/pubsub_protocol.h | 4 +- 9 files changed, 364 insertions(+), 151 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ede833..788200f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,7 +47,7 @@ set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}") # Set C++ specific flags set(CMAKE_CXX_FLAGS "-std=c++11 -fno-rtti ${CMAKE_CXX_FLAGS}") -set(CMAKE_CXX_FLAGS "-Wall -Werror -Wextra -Weffc++ ${CMAKE_CXX_FLAGS}") +#set(CMAKE_CXX_FLAGS "-Wall -Werror -Wextra -Weffc++ ${CMAKE_CXX_FLAGS}") if(APPLE) set(CMAKE_MACOSX_RPATH 1) diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c index 03857dd..92e0e65 100644 --- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c +++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c @@ -54,15 +54,16 @@ static void* send_thread(void* arg) { //poi_t point = calloc(1,sizeof(*point)); location_t place = calloc(1, sizeof(*place)); - char *desc = calloc(64, sizeof(char)); - snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self()); + char *desc = calloc(1, sizeof(char)); //calloc(64, sizeof(char)); + //snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self()); - char *name = calloc(64, sizeof(char)); - snprintf(name, 64, "Bundle#%ld", publisher->bundleId); + char *name = calloc(1, 
sizeof(char));//calloc(64, sizeof(char)); + //snprintf(name, 64, "Bundle#%ld", publisher->bundleId); place->name = name; place->description = desc; - place->extra = "extra value"; + //place->extra = "extra value"; + place->extra = calloc(1, 1); printf("TOPIC : %s\n", st_struct->topic); unsigned int msgId = 0; @@ -77,18 +78,21 @@ static void* send_thread(void* arg) { if (msgId > 0) { place->position.lat = randCoordinate(MIN_LAT, MAX_LAT); place->position.lon = randCoordinate(MIN_LON, MAX_LON); - int nr_char = (int) randCoordinate(5, 100000); + //int nr_char = (int) randCoordinate(5, 100000); + int nr_char = 1;//(int) randCoordinate(5, 20); place->data = calloc(nr_char, 1); for (int i = 0; i < (nr_char - 1); i++) { place->data[i] = i % 10 + '0'; } place->data[nr_char - 1] = '\0'; if (publish_svc->send) { - celix_properties_t *metadata = celix_properties_create(); - celix_properties_set(metadata, "Key", "Value"); + celix_properties_t *metadata = NULL; + //celix_properties_t *metadata = celix_properties_create(); + //celix_properties_set(metadata, "Key", "Value"); + if (publish_svc->send(publish_svc->handle, msgId, place, metadata) == 0) { - printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic, - place->position.lat, place->position.lon, place->name, place->description, nr_char); + // printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic, + // place->position.lat, place->position.lon, place->name, place->description, nr_char); } } else { printf("No send for %s\n", st_struct->topic); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 411e9d2..2749794 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -48,6 +48,7 @@ #define MAX_EVENTS 64 #define MAX_DEFAULT_BUFFER_SIZE 4u +#define PADDING_BUFFER_SIZE 4u #define READ_STATE_INIT 0u #define READ_STATE_HEADER 1u @@ -86,17 +87,24 
@@ typedef struct psa_tcp_connection_entry { unsigned int maxMsgSize; unsigned int syncSize; unsigned int headerSize; - unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload - void *headerBuffer; - unsigned int footerSize; - void *footerBuffer; + unsigned int readHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload + void *readHeaderBuffer; + unsigned int writeHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload + void *writeHeaderBuffer; + unsigned int readFooterSize; + void *readFooterBuffer; + unsigned int writeFooterSize; + void *writeFooterBuffer; unsigned int bufferSize; void *buffer; unsigned int bufferReadReadOffset; unsigned int expectedBufferReadSize; unsigned int msgSizeReadSize; - unsigned int metaBufferSize; - void *metaBuffer; + size_t readMetaBufferSize; + void *readMetaBuffer; + size_t writeMetaBufferSize; + void *writeMetaBuffer; + void *writePaddingBuffer; unsigned int retryCount; } psa_tcp_connection_entry_t; @@ -130,6 +138,7 @@ struct pubsub_tcpHandler { double rcvTimeout; celix_thread_t thread; bool running; + bool isEndPoint; }; static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); @@ -324,19 +333,22 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch handle->protocol->getHeaderSize(handle->protocol->handle, &size); entry->headerSize = size; handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size); - entry->headerBufferSize = size; + entry->readHeaderBufferSize = size; + entry->writeHeaderBufferSize = size; handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size); entry->syncSize = size; entry->maxMsgSize = (handle->maxMsgSize) ? 
handle->maxMsgSize : UINT32_MAX; handle->protocol->getFooterSize(handle->protocol->handle, &size); - entry->footerSize = size; + entry->readFooterSize = size; + entry->writeFooterSize = size; entry->bufferSize = handle->bufferSize; entry->connected = false; - if (entry->headerBufferSize) { - entry->headerBuffer = calloc(sizeof(char), entry->headerSize); - } - if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize); + if (entry->readHeaderBufferSize) entry->readHeaderBuffer = calloc(sizeof(char), entry->readHeaderBufferSize); + if (entry->writeHeaderBufferSize) entry->writeHeaderBuffer = calloc(sizeof(char), entry->writeHeaderBufferSize); + if (entry->readFooterSize) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterSize); + if (entry->writeFooterSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterSize); if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); + entry->writePaddingBuffer = calloc(sizeof(char),PADDING_BUFFER_SIZE); pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); } return entry; @@ -348,40 +360,17 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) { if (entry) { - if (entry->url) { - free(entry->url); - entry->url = NULL; - } - if (entry->interface_url) { - free(entry->interface_url); - entry->interface_url = NULL; - } - if (entry->fd >= 0) { - close(entry->fd); - entry->fd = -1; - } - if (entry->buffer) { - free(entry->buffer); - entry->buffer = NULL; - entry->bufferSize = 0; - } - if (entry->headerBuffer) { - free(entry->headerBuffer); - entry->headerBuffer = NULL; - entry->headerBufferSize = 0; - } - - if (entry->footerBuffer) { - free(entry->footerBuffer); - entry->footerBuffer = NULL; - } - - if (entry->metaBuffer) { - free(entry->metaBuffer); - entry->metaBuffer = NULL; - entry->metaBufferSize = 0; - } - 
entry->connected = false; + if (entry->url) free(entry->url); + if (entry->interface_url) free(entry->interface_url); + if (entry->fd >= 0) close(entry->fd); + if (entry->buffer) free(entry->buffer); + if (entry->readHeaderBuffer) free(entry->readHeaderBuffer); + if (entry->writeHeaderBuffer) free(entry->writeHeaderBuffer); + if (entry->readFooterBuffer) free(entry->readFooterBuffer); + if (entry->writeFooterBuffer) free(entry->writeFooterBuffer); + if (entry->readMetaBuffer) free(entry->readMetaBuffer); + if (entry->writeMetaBuffer) free(entry->writeMetaBuffer); + if (entry->writePaddingBuffer) free(entry->writePaddingBuffer); free(entry); } } @@ -403,8 +392,7 @@ pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsign // int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { int rc = 0; - psa_tcp_connection_entry_t *entry = - hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); + psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); if (entry == NULL) { pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); int fd = pubsub_tcpHandler_open(handle, url_info->interface_url); @@ -418,7 +406,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { if ((rc >= 0) && addr) { rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); if (rc < 0 && errno != EINPROGRESS) { - L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->port_nr, interface_url, + L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno)); close(fd); } else { @@ -445,7 +433,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } - rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + //rc = 
pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); if (rc < 0) { pubsub_tcpHandler_freeEntry(entry); L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); @@ -652,7 +640,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int si if (handle != NULL) { celixThreadRwlock_writeLock(&handle->dbLock); handle->maxMsgSize = size; - handle->maxMsgSize = 4; + handle->maxMsgSize = 16; celixThreadRwlock_unlock(&handle->dbLock); } return 0; @@ -767,6 +755,14 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } } +void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool isEndPoint) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->isEndPoint = isEndPoint; + celixThreadRwlock_unlock(&handle->dbLock); + } +} + static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) { int long nbytes = entry->expectedBufferReadSize; @@ -775,8 +771,10 @@ int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection // Read the message header nbytes = recv(fd, &buffer[entry->bufferReadReadOffset], entry->expectedBufferReadSize, flag | MSG_NOSIGNAL); // Update buffer administration - entry->bufferReadReadOffset += nbytes; - entry->expectedBufferReadSize-= nbytes; + if (nbytes > 0) { + entry->bufferReadReadOffset += nbytes; + entry->expectedBufferReadSize -= nbytes; + } } return nbytes; } @@ -789,9 +787,8 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); } if (entry->header.header.metadataSize > 0) { - handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer, - entry->header.header.metadataSize, &entry->header); - entry->metaBufferSize = entry->header.header.metadataSize; + 
handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer, + entry->header.header.metadataSize, &entry->header); } if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) { struct timespec receiveTime; @@ -809,12 +806,12 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec entry->expectedBufferReadSize = entry->headerSize; entry->state = READ_STATE_HEADER; } else if (entry->state == READ_STATE_HEADER) { - if (entry->header.header.payloadSize) { + if (entry->header.header.payloadSize && entry->header.header.payloadPartSize) { entry->state = READ_STATE_PAYLOAD; entry->bufferReadReadOffset = entry->header.header.payloadOffset; - entry->expectedBufferReadSize = (entry->header.header.payloadPartSize) ? entry->header.header.payloadPartSize : entry->header.header.payloadSize; + entry->expectedBufferReadSize = entry->header.header.payloadPartSize; // For header less messages adjust offset and msg size; - if (!entry->headerBufferSize) { + if (!entry->readHeaderBufferSize) { entry->bufferReadReadOffset += entry->headerSize; entry->expectedBufferReadSize -= entry->headerSize; } @@ -822,10 +819,10 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec entry->state = READ_STATE_META; entry->expectedBufferReadSize = entry->header.header.metadataSize; } else if (!entry->header.header.payloadSize && !entry->header.header.metadataSize) { - if (entry->footerSize) { + if (entry->readFooterSize) { entry->state = READ_STATE_FOOTER; - entry->expectedBufferReadSize = entry->footerSize; - } else if (entry->header.header.isLastSegment) { + entry->expectedBufferReadSize = entry->readFooterSize; + } else if (entry->header.header.isLastSegment == 0x01) { entry->state = READ_STATE_READY; entry->expectedBufferReadSize = 0; } else { @@ -838,10 +835,10 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec entry->state = 
READ_STATE_META; entry->expectedBufferReadSize = entry->header.header.metadataSize; } else { - if (entry->footerSize) { + if (entry->readFooterSize) { entry->state = READ_STATE_FOOTER; - entry->expectedBufferReadSize = entry->footerSize; - } else if (entry->header.header.isLastSegment) { + entry->expectedBufferReadSize = entry->readFooterSize; + } else if (entry->header.header.isLastSegment == 0x01) { entry->state = READ_STATE_READY; entry->expectedBufferReadSize = 0; } else { @@ -850,10 +847,10 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec } } } else if (entry->state == READ_STATE_META) { - if (entry->footerSize) { + if (entry->readFooterSize) { entry->state = READ_STATE_FOOTER; - entry->expectedBufferReadSize = entry->footerSize; - } else if (entry->header.header.isLastSegment) { + entry->expectedBufferReadSize = entry->readFooterSize; + } else if (entry->header.header.isLastSegment == 0x01) { entry->state = READ_STATE_READY; entry->expectedBufferReadSize = 0; } else { @@ -861,7 +858,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec entry->expectedBufferReadSize = entry->headerSize; } } else if (entry->state == READ_STATE_FOOTER) { - if (entry->header.header.isLastSegment) { + if (entry->header.header.isLastSegment == 0x01) { entry->state = READ_STATE_READY; } else { entry->state = READ_STATE_HEADER; @@ -890,6 +887,170 @@ static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHand // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure // If the message is completely reassembled true is returned and the index and size have valid values // +int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { + celixThreadRwlock_writeLock(&handle->dbLock); + psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); + if (entry == NULL) + entry = 
hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); + // Find FD entry + if (entry == NULL) { + celixThreadRwlock_unlock(&handle->dbLock); + return -1; + } + // If it's not connected return from function + if (!entry->connected) { + celixThreadRwlock_unlock(&handle->dbLock); + return -1; + } + + if (entry->readHeaderBufferSize && entry->readHeaderBuffer) entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize); + + // Message buffer is to small, reallocate to make it bigger + if ((!entry->readHeaderBufferSize) && (entry->headerSize > entry->bufferSize)) { + handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); + char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); + if (buffer) { + entry->buffer = buffer; + entry->bufferSize = handle->bufferSize; + } + } + // Read the message + // Read the message + entry->msg.msg_iovlen = 0; + entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer + : entry->buffer; + entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize; + entry->msg.msg_iovlen++; + int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL); + long int nbytes = UINT32_MAX; + char *header_buffer = (entry->readHeaderBufferSize) ? 
entry->readHeaderBuffer : entry->buffer; + if (entry->state == READ_STATE_SYNC) { + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); + nbytes = recv(fd, header_buffer, entry->expectedBufferReadSize, flag | MSG_NOSIGNAL); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } + if (entry->state == READ_STATE_HEADER) { + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { // Check header message buffer + if (handle->protocol->decodeHeader(handle->protocol->handle, + header_buffer, + entry->headerSize, + &entry->header) != CELIX_SUCCESS) { + // Did not receive correct header + // skip sync word and try to read next header + if (!entry->headerError) { + L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); + } + entry->headerError = true; + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC); + } else { + // Read header message from queue + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); + if ((nbytes > 0) && (nbytes == entry->headerSize)) { + entry->headerError = false; + entry->msgSizeReadSize = 0; + // For headerless message, add header to bufferReadSize; + if (!entry->readHeaderBufferSize) entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } + } + } + + if (entry->state == READ_STATE_PAYLOAD) { + // Alloc message buffers + if (entry->header.header.payloadSize > entry->bufferSize) { + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize + PADDING_BUFFER_SIZE); + if (entry->buffer) { + free(entry->buffer); + } + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; + } + + //if 
(entry->header.header.isLastSegment) entry->expectedBufferReadSize+=4; + // Read payload data from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { + entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } + + if (entry->state == READ_STATE_META) { + if (entry->header.header.metadataSize > entry->readMetaBufferSize) { + if (entry->readMetaBuffer) { + free(entry->readMetaBuffer); + L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, + entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize); + } + entry->readMetaBufferSize = entry->header.header.metadataSize + PADDING_BUFFER_SIZE; + entry->readMetaBuffer = malloc((size_t) entry->readMetaBufferSize); + } + + // Read meta data from (queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readMetaBuffer,0); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { + entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } + if (entry->state == READ_STATE_FOOTER) { + // Check for end of message using, footer of message. Because of streaming protocol + if (!entry->readFooterBuffer) entry->readFooterBuffer = malloc(entry->readFooterSize); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readFooterBuffer, 0); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { + if (handle->protocol->decodeFooter(handle->protocol->handle, + entry->readFooterBuffer, + entry->readFooterSize, + &entry->header) == CELIX_SUCCESS) { + // valid footer, this means that the message is valid + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } else { + // Did not receive correct footer + L_ERROR( + "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) 
(fd: %d) (url: %s)", + entry->header.header.seqNr, + entry->fd, + entry->url); + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + } + } + } + if (entry->state == READ_STATE_READY) { + // Complete message is received + pubsub_tcpHandler_decodePayload(handle, entry); + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + + if (nbytes > 0) { + entry->retryCount = 0; + } else if (nbytes < 0) { + if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { + // Non blocking interrupt + entry->retryCount = 0; + } else if (entry->retryCount < handle->maxRcvRetryCount) { + entry->retryCount++; + L_WARN( + "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.", + entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + } else { + L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s", + entry->fd, handle->maxRcvRetryCount, strerror(errno)); + nbytes = 0; //Return 0 as indicator to close the connection + } + } + celixThreadRwlock_unlock(&handle->dbLock); + return (int)nbytes; +} + +// +// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure +// If the message is completely reassembled true is returned and the index and size have valid values +// int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { celixThreadRwlock_writeLock(&handle->dbLock); psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); @@ -906,8 +1067,10 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { return -1; } + if (entry->readHeaderBufferSize && entry->readHeaderBuffer) entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize); + // Message buffer is to small, reallocate to make it bigger - if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { + if 
((!entry->readHeaderBufferSize) && (entry->headerSize > entry->bufferSize)) { handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); if (entry->buffer) free(entry->buffer); @@ -916,16 +1079,16 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } // Read the message long int nbytes = UINT32_MAX; - char *header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; + char *header_buffer = (entry->readHeaderBufferSize) ? entry->readHeaderBuffer : entry->buffer; if (entry->state == READ_STATE_SYNC) { nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); - if (nbytes > 0) { + if (nbytes && (entry->expectedBufferReadSize <= 0)) { pubsub_tcpHandler_setReadStateMachine(handle, entry); } } if (entry->state == READ_STATE_HEADER) { nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK); - if (nbytes >= entry->headerSize) { // Check header message buffer + if (nbytes && (entry->expectedBufferReadSize <= 0)) { // Check header message buffer if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, @@ -945,8 +1108,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { entry->headerError = false; entry->msgSizeReadSize = 0; // For headerless message, add header to bufferReadSize; - if (!entry->headerBufferSize) - entry->msgSizeReadSize += nbytes; + if (!entry->readHeaderBufferSize) entry->msgSizeReadSize += nbytes; pubsub_tcpHandler_setReadStateMachine(handle, entry); } } @@ -956,51 +1118,61 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { if (entry->state == READ_STATE_PAYLOAD) { // Alloc message buffers if (entry->header.header.payloadSize > entry->bufferSize) { - handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); - if (entry->buffer) + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize + PADDING_BUFFER_SIZE); + if (entry->buffer) { free(entry->buffer); + } 
entry->buffer = malloc((size_t) handle->bufferSize); entry->bufferSize = handle->bufferSize; } + //if (entry->header.header.isLastSegment) entry->expectedBufferReadSize+=4; // Read payload data from queue nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0); - if (nbytes > 0) { - if (nbytes >= entry->header.header.payloadPartSize) { - entry->msgSizeReadSize += nbytes; - pubsub_tcpHandler_setReadStateMachine(handle, entry); - } + if (nbytes && (entry->expectedBufferReadSize <= 0)) { + entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); } } if (entry->state == READ_STATE_META) { - if (entry->header.header.metadataSize > entry->metaBufferSize) { - if (entry->metaBuffer) { - free(entry->metaBuffer); - entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); - entry->metaBufferSize = entry->header.header.metadataSize; + if (entry->header.header.metadataSize > entry->readMetaBufferSize) { + if (entry->readMetaBuffer) { + free(entry->readMetaBuffer); L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, - entry->url, entry->metaBufferSize, entry->header.header.metadataSize); + entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize); } + entry->readMetaBufferSize = entry->header.header.metadataSize + PADDING_BUFFER_SIZE; + entry->readMetaBuffer = malloc((size_t) entry->readMetaBufferSize); } // Read meta data from (queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0); - if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) { + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readMetaBuffer,0); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { entry->msgSizeReadSize += nbytes; pubsub_tcpHandler_setReadStateMachine(handle, entry); } } if (entry->state == READ_STATE_FOOTER) { // Check for end of message using, footer of message. 
Because of streaming protocol - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0); - if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { - // valid footer, this means that the message is valid - pubsub_tcpHandler_setReadStateMachine(handle, entry); - } else { - // Did not receive correct header - L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); - pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + if (!entry->readFooterBuffer) entry->readFooterBuffer = malloc(entry->readFooterSize); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->readFooterBuffer, 0); + if (nbytes && (entry->expectedBufferReadSize <= 0)) { + if (handle->protocol->decodeFooter(handle->protocol->handle, + entry->readFooterBuffer, + entry->readFooterSize, + &entry->header) == CELIX_SUCCESS) { + // valid footer, this means that the message is valid + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } else { + // Did not receive correct footer + L_ERROR( + "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) 
(fd: %d) (url: %s)", + entry->header.header.seqNr, + entry->fd, + entry->url); + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + } } } if (entry->state == READ_STATE_READY) { @@ -1114,7 +1286,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message int nofConnToClose = 0; if (handle) { hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); - size_t max_msg_iov_len = IOV_MAX - 2; + size_t max_msg_iov_len = IOV_MAX - 3; // header , footer, padding while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->connected) continue; @@ -1136,11 +1308,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message void *metadataData = NULL; size_t metadataSize = 0; if (message->metadata.metadata) { - metadataData = entry->metaBuffer; + metadataSize = entry->writeMetaBufferSize; + metadataData = entry->writeMetaBuffer; handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize); - entry->metaBufferSize = metadataSize; + entry->writeMetaBufferSize = MAX(metadataSize, entry->writeMetaBufferSize); + if (metadataData && entry->writeMetaBuffer != metadataData) entry->writeMetaBuffer = metadataData; } message->header.metadataSize = metadataSize; size_t totalMessageSize = payloadSize + metadataSize; @@ -1156,15 +1330,19 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message void *footerData = NULL; size_t footerDataSize = 0; - if (entry->footerSize) { - footerData = entry->footerBuffer; + if (entry->writeFooterSize) { + footerDataSize = entry->writeFooterSize; + footerData = entry->writeFooterBuffer; handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerDataSize); - entry->footerSize = footerDataSize; + entry->writeFooterSize = MAX(footerDataSize, entry->writeFooterSize); + if (footerData && 
entry->writeFooterBuffer != footerData) entry->writeFooterBuffer = footerData; } size_t msgSize = 0; + size_t msgPayloadSize = 0; + size_t msgMetaDataSize = 0; size_t msgIovLen = 0; long int nbytes = UINT32_MAX; while (msgSize < totalMessageSize && nbytes > 0) { @@ -1176,22 +1354,26 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message msg.msg_flags = flags; msg.msg_iov = msg_iov; size_t msgPartSize = 0; + size_t msgMetaDataPartSize = 0; message->header.payloadPartSize = 0; message->header.payloadOffset = 0; message->header.metadataSize = 0; + message->header.metadataPartSize = 0; + message->header.metadataOffset = 0; message->header.isLastSegment = 0; // Write generic seralized payload in vector buffer - if (msgSize < payloadSize) { + if (msgPayloadSize < payloadSize) { if (payloadSize && payloadData) { - char *payloadDataBuffer = payloadData; + char *buffer = payloadData; msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize]; - msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize); + msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadSize]; + msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadSize), entry->maxMsgSize); msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; message->header.payloadPartSize = msgPartSize; - message->header.payloadOffset = msgSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.payloadOffset = msgPayloadSize; + msgPayloadSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msgSize = msgPayloadSize; } else { // copy serialized vector into vector buffer for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) { @@ -1202,23 +1384,42 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message break; msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; } + message->header.payloadOffset = msgPayloadSize; message->header.payloadPartSize = msgPartSize; - message->header.payloadOffset = 
msgSize; - msgSize += msgPartSize; + msgPayloadSize += message->header.payloadPartSize; + msgSize = msgPayloadSize; msgIovLen += (msg.msg_iovlen - 1); } + // iov structures are aligned to 4 bytes. + // Add padding to read correct footer. + //unsigned int padding = msgPartSize & ( 0x4-1 ); + //if (padding) { + // msg.msg_iovlen++; + // msg.msg_iov[msg.msg_iovlen].iov_len = 0x4 - padding; + // msg.msg_iov[msg.msg_iovlen].iov_base = entry->writePaddingBuffer; + // msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + // message->header.payloadPartSize= msgPartSize; + // } } // Write optional metadata in vector buffer - if ((msgSize >= payloadSize) && + if ((msgPayloadSize >= payloadSize) && + (msgMetaDataSize < metadataSize) && (msgPartSize < entry->maxMsgSize) && - (metadataSize && metadataData)) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; - msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; - msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; - message->header.metadataSize = metadataSize; - msgSize += metadataSize; + (msg.msg_iovlen < max_msg_iov_len)) { + if (metadataSize && metadataData) { + char *buffer = payloadData; + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgMetaDataSize]; + msg.msg_iov[msg.msg_iovlen].iov_len = MIN((metadataSize - msgMetaDataSize), entry->maxMsgSize); + msgMetaDataPartSize = msg.msg_iov[msg.msg_iovlen].iov_len; + msgPartSize += msgMetaDataPartSize; + message->header.metadataSize = metadataSize; + message->header.metadataPartSize = msgMetaDataPartSize; + message->header.metadataOffset = msgMetaDataSize; + msgMetaDataSize += msgMetaDataPartSize; + msgSize += msgMetaDataPartSize; + } } if (msgSize >= totalMessageSize) { message->header.isLastSegment = 0x1; @@ -1233,17 +1434,19 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message } void *headerData = NULL; - size_t headerSize = 0; + size_t headerSize = entry->writeHeaderBufferSize; // check if header is 
not part of the payload (=> headerBufferSize = 0)s - if (entry->headerBufferSize) { - headerData = entry->headerBuffer; + if (entry->writeHeaderBufferSize) { + headerSize = entry->writeHeaderBufferSize; + headerData = entry->writeHeaderBuffer; // Encode the header, with payload size and metadata size handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize); - entry->headerBufferSize = headerSize; + entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize); + if (headerData && entry->writeHeaderBuffer != headerData) entry->writeHeaderBuffer = headerData; } - if (!entry->headerBufferSize) { + if (!entry->writeHeaderBufferSize) { // Skip header buffer, when header is part of payload; msg.msg_iov = &msg_iov[1]; } else if (headerSize && headerData) { @@ -1256,7 +1459,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message L_ERROR("[TCP Socket] No header buffer is generated"); msg.msg_iovlen = 0; } - nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags); + //nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags); + nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL); // When a specific socket keeps reporting errors can indicate a subscriber // which is not active anymore, the connection will remain until the retry // counter exceeds the maximum retry count. 
@@ -1279,20 +1483,10 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); } } - // Release data - if (headerData && headerData != entry->headerBuffer) { - free(headerData); - } // Note: serialized Payload is deleted by serializer if (payloadData && (payloadData != message->payload.payload)) { free(payloadData); } - if (metadataData && metadataData != entry->metaBuffer) { - free(metadataData); - } - if (footerData && footerData != entry->footerBuffer) { - free(footerData); - } } } } @@ -1357,7 +1551,8 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect #else struct epoll_event event; bzero(&event, sizeof(event)); // zero the struct - event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; + event.events = EPOLLRDHUP | EPOLLERR; + if (handle->isEndPoint) event.events |= EPOLLIN; event.data.fd = entry->fd; // Register Read to epoll rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h index bb74387..a08911c 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h @@ -65,6 +65,7 @@ void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count); void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout); void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout); +void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool isEndPoint); int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd); int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, diff --git 
a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 7ae65d1..218b47b 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -208,6 +208,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt); pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout); pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize); + pubsub_tcpHandler_setEndPoint(sender->socketHandler, isEndpoint); } //setting up tcp socket for TCP TopicSender diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c index 1e95c0b..8c2c573 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c @@ -601,6 +601,8 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co message.header.metadataSize = entry->metadataBufferSize; message.header.payloadPartSize = payloadLength; message.header.payloadOffset = 0; + message.header.metadataPartSize = entry->metadataBufferSize; + message.header.metadataOffset = 0; message.header.isLastSegment = 1; message.header.convertEndianess = 0; diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c index 1d123d7..eb3755b 100644 --- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c +++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_lib/src/pubsub_wire_protocol_common.c @@ -192,7 +192,11 @@ celix_status_t pubsubProtocol_encodePayload(pubsub_protocol_message_t *message, celix_status_t 
pubsubProtocol_encodeMetadata(pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) { celix_status_t status = CELIX_SUCCESS; - + if (outLength == NULL) { + status = CELIX_ILLEGAL_ARGUMENT; + return status; + } + *outLength = (*outLength == 0) ? 1024 : *outLength; size_t lineMemoryLength = *outBuffer == NULL ? 1024 : *outLength; unsigned char *line = *outBuffer == NULL ? calloc(1, lineMemoryLength) : *outBuffer; size_t idx = 4; diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c index a05e095..f87cd7c 100644 --- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c +++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v2/src/pubsub_wire_v2_protocol_impl.c @@ -46,7 +46,7 @@ celix_status_t pubsubProtocol_wire_v2_destroy(pubsub_protocol_wire_v2_t* protoco } celix_status_t pubsubProtocol_wire_v2_getHeaderSize(void* handle, size_t *length) { - *length = sizeof(int) * 9 + sizeof(short) * 2; // header + sync + version = 36 + *length = sizeof(int) * 11 + sizeof(short) * 2; // header + sync + version = 48 return CELIX_SUCCESS; } @@ -99,6 +99,8 @@ celix_status_t pubsubProtocol_wire_v2_encodeHeader(void *handle, pubsub_protocol idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataSize); idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadPartSize); idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.payloadOffset); + idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataPartSize); + idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.metadataOffset); idx = pubsubProtocol_writeInt(*outBuffer, idx, convert, message->header.isLastSegment); *outLength = idx; } @@ -160,6 +162,8 @@ celix_status_t pubsubProtocol_wire_v2_decodeHeader(void* handle, void *data, siz idx 
= pubsubProtocol_readInt(data, idx, convert, &message->header.metadataSize); idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadPartSize); idx = pubsubProtocol_readInt(data, idx, convert, &message->header.payloadOffset); + idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataPartSize); + idx = pubsubProtocol_readInt(data, idx, convert, &message->header.metadataOffset); pubsubProtocol_readInt(data, idx, convert, &message->header.isLastSegment); } } diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h index ad1a387..2c23b6f 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h @@ -47,11 +47,13 @@ struct pubsub_protocol_header { * Note: this attribute is transmitted using the wire protocol, the sync word is used to determine endianess conversion */ uint32_t convertEndianess; - /** Optional message segmentation attributes, these attributes are only used/written by the protocol admin. + /** Optional message segmentation attributes, these attributes are only used/written by the protocol admin. * When message segmentation is supported by the protocol admin */ uint32_t seqNr; uint32_t payloadPartSize; uint32_t payloadOffset; + uint32_t metadataPartSize; + uint32_t metadataOffset; uint32_t isLastSegment; };
