This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit 75794733c84213d2a1c9eb63182bdb79a848db47 Author: Roy Bulter <[email protected]> AuthorDate: Mon Aug 3 13:50:35 2020 +0200 Fix buffers --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 458 ++++++++++++++++----- 1 file changed, 352 insertions(+), 106 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 4734497..aae0406 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -84,7 +84,9 @@ typedef struct psa_tcp_connection_entry { socklen_t len; bool connected; bool headerError; + unsigned int state; pubsub_protocol_message_t header; + unsigned int maxMsgSize; unsigned int syncSize; unsigned int headerSize; unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload @@ -94,10 +96,11 @@ typedef struct psa_tcp_connection_entry { unsigned int bufferSize; void *buffer; unsigned int bufferReadSize; + unsigned int bufferReadReadOffset; + unsigned int expectedBufferReadSize; + unsigned int msgSizeReadSize; unsigned int metaBufferSize; void *metaBuffer; - struct msghdr msg; - size_t msg_iovlen; /* Number of elements in the vector. */ unsigned int retryCount; } psa_tcp_connection_entry_t; @@ -124,6 +127,7 @@ struct pubsub_tcpHandler { pubsub_protocol_service_t *protocol; unsigned int bufferSize; unsigned int maxNofBuffer; + unsigned int maxMsgSize; unsigned int maxSendRetryCount; unsigned int maxRcvRetryCount; double sendTimeout; @@ -132,29 +136,19 @@ struct pubsub_tcpHandler { bool running; }; -static inline int -pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); - +static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); - static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd); - -static inline psa_tcp_connection_entry_t * -pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, - struct sockaddr_in *addr); - +static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr); static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry); - static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index); - static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ); - +static inline int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag ); +static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); +static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState); static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); - static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd); - static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle); - static void *pubsub_tcpHandler_thread(void *data); // @@ -348,9 +342,6 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch } if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize); if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize; - entry->msg_iovlen++; } return entry; } @@ -458,6 +449,13 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } + //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + //if (rc < 0) { + // pubsub_tcpHandler_freeEntry(entry); + // L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); + // entry = NULL; + // } + } if ((rc >= 0) && (entry)) { celixThreadRwlock_writeLock(&handle->dbLock); @@ -565,7 +563,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, else { rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); if (rc < 0) { - L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno)); + L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno)); } } return rc; @@ -774,7 +772,25 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } static inline -int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) { +int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) { + int expectedReadSize = size; + int nbytes = size; + int msgSize = 0; + char* buffer = (char*)_buffer; + while (nbytes > 0 && expectedReadSize > 0) { + // Read the message header + nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL); + // Update buffer administration + offset += nbytes; + expectedReadSize -= nbytes; + msgSize += nbytes; + } + if (nbytes <=0) msgSize = nbytes; + return msgSize; +} + +static inline +int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) { int nbytes = entry->expectedBufferReadSize; char* buffer = (char*)_buffer; while (nbytes > 0 && entry->expectedBufferReadSize > 0) { @@ -811,6 +827,88 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec } } + static inline + void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { + entry->bufferReadReadOffset = 0; + if (entry->state == READ_STATE_SYNC) { + entry->expectedBufferReadSize = entry->headerSize; + entry->state = READ_STATE_HEADER; + } else if (entry->state == READ_STATE_HEADER) { + if (entry->header.header.payloadSize) { + entry->state = READ_STATE_PAYLOAD; + entry->bufferReadReadOffset = entry->header.header.payloadOffset; + entry->expectedBufferReadSize = entry->header.header.payloadSize; + // For header less messages adjust offset and msg size; + if (!entry->headerBufferSize) { + entry->bufferReadReadOffset += entry->headerSize; + entry->expectedBufferReadSize -= entry->headerSize; + } + } else if (entry->header.header.metadataSize) { + entry->state = READ_STATE_META; + entry->expectedBufferReadSize = entry->header.header.metadataSize; + } else if (!entry->header.header.payloadSize && !entry->header.header.metadataSize) { + if (entry->footerSize) { + entry->state = READ_STATE_FOOTER; + entry->expectedBufferReadSize = entry->footerSize; + } else if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + entry->expectedBufferReadSize = 0; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } + } else if (entry->state == READ_STATE_PAYLOAD) { + if (entry->header.header.metadataSize) { + entry->state = READ_STATE_META; + entry->expectedBufferReadSize = entry->header.header.metadataSize; + } else { + if (entry->footerSize) { + entry->state = READ_STATE_FOOTER; + entry->expectedBufferReadSize = entry->footerSize; + } else if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + entry->expectedBufferReadSize = 0; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } + } else if (entry->state == READ_STATE_META) { + if (entry->footerSize) { + entry->state = READ_STATE_FOOTER; + entry->expectedBufferReadSize = entry->footerSize; + } else if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + entry->expectedBufferReadSize = 0; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } else if (entry->state == READ_STATE_FOOTER) { + if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } else if (entry->state == READ_STATE_READY) { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } +} +static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState){ + entry->bufferReadReadOffset = 0; + if (nextState == READ_STATE_SYNC) { + entry->expectedBufferReadSize = entry->syncSize; + entry->state = nextState; + } else if (nextState == READ_STATE_HEADER) { + entry->expectedBufferReadSize = entry->headerSize; + entry->state = nextState; + } +} + + // // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure @@ -836,6 +934,144 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { handle->bufferSize = MAX(handle->bufferSize, entry->headerSize ); if (entry->buffer) free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; + } + // Read the message + bool validMsg = false; + char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; + int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); + if (nbytes > 0) { + // Check header message buffer + if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { + // Did not receive correct header + // skip sync word and try to read next header + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); + if (!entry->headerError) { + L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); + } + entry->headerError = true; + entry->bufferReadSize = 0; + } else { + // Read header message from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); + if ((nbytes > 0) && (nbytes == entry->headerSize)) { + entry->headerError = false; + // For headerless message, add header to bufferReadSize; + if (!entry->headerBufferSize) + entry->bufferReadSize += nbytes; + // Alloc message buffers + if (entry->header.header.payloadSize > entry->bufferSize) { + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); + if (entry->buffer) + free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; + } + if (entry->header.header.metadataSize > entry->metaBufferSize) { + if (entry->metaBuffer) { + free(entry->metaBuffer); + } + entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); + entry->metaBufferSize = entry->header.header.metadataSize; + L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, + entry->url, entry->metaBufferSize, entry->header.header.metadataSize); + } + + if (entry->header.header.payloadSize) { + unsigned int offset = entry->header.header.payloadOffset; + unsigned int size = entry->header.header.payloadPartSize; + // For header less messages adjust offset and msg size; + if (!entry->headerBufferSize) { + offset = entry->headerSize; + size -= offset; + } + // Read payload data from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0); + if (nbytes > 0) { + if (nbytes == size) { + entry->bufferReadSize += nbytes; + } else { + entry->bufferReadSize = 0; + L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); + } + } + } + if (nbytes > 0 && entry->header.header.metadataSize) { + // Read meta data from queue + unsigned int size = entry->header.header.metadataSize; + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0); + if ((nbytes > 0) && (nbytes != size)) { + L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); + } + } + // Check for end of message using, footer of message. Because of streaming protocol + if (nbytes > 0) { + if (entry->footerSize > 0) { + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0); + if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { + // valid footer, this means that the message is valid + validMsg = true; + } else { + // Did not receive correct header + L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); + entry->bufferReadSize = 0; + } + } else { + // No Footer, then complete message is received + validMsg = true; + } + } + } + } + } + if (nbytes > 0) { + entry->retryCount = 0; + // Check if complete message is received + if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) { + entry->bufferReadSize = 0; + pubsub_tcpHandler_decodePayload(handle, entry); + } + } else { + if (entry->retryCount < handle->maxRcvRetryCount) { + entry->retryCount++; + L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, + strerror(errno), entry->retryCount, handle->maxRcvRetryCount); + } else { + L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s", + entry->fd, handle->maxRcvRetryCount, strerror(errno)); + nbytes = 0; //Return 0 as indicator to close the connection + } + } + celixThreadRwlock_unlock(&handle->dbLock); + return nbytes; +} + + +// +// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure +// If the message is completely reassembled true is returned and the index and size have valid values +// +int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { + celixThreadRwlock_writeLock(&handle->dbLock); + psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); + if (entry == NULL) + entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); + // Find FD entry + if (entry == NULL) { + celixThreadRwlock_unlock(&handle->dbLock); + return -1; + } + // If it's not connected return from function + if (!entry->connected) { + celixThreadRwlock_unlock(&handle->dbLock); + return -1; + } + + // Message buffer is to small, reallocate to make it bigger + if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { + handle->bufferSize = MAX(handle->bufferSize, entry->headerSize ); + if (entry->buffer) free(entry->buffer); entry->buffer = malloc((size_t) handle->bufferSize); entry->bufferSize = handle->bufferSize; } @@ -843,13 +1079,13 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { long int nbytes = 0; char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; if (entry->state == READ_STATE_SYNC) { - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); + nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0); if (nbytes > 0) { pubsub_tcpHandler_setReadStateMachine(handle, entry); } } if (entry->state == READ_STATE_HEADER) { - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK); + nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, MSG_PEEK); if (nbytes >= entry->headerSize) { // Check header message buffer if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { // Did not receive correct header @@ -862,7 +1098,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } else { // Read header message from queue pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); + nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0); if ((nbytes > 0) && (nbytes == entry->headerSize)) { entry->headerError = false; entry->msgSizeReadSize = 0; @@ -885,7 +1121,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } // Read payload data from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0); + nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->buffer, 0); if (nbytes > 0) { if (nbytes >= entry->header.header.payloadPartSize) { entry->msgSizeReadSize += nbytes; @@ -906,7 +1142,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } // Read meta data from (queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0); + nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->metaBuffer,0); if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) { entry->msgSizeReadSize += nbytes; pubsub_tcpHandler_setReadStateMachine(handle, entry); @@ -914,7 +1150,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } if (entry->state == READ_STATE_FOOTER) { // Check for end of message using, footer of message. Because of streaming protocol - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0); + nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->footerBuffer, 0); if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { // valid footer, this means that the message is valid pubsub_tcpHandler_setReadStateMachine(handle, entry); @@ -1034,6 +1270,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message int nofConnToClose = 0; if (handle) { hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); + size_t max_msg_iov_len = IOV_MAX - 2; while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->connected) continue; @@ -1055,9 +1292,11 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message void *metadataData = NULL; size_t metadataSize = 0; if (message->metadata.metadata) { + metadataData = entry->metaBuffer; handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize); + entry->metaBufferSize = metadataSize; } message->header.metadataSize = metadataSize; size_t totalMessageSize = payloadSize + metadataSize; @@ -1074,12 +1313,13 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message void *footerData = NULL; size_t footerDataSize = 0; if (entry->footerSize) { + footerData = entry->footerBuffer; handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerDataSize); + entry->footerSize = footerDataSize; } - size_t msgSize = 0; size_t msgIovLen = 0; long int nbytes = UINT32_MAX; @@ -1098,33 +1338,35 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message message->header.isLastSegment = 0; // Write generic seralized payload in vector buffer - if (payloadSize && payloadData) { - char *payloadDataBuffer = payloadData; - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize]; - msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize); - msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; - message->header.payloadPartSize = msgPartSize; - message->header.payloadOffset = msgSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } else { - // copy serialized vector into vector buffer - for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) { + if (msgSize < payloadSize) { + if (payloadSize && payloadData) { + char *payloadDataBuffer = payloadData; msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base; - msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len; - if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) - break; + msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize]; + msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize); msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } else { + // copy serialized vector into vector buffer + for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base; + msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len; + if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) + break; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgSize; + msgSize += msgPartSize; + msgIovLen += (msg.msg_iovlen - 1); } - message->header.payloadPartSize = msgPartSize; - message->header.payloadOffset = msgSize; - msgSize += msgPartSize; - msgIovLen += (msg.msg_iovlen - 1); } // Write optional metadata in vector buffer - if ((msgSize < (payloadSize + metadataSize)) && + if ((msgSize >= payloadSize) && (msgPartSize < entry->maxMsgSize) && (metadataSize && metadataData)) { msg.msg_iovlen++; @@ -1146,65 +1388,67 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; } - void *headerData = NULL; - size_t headerSize = 0; - // check if header is not part of the payload (=> headerBufferSize = 0)s - if (entry->headerBufferSize) { - // Encode the header, with payload size and metadata size - handle->protocol->encodeHeader(handle->protocol->handle, message, - &headerData, - &headerSize); - } - if (!entry->headerBufferSize) { - // Skip header buffer, when header is part of payload; - msg.msg_iov = &msg_iov[1]; - } else if (headerSize && headerData) { - // Write header in 1st vector buffer item - msg.msg_iov[0].iov_base = headerData; - msg.msg_iov[0].iov_len = headerSize; - msgSize += msg.msg_iov[0].iov_len; - msg.msg_iovlen++; - } else { - L_ERROR("[TCP Socket] No header buffer is generated"); - msg.msg_iovlen = 0; - } - long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); - // When a specific socket keeps reporting errors can indicate a subscriber - // which is not active anymore, the connection will remain until the retry - // counter exceeds the maximum retry count. - // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. - if (nbytes == -1) { - if (entry->retryCount < handle->maxSendRetryCount) { - entry->retryCount++; - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", - entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + void *headerData = NULL; + size_t headerSize = 0; + // check if header is not part of the payload (=> headerBufferSize = 0)s + if (entry->headerBufferSize) { + headerData = entry->headerBuffer; + // Encode the header, with payload size and metadata size + handle->protocol->encodeHeader(handle->protocol->handle, message, + &headerData, + &headerSize); + entry->headerBufferSize = headerSize; + } + if (!entry->headerBufferSize) { + // Skip header buffer, when header is part of payload; + msg.msg_iov = &msg_iov[1]; + } else if (headerSize && headerData) { + // Write header in 1st vector buffer item + msg.msg_iov[0].iov_base = headerData; + msg.msg_iov[0].iov_len = headerSize; + msgPartSize += msg.msg_iov[0].iov_len; + msg.msg_iovlen++; } else { - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", - entry->fd, handle->maxSendRetryCount, strerror(errno)); - connFdCloseQueue[nofConnToClose++] = entry->fd; + L_ERROR("[TCP Socket] No header buffer is generated"); + msg.msg_iovlen = 0; } - result = -1; //At least one connection failed sending - } else if (msgSize) { - entry->retryCount = 0; - if (nbytes != msgSize) { - L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); + nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags); + // When a specific socket keeps reporting errors can indicate a subscriber + // which is not active anymore, the connection will remain until the retry + // counter exceeds the maximum retry count. + // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. + if (nbytes == -1) { + if (entry->retryCount < handle->maxSendRetryCount) { + entry->retryCount++; + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", + entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + } else { + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno)); + connFdCloseQueue[nofConnToClose++] = entry->fd; + } + result = -1; //At least one connection failed sending + } else if (msgPartSize) { + entry->retryCount = 0; + if (nbytes != msgPartSize) { + L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); + } + } + // Release data + if (headerData && headerData != entry->headerBuffer) { + free(headerData); + } + // Note: serialized Payload is deleted by serializer + if (payloadData && (payloadData != message->payload.payload)) { + free(payloadData); + } + if (metadataData && metadataData != entry->metaBuffer) { + free(metadataData); + } + if (footerData && footerData != entry->footerBuffer) { + free(footerData); } - } - // Release data - if (headerData) { - free(headerData); - } - // Note: serialized Payload is deleted by serializer - if (payloadData && (payloadData != message->payload.payload)) { - free(payloadData); - } - if (metadataData) { - free(metadataData); - } - if (footerData) { - free(footerData); } } } @@ -1216,6 +1460,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message return result; } + + // // get interface URL //
