This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit 3eb8877ca9b20d8867cfc5f19fe3c17535591d82 Author: Roy Bulter <[email protected]> AuthorDate: Sun Jun 28 08:06:27 2020 +0200 Update for non blocking --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 88 +++++++++++++--------- 1 file changed, 52 insertions(+), 36 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 9ef6361..19cb0fd 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -53,6 +53,12 @@ #define MAX_EVENTS 64 #define MAX_DEFAULT_BUFFER_SIZE 4u +#define READ_STATE_INIT 0u +#define READ_STATE_HEADER 1u +#define READ_STATE_DATA 2u +#define READ_STATE_READY 3u +#define READ_STATE_FIND_HEADER 4u + #if defined(__APPLE__) #define MSG_NOSIGNAL (0) #endif @@ -77,6 +83,7 @@ typedef struct psa_tcp_connection_entry { socklen_t len; bool connected; bool headerError; + unsigned int state; pubsub_protocol_message_t header; unsigned int maxMsgSize; unsigned int syncSize; @@ -850,45 +857,54 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { entry->bufferSize = handle->bufferSize; } // Read the message + long int nbytes = 0; bool validMsg = false; char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; - int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); - if (nbytes > 0) { - // Check header message buffer - if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { - // Did not receive correct header - // skip sync word and try to read next header - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); - if (!entry->headerError) { - L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); - } - entry->headerError = true; - entry->bufferReadSize = 0; - } else { - // Read header message from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); - if ((nbytes > 0) && (nbytes == entry->headerSize)) { - entry->headerError = false; + if (entry->state == READ_STATE_HEADER) { + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); + if (nbytes >= entry->headerSize) { // Check header message buffer + if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { + // Did not receive correct header + // skip sync word and try to read next header + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); + if (!entry->headerError) { + L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); + } + entry->headerError = true; + entry->bufferReadSize = 0; + } else { + // Read header message from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); + if ((nbytes > 0) && (nbytes == entry->headerSize)) { + entry->headerError = false; + entry->state == READ_STATE_DATA; // For headerless message, add header to bufferReadSize; - if (!entry->headerBufferSize) - entry->bufferReadSize += nbytes; - // Alloc message buffers - if (entry->header.header.payloadSize > entry->bufferSize) { - handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); - if (entry->buffer) - free(entry->buffer); - entry->buffer = malloc((size_t) handle->bufferSize); - entry->bufferSize = handle->bufferSize; - } - if (entry->header.header.metadataSize > entry->metaBufferSize) { - if (entry->metaBuffer) { - free(entry->metaBuffer); - entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); - entry->bufferSize = handle->bufferSize; - L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, - entry->url, entry->metaBufferSize, entry->header.header.metadataSize); - } - } + if (!entry->headerBufferSize) entry->bufferReadSize += nbytes; + } + } + } + } + + if (nentry->state == READ_STATE_DATA) { + + // Alloc message buffers + if (entry->header.header.payloadSize > entry->bufferSize) { + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); + if (entry->buffer) + free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; + } + + if (entry->header.header.metadataSize > entry->metaBufferSize) { + if (entry->metaBuffer) { + free(entry->metaBuffer); + entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); + entry->metaBufferSize = entry->header.header.metadataSize; + L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, + entry->url, entry->metaBufferSize, entry->header.header.metadataSize); + } + } if (entry->header.header.payloadSize) { unsigned int offset = entry->header.header.payloadOffset;
