This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/refactor_tcp_receive_function in repository https://gitbox.apache.org/repos/asf/celix.git
commit 02558474957863e840e15d026a879578a6c7d41b Author: Roy Bulter <[email protected]> AuthorDate: Sun May 31 14:49:15 2020 +0200 Fix crashes when receiving big messages (500k) --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 188 ++++++++++++--------- libs/framework/src/celix_log.c | 1 + 2 files changed, 110 insertions(+), 79 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 93f0358..92ef423 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -83,6 +83,7 @@ typedef struct psa_tcp_connection_entry { void *headerBuffer; unsigned int bufferSize; void *buffer; + unsigned int bufferReadSize; unsigned int metaBufferSize; void *metaBuffer; struct msghdr msg; @@ -128,6 +129,7 @@ pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_conn static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd); +static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, int fd); static inline psa_tcp_connection_entry_t * pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, @@ -139,6 +141,8 @@ static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *han static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd); +static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ); + static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd); static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd); @@ -547,7 +551,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, // Make accept file descriptor non blocking // static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, - int fd) { + int fd) { int rc = 0; int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) @@ -562,6 +566,26 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, } // +// Make accept file descriptor non blocking +// +static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, + int fd) { + int rc = 0; + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + rc = flags; + else { + rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + if (rc < 0) { + L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno)); + } + } + return rc; +} + + + +// // setup listening to interface (sender) using an url // int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) { @@ -759,6 +783,20 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } } +static inline +int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ) { + int expectedReadSize = size; + int nbytes = size; + while (nbytes > 0 && expectedReadSize > 0) { + // Read the message header + nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL); + // Update buffer administration + offset += nbytes; + expectedReadSize -= nbytes; + } + return nbytes; +} + // // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure // If the message is completely reassembled true is returned and the index and size have valid values @@ -784,93 +822,84 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne // Message buffer is to small, reallocate to make it bigger if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); - char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); - if (buffer) { - entry->buffer = buffer; - entry->bufferSize = handle->bufferSize; - } + if (entry->buffer) free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; } // Read the message - entry->msg.msg_iovlen = 0; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer - : entry->buffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize; - entry->msg.msg_iovlen++; - int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL); + char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; + int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); if (nbytes > 0) { - entry->msg.msg_iovlen = 0; - if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) { - celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; - - } else if (handle->protocol->decodeHeader(handle->protocol->handle, - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base, - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len, &entry->header) != - CELIX_SUCCESS) { - entry->msg.msg_iov[0].iov_len = entry->syncSize; - nbytes = recvmsg(fd, &entry->msg, 0); - if (nbytes > 0) - entry->retryCount = 0; - celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; - } - if (entry->header.header.payloadSize > entry->bufferSize) { - handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); - char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); - if (buffer) { - entry->buffer = buffer; - entry->bufferSize = handle->bufferSize; - } - } - if (entry->header.header.metadataSize > entry->metaBufferSize) { - char *buffer = realloc(entry->metaBuffer, (size_t) entry->header.header.metadataSize); - if (buffer) { - entry->metaBuffer = buffer; - entry->metaBufferSize = entry->header.header.metadataSize; - L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, - entry->url, entry->metaBufferSize, entry->header.header.metadataSize); - } - } - - if (entry->headerBufferSize) - entry->msg.msg_iovlen++; - if (entry->header.header.payloadSize) { - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.payloadSize; - entry->msg.msg_iovlen++; - } - if (entry->header.header.metadataSize) { - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->metaBuffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.metadataSize; - entry->msg.msg_iovlen++; - } - nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL); - } else { - if (entry->retryCount < handle->maxRcvRetryCount) { - entry->retryCount++; - L_WARN("[TCP Socket] Failed to receive message header (fd: %d), error: %s. Retry count %u of %u,", - entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount); + // Check header message buffer + if (handle->protocol->decodeHeader(handle->protocol->handle, + header_buffer, + entry->headerSize, + &entry->header) != CELIX_SUCCESS) { + // Did not receive correct header + // skip sync word and try to read next header + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); + L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); + entry->bufferReadSize = 0; } else { - L_ERROR( - "[TCP Socket] Failed to receive message header (fd: %d) after %u retries! Closing connection... Error: %s", - entry->fd, - handle->maxRcvRetryCount, - strerror(errno)); - nbytes = 0; //Return 0 as indicator to close the connection + // Read header message from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); + if ((nbytes > 0) && (nbytes == entry->headerSize)) { + // For headerless message, add header to bufferReadSize; + if (!entry->headerBufferSize) + entry->bufferReadSize += nbytes; + // Alloc message buffers + if (entry->header.header.payloadSize > entry->bufferSize) { + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); + if (entry->buffer) + free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; + } + if (entry->header.header.metadataSize > entry->metaBufferSize) { + if (entry->metaBuffer) + free(entry->metaBuffer); + entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); + entry->bufferSize = handle->bufferSize; + L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, + entry->url, entry->metaBufferSize, entry->header.header.metadataSize); + } + if (entry->header.header.payloadSize) { + unsigned int offset = entry->header.header.payloadOffset; + unsigned int size = entry->header.header.payloadPartSize; + // For header less messages adjust offset and msg size; + if (!entry->headerBufferSize) { + offset = entry->headerSize; + size -= offset; + } + // Read payload data from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0); + if (nbytes > 0) { + if (nbytes == size) { + entry->bufferReadSize += nbytes; + } else { + entry->bufferReadSize = 0; + L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); + } + } + } + if (nbytes > 0 && entry->header.header.metadataSize) { + // Read meta data from queue + unsigned int size = entry->header.header.metadataSize; + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0); + if ((nbytes > 0) && (nbytes != size)) { + L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); + } + } + } } } if (nbytes > 0) { entry->retryCount = 0; - unsigned int msgSize = 0; - for (int i = 0; i < entry->msg.msg_iovlen; i++) { - msgSize += entry->msg.msg_iov[i].iov_len; - } - if (nbytes == msgSize) { + // Check is complete message is received + if (entry->bufferReadSize >= entry->header.header.payloadSize) { + entry->bufferReadSize = 0; *readMsg = true; - } else { - L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) nbytes : %d = msgSize %d", entry->fd, - nbytes, msgSize); } } else { if (entry->retryCount < handle->maxRcvRetryCount) { @@ -1132,6 +1161,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect if (rc >= 0) { // handle new connection: struct sockaddr_in sin; + pubsub_tcpHandler_makeBlocking(handle, fd); getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, &len); char *interface_url = pubsub_utils_url_get_url(&sin, NULL); char *url = pubsub_utils_url_get_url(&their_addr, NULL); @@ -1143,7 +1173,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect #else struct epoll_event event; bzero(&event, sizeof(event)); // zero the struct - event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; event.data.fd = entry->fd; // Register Read to epoll rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c index 587189b..b2c074e 100644 --- a/libs/framework/src/celix_log.c +++ b/libs/framework/src/celix_log.c @@ -26,6 +26,7 @@ #include "celix_log.h" #include "celix_threads.h" #include "celix_array_list.h" +#include "memstream/open_memstream.h" #define LOG_NAME "celix_framework"
