This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/refactor_tcp_receive_function
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 02558474957863e840e15d026a879578a6c7d41b
Author: Roy Bulter <[email protected]>
AuthorDate: Sun May 31 14:49:15 2020 +0200

    Fix crashes when receiving big messages (500k)
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 188 ++++++++++++---------
 libs/framework/src/celix_log.c                     |   1 +
 2 files changed, 110 insertions(+), 79 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 93f0358..92ef423 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -83,6 +83,7 @@ typedef struct psa_tcp_connection_entry {
     void *headerBuffer;
     unsigned int bufferSize;
     void *buffer;
+    unsigned int bufferReadSize;
     unsigned int metaBufferSize;
     void *metaBuffer;
     struct msghdr msg;
@@ -128,6 +129,7 @@ pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t 
*handle, psa_tcp_conn
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t 
*handle, psa_tcp_connection_entry_t *entry);
 
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t 
*handle, int fd);
+static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle, 
int fd);
 
 static inline psa_tcp_connection_entry_t *
 pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, 
char *external_url,
@@ -139,6 +141,8 @@ static inline void 
pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *han
 
 static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, 
int fd);
 
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, 
unsigned int size, int flag );
+
 static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, 
int fd);
 
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t 
*handle, int fd);
@@ -547,7 +551,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t 
*handle,
 // Make accept file descriptor non blocking
 //
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t 
*handle,
-                                                    int fd) {
+                                                 int fd) {
     int rc = 0;
     int flags = fcntl(fd, F_GETFL, 0);
     if (flags == -1)
@@ -562,6 +566,26 @@ static inline int 
pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
 }
 
 //
+// Make accept file descriptor blocking (clears O_NONBLOCK); returns -1 on fcntl() failure, else the F_SETFL result
+//
+static inline int pubsub_tcpHandler_makeBlocking(pubsub_tcpHandler_t *handle,
+                                                 int fd) {
+    int rc = 0;
+    int flags = fcntl(fd, F_GETFL, 0);
+    if (flags == -1) {
+        rc = flags;
+    } else {
+        rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+        if (rc < 0) {
+            L_ERROR("[TCP Socket] Cannot set to BLOCKING: %s\n", strerror(errno));
+        }
+    }
+    return rc;
+}
+
+
+
+//
 // setup listening to interface (sender) using an url
 //
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) {
@@ -759,6 +783,20 @@ void 
pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
+// Receives until `size` bytes are stored at buffer + offset (a MSG_PEEK read re-peeks until `size` bytes are queued).
+// Returns the total number of bytes stored, or recv()'s own 0 / -1 result when nothing could be read.
+static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ) {
+    unsigned int total = 0;
+    int nbytes = (int) size; // size == 0 skips the loop and yields 0
+    while (nbytes > 0 && total < size) {
+        // MSG_PEEK consumes nothing, so each peek restarts at the front; a normal read appends after `total`.
+        unsigned int done = (flag & MSG_PEEK) ? 0 : total;
+        nbytes = recv(fd, (char *) buffer + offset + done, size - done, flag | MSG_NOSIGNAL);
+        if (nbytes > 0) { total = done + (unsigned int) nbytes; } // never fold an error result into the counters
+    }
+    return (total > 0) ? (int) total : nbytes;
+}
+
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) 
and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and 
size have valid values
@@ -784,93 +822,84 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t 
*handle, int fd, unsigne
     // Message buffer is to small, reallocate to make it bigger
     if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) 
{
         handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
-        char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-        if (buffer) {
-            entry->buffer = buffer;
-            entry->bufferSize = handle->bufferSize;
-        }
+        if (entry->buffer) free(entry->buffer);
+        entry->buffer = malloc((size_t) handle->bufferSize);
+        entry->bufferSize = handle->bufferSize;
     }
 
     // Read the message
-    entry->msg.msg_iovlen = 0;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = 
(entry->headerBufferSize) ? entry->headerBuffer
-                                                                               
    : entry->buffer;
