This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit 63fe56e1231146e446b8801e9a72f46f5bfc67e3 Author: Roy Bulter <[email protected]> AuthorDate: Tue Jun 23 21:53:13 2020 +0200 Add Message segmentation --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 241 +++++++++++++-------- 1 file changed, 149 insertions(+), 92 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 3bb31cd..9ef6361 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -78,6 +78,7 @@ typedef struct psa_tcp_connection_entry { bool connected; bool headerError; pubsub_protocol_message_t header; + unsigned int maxMsgSize; unsigned int syncSize; unsigned int headerSize; unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload @@ -117,6 +118,7 @@ struct pubsub_tcpHandler { pubsub_protocol_service_t *protocol; unsigned int bufferSize; unsigned int maxNofBuffer; + unsigned int maxMsgSize; unsigned int maxSendRetryCount; unsigned int maxRcvRetryCount; double sendTimeout; @@ -331,6 +333,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch entry->headerBufferSize = size; handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size); entry->syncSize = size; + entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : UINT32_MAX; handle->protocol->getFooterSize(handle->protocol->handle, &size); entry->footerSize = size; entry->bufferSize = handle->bufferSize; @@ -658,6 +661,20 @@ int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, } // +// Setup receive buffer size +// +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->maxMsgSize = size; + celixThreadRwlock_unlock(&handle->dbLock); + } + return 0; +} + + +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int maxMsgSize); +// // Setup thread timeout // void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, @@ -878,7 +895,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { unsigned int size = entry->header.header.payloadPartSize; // For header less messages adjust offset and msg size; if (!entry->headerBufferSize) { - offset = entry->headerSize; + offset += entry->headerSize; size -= offset; } // Read payload data from queue @@ -923,7 +940,9 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { if (nbytes > 0) { entry->retryCount = 0; // Check if complete message is received - if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) { + if ((entry->bufferReadSize >= entry->header.header.payloadSize) && + validMsg && + entry->header.header.isLastSegment) { entry->bufferReadSize = 0; pubsub_tcpHandler_decodePayload(handle, entry); } @@ -1026,6 +1045,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message int nofConnToClose = 0; if (handle) { hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); + size_t max_msg_iov_len = IOV_MAX - 2; while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->connected) continue; @@ -1052,6 +1072,16 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message &metadataSize); } message->header.metadataSize = metadataSize; + size_t totalMessageSize = payloadSize + metadataSize; + + bool isMessageSegmentationSupported = false; + handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported); + if (((!isMessageSegmentationSupported) && (msg_iov_len > max_msg_iov_len)) || + ((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) { + L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n", + entry->fd); + continue; + } void *footerData = NULL; size_t footerDataSize = 0; @@ -1061,107 +1091,134 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message &footerDataSize); } + size_t msgSize = 0; - struct msghdr msg; - struct iovec msg_iov[IOV_MAX]; - memset(&msg, 0x00, sizeof(struct msghdr)); - msg.msg_name = &entry->addr; - msg.msg_namelen = entry->len; - msg.msg_flags = flags; - msg.msg_iov = msg_iov; - - // Write generic seralized payload in vector buffer - if (payloadSize && payloadData) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = payloadData; - msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } else { - // copy serialized vector into vector buffer - for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) { + size_t msgIovLen = 0; + long int nbytes = UINT32_MAX; + while (msgSize < totalMessageSize && nbytes > 0) { + struct msghdr msg; + struct iovec msg_iov[IOV_MAX]; + memset(&msg, 0x00, sizeof(struct msghdr)); + msg.msg_name = &entry->addr; + msg.msg_namelen = entry->len; + msg.msg_flags = flags; + msg.msg_iov = msg_iov; + size_t msgPartSize = 0; + message->header.payloadPartSize = 0; + message->header.payloadOffset = 0; + message->header.metadataSize = 0; + message->header.isLastSegment = 0; + + // Write generic seralized payload in vector buffer + if (payloadSize && payloadData) { + char *payloadDataBuffer = payloadData; msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base; - msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len; + msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize]; + msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize); + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgSize; msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } else { + // copy serialized vector into vector buffer + for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base; + msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len; + if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) + break; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgSize; + msgSize += msgPartSize; + msgIovLen += (msg.msg_iovlen - 1); } - } - // Write optional metadata in vector buffer - if (metadataSize && metadataData) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; - msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } + // Write optional metadata in vector buffer + if ((msgSize < (payloadSize + metadataSize)) && + (msgPartSize < entry->maxMsgSize) && + (metadataSize && metadataData)) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; + msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.metadataSize = metadataSize; + msgSize += metadataSize; + } + if (msgSize >= totalMessageSize) { + message->header.isLastSegment = 0x1; + } - // Write optional footerData in vector buffer - if (footerData && footerDataSize) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = footerData; - msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } + // Write optional footerData in vector buffer + if (footerData && footerDataSize) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = footerData; + msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } - void *headerData = NULL; - size_t headerSize = 0; - // check if header is not part of the payload (=> headerBufferSize = 0)s - if (entry->headerBufferSize) { - // Encode the header, with payload size and metadata size - handle->protocol->encodeHeader(handle->protocol->handle, message, - &headerData, - &headerSize); - } - if (!entry->headerBufferSize) { - // Skip header buffer, when header is part of payload; - msg.msg_iov = &msg_iov[1]; - } else if (headerSize && headerData) { - // Write header in 1st vector buffer item - msg.msg_iov[0].iov_base = headerData; - msg.msg_iov[0].iov_len = headerSize; - msgSize += msg.msg_iov[0].iov_len; - msg.msg_iovlen++; - } else { - L_ERROR("[TCP Socket] No header buffer is generated"); - msg.msg_iovlen = 0; - } - long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); - // When a specific socket keeps reporting errors can indicate a subscriber - // which is not active anymore, the connection will remain until the retry - // counter exceeds the maximum retry count. - // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. - if (nbytes == -1) { - if (entry->retryCount < handle->maxSendRetryCount) { - entry->retryCount++; - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", - entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + void *headerData = NULL; + size_t headerSize = 0; + // check if header is not part of the payload (=> headerBufferSize = 0)s + if (entry->headerBufferSize) { + // Encode the header, with payload size and metadata size + handle->protocol->encodeHeader(handle->protocol->handle, message, + &headerData, + &headerSize); + } + if (!entry->headerBufferSize) { + // Skip header buffer, when header is part of payload; + msg.msg_iov = &msg_iov[1]; + } else if (headerSize && headerData) { + // Write header in 1st vector buffer item + msg.msg_iov[0].iov_base = headerData; + msg.msg_iov[0].iov_len = headerSize; + msgPartSize += msg.msg_iov[0].iov_len; + msg.msg_iovlen++; } else { - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", - entry->fd, handle->maxSendRetryCount, strerror(errno)); - connFdCloseQueue[nofConnToClose++] = entry->fd; + L_ERROR("[TCP Socket] No header buffer is generated"); + msg.msg_iovlen = 0; } - result = -1; //At least one connection failed sending - } else if (msgSize) { - entry->retryCount = 0; - if (nbytes != msgSize) { - L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); + nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); + // When a specific socket keeps reporting errors can indicate a subscriber + // which is not active anymore, the connection will remain until the retry + // counter exceeds the maximum retry count. + // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. + if (nbytes == -1) { + if (entry->retryCount < handle->maxSendRetryCount) { + entry->retryCount++; + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", + entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + } else { + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno)); + connFdCloseQueue[nofConnToClose++] = entry->fd; + } + result = -1; //At least one connection failed sending + } else if (msgPartSize) { + entry->retryCount = 0; + if (nbytes != msgSize) { + L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); + } + } + // Release data + if (headerData) { + free(headerData); + } + // Note: serialized Payload is deleted by serializer + if (payloadData && (payloadData != message->payload.payload)) { + free(payloadData); + } + if (metadataData) { + free(metadataData); + } + if (footerData) { + free(footerData); } } - // Release data - if (headerData) { - free(headerData); - } - // Note: serialized Payload is deleted by serializer - if (payloadData && (payloadData != message->payload.payload)) { - free(payloadData); - } - if (metadataData) { - free(metadataData); - } - if (footerData) { - free(footerData); - } + } } celixThreadRwlock_unlock(&handle->dbLock);