-    entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
-    entry->msg.msg_iovlen++;
-    int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
+    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : 
entry->buffer;
+    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
header_buffer, 0, entry->headerSize, MSG_PEEK);
     if (nbytes > 0) {
-        entry->msg.msg_iovlen = 0;
-        if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) {
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-
-        } else if (handle->protocol->decodeHeader(handle->protocol->handle,
-                                                  
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base,
-                                                  
entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len, &entry->header) !=
-            CELIX_SUCCESS) {
-            entry->msg.msg_iov[0].iov_len = entry->syncSize;
-            nbytes = recvmsg(fd, &entry->msg, 0);
-            if (nbytes > 0)
-                entry->retryCount = 0;
-            celixThreadRwlock_unlock(&handle->dbLock);
-            return nbytes;
-        }
-        if (entry->header.header.payloadSize > entry->bufferSize) {
-            handle->bufferSize = MAX(handle->bufferSize, 
entry->header.header.payloadSize);
-            char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
-            if (buffer) {
-                entry->buffer = buffer;
-                entry->bufferSize = handle->bufferSize;
-            }
-        }
-        if (entry->header.header.metadataSize > entry->metaBufferSize) {
-            char *buffer = realloc(entry->metaBuffer, (size_t) 
entry->header.header.metadataSize);
-            if (buffer) {
-                entry->metaBuffer = buffer;
-                entry->metaBufferSize = entry->header.header.metadataSize;
-                L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta 
buffer: (%d, %d) \n", entry->fd,
-                       entry->url, entry->metaBufferSize, 
entry->header.header.metadataSize);
-            }
-        }
-
-        if (entry->headerBufferSize)
-            entry->msg.msg_iovlen++;
-        if (entry->header.header.payloadSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = 
entry->header.header.payloadSize;
-            entry->msg.msg_iovlen++;
-        }
-        if (entry->header.header.metadataSize) {
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = 
entry->metaBuffer;
-            entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = 
entry->header.header.metadataSize;
-            entry->msg.msg_iovlen++;
-        }
-        nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL);
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
-            entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message header (fd: %d), 
error: %s. Retry count %u of %u,",
-                   entry->fd, strerror(errno), entry->retryCount, 
handle->maxRcvRetryCount);
+        // Check header message buffer
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           header_buffer,
+                                           entry->headerSize,
+                                           &entry->header) != CELIX_SUCCESS) {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
header_buffer, 0, entry->syncSize, 0);
+            L_WARN("[TCP Socket] Failed to decode message header (fd: %d) 
(url: %s)", entry->fd, entry->url);
+            entry->bufferReadSize = 0;
         } else {
-            L_ERROR(
-                "[TCP Socket] Failed to receive message header (fd: %d) after 
%u retries! Closing connection... Error: %s",
-                entry->fd,
-                handle->maxRcvRetryCount,
-                strerror(errno));
-            nbytes = 0; //Return 0 as indicator to close the connection
+            // Read header message from queue
+            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
header_buffer, 0, entry->headerSize, 0);
+            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
+                // For headerless message, add header to bufferReadSize;
+                if (!entry->headerBufferSize)
+                    entry->bufferReadSize += nbytes;
+                // Alloc message buffers
+                if (entry->header.header.payloadSize > entry->bufferSize) {
+                    handle->bufferSize = MAX(handle->bufferSize, 
entry->header.header.payloadSize);
+                    if (entry->buffer)
+                        free(entry->buffer);
+                    entry->buffer = malloc((size_t) handle->bufferSize);
+                    entry->bufferSize = handle->bufferSize;
+                }
+                if (entry->header.header.metadataSize > entry->metaBufferSize) 
{
+                    if (entry->metaBuffer)
+                        free(entry->metaBuffer);
+                    entry->metaBuffer = malloc((size_t) 
entry->header.header.metadataSize);
+                    entry->bufferSize = handle->bufferSize;
+                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read 
meta buffer: (%d, %d) \n", entry->fd,
+                           entry->url, entry->metaBufferSize, 
entry->header.header.metadataSize);
+                }
+                if (entry->header.header.payloadSize) {
+                    unsigned int offset = entry->header.header.payloadOffset;
+                    unsigned int size = entry->header.header.payloadPartSize;
+                    // For header less messages adjust offset and msg size;
+                    if (!entry->headerBufferSize) {
+                        offset = entry->headerSize;
+                        size -= offset;
+                    }
+                    // Read payload data from queue
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
entry->buffer, offset, size, 0);
+                    if (nbytes > 0) {
+                        if (nbytes == size) {
+                            entry->bufferReadSize += nbytes;
+                        } else {
+                            entry->bufferReadSize = 0;
+                            L_ERROR("[TCP Socket] Failed to receive complete 
payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                        }
+                    }
+                }
+                if (nbytes > 0 && entry->header.header.metadataSize) {
+                    // Read meta data from queue
+                    unsigned int size = entry->header.header.metadataSize;
+                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
entry->metaBuffer,0, size,0);
+                    if ((nbytes > 0) && (nbytes != size)) {
+                        L_ERROR("[TCP Socket] Failed to receive complete 
payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
+                    }
+                }
+            }
         }
     }
     if (nbytes > 0) {
         entry->retryCount = 0;
-        unsigned int msgSize = 0;
-        for (int i = 0; i < entry->msg.msg_iovlen; i++) {
-            msgSize += entry->msg.msg_iov[i].iov_len;
-        }
-        if (nbytes == msgSize) {
+        // Check if the complete message is received
+        if (entry->bufferReadSize >= entry->header.header.payloadSize) {
+            entry->bufferReadSize = 0;
             *readMsg = true;
-        } else {
-            L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) 
nbytes : %d = msgSize %d", entry->fd,
-                    nbytes, msgSize);
         }
     } else {
         if (entry->retryCount < handle->maxRcvRetryCount) {
@@ -1132,6 +1161,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t 
*handle, psa_tcp_connect
     if (rc >= 0) {
         // handle new connection:
         struct sockaddr_in sin;
+        pubsub_tcpHandler_makeBlocking(handle, fd);
         getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, 
&len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
         char *url = pubsub_utils_url_get_url(&their_addr, NULL);
@@ -1143,7 +1173,7 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t 
*handle, psa_tcp_connect
 #else
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT;
+        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c
index 587189b..b2c074e 100644
--- a/libs/framework/src/celix_log.c
+++ b/libs/framework/src/celix_log.c
@@ -26,6 +26,7 @@
 #include "celix_log.h"
 #include "celix_threads.h"
 #include "celix_array_list.h"
+#include "memstream/open_memstream.h"
 
 #define LOG_NAME        "celix_framework"
 

Reply via email to