This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3 in repository https://gitbox.apache.org/repos/asf/celix.git
commit bec22541abb5f012ec8bf3ed2a3f55d8d6c92096 Author: Roy Bulter <[email protected]> AuthorDate: Sun Jun 28 20:24:02 2020 +0200 Fix unit test --- .../src/pubsub_psa_tcp_constants.h | 2 +- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 275 +++++++++++++-------- 2 files changed, 174 insertions(+), 103 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h index 302c9f6..ff8e68f 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h @@ -60,7 +60,7 @@ //Time-out settings are only for BLOCKING connections #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT" -#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 5.0 +#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 0.0 //5.0 #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT 0.0 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT" diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 19cb0fd..b404ec4 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -53,11 +53,12 @@ #define MAX_EVENTS 64 #define MAX_DEFAULT_BUFFER_SIZE 4u -#define READ_STATE_INIT 0u -#define READ_STATE_HEADER 1u -#define READ_STATE_DATA 2u -#define READ_STATE_READY 3u -#define READ_STATE_FIND_HEADER 4u +#define READ_STATE_HEADER 0u +#define READ_STATE_PAYLOAD 1u +#define READ_STATE_META 2u +#define READ_STATE_FOOTER 3u +#define READ_STATE_READY 4u +#define READ_STATE_SYNC 5u #if defined(__APPLE__) #define MSG_NOSIGNAL (0) @@ -94,7 +95,9 @@ typedef struct psa_tcp_connection_entry { void *footerBuffer; unsigned int bufferSize; void *buffer; - unsigned int bufferReadSize; + unsigned int bufferReadReadOffset; + unsigned int expectedBufferReadSize; + unsigned int msgSizeReadSize; unsigned int metaBufferSize; void *metaBuffer; struct msghdr msg; @@ -134,29 +137,18 @@ struct pubsub_tcpHandler { bool running; }; -static inline int -pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); - +static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); - static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd); - -static inline psa_tcp_connection_entry_t * -pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, - struct sockaddr_in *addr); - +static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr); static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry); - static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index); - -static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ); - +static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag ); +static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); +static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState); static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); - static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd); - static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle); - static void *pubsub_tcpHandler_thread(void *data); // @@ -357,6 +349,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize; entry->msg_iovlen++; + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); } return entry; } @@ -469,6 +462,13 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } + //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + //if (rc < 0) { + // pubsub_tcpHandler_freeEntry(entry); + // L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); + // entry = NULL; + // } + } if ((rc >= 0) && (entry)) { celixThreadRwlock_writeLock(&handle->dbLock); @@ -576,7 +576,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, else { rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); if (rc < 0) { - L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno)); + L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno)); } } return rc; @@ -791,21 +791,20 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } static inline -int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) { - int expectedReadSize = size; - int nbytes = size; - int msgSize = 0; +int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) { + int nbytes = entry->expectedBufferReadSize; char* buffer = (char*)_buffer; - while (nbytes > 0 && expectedReadSize > 0) { + while (nbytes > 0 && entry->expectedBufferReadSize > 0) { // Read the message header - nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL); + nbytes = recv(fd, &buffer[entry->bufferReadReadOffset], entry->expectedBufferReadSize, flag | MSG_NOSIGNAL); // Update buffer administration - offset += nbytes; - expectedReadSize -= nbytes; - msgSize += nbytes; + entry->bufferReadReadOffset += nbytes; + entry->expectedBufferReadSize-= nbytes; } - if (nbytes <=0) msgSize = nbytes; - return msgSize; + if (nbytes == 0) { + L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, strerror(errno)); + } + return nbytes; } @@ -828,6 +827,88 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec } } + static inline + void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { + entry->bufferReadReadOffset = 0; + if (entry->state == READ_STATE_SYNC) { + entry->expectedBufferReadSize = entry->headerSize; + entry->state = READ_STATE_HEADER; + } else if (entry->state == READ_STATE_HEADER) { + if (entry->header.header.payloadSize) { + entry->state = READ_STATE_PAYLOAD; + entry->bufferReadReadOffset = entry->header.header.payloadOffset; + entry->expectedBufferReadSize = entry->header.header.payloadSize; + // For header less messages adjust offset and msg size; + if (!entry->headerBufferSize) { + entry->bufferReadReadOffset += entry->headerSize; + entry->expectedBufferReadSize -= entry->headerSize; + } + } else if (entry->header.header.metadataSize) { + entry->state = READ_STATE_META; + entry->expectedBufferReadSize = entry->header.header.metadataSize; + } else if (!entry->header.header.payloadSize && !entry->header.header.metadataSize) { + if (entry->footerSize) { + entry->state = READ_STATE_FOOTER; + entry->expectedBufferReadSize = entry->footerSize; + } else if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + entry->expectedBufferReadSize = 0; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } + } else if (entry->state == READ_STATE_PAYLOAD) { + if (entry->header.header.metadataSize) { + entry->state = READ_STATE_META; + entry->expectedBufferReadSize = entry->header.header.metadataSize; + } else { + if (entry->footerSize) { + entry->state = READ_STATE_FOOTER; + entry->expectedBufferReadSize = entry->footerSize; + } else if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + entry->expectedBufferReadSize = 0; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } + } else if (entry->state == READ_STATE_META) { + if (entry->footerSize) { + entry->state = READ_STATE_FOOTER; + entry->expectedBufferReadSize = entry->footerSize; + } else if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + entry->expectedBufferReadSize = 0; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } else if (entry->state == READ_STATE_FOOTER) { + if (entry->header.header.isLastSegment) { + entry->state = READ_STATE_READY; + } else { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } + } else if (entry->state == READ_STATE_READY) { + entry->state = READ_STATE_HEADER; + entry->expectedBufferReadSize = entry->headerSize; + } +} +static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState){ + entry->bufferReadReadOffset = 0; + if (nextState == READ_STATE_SYNC) { + entry->expectedBufferReadSize = entry->syncSize; + entry->state = nextState; + } else if (nextState == READ_STATE_HEADER) { + entry->expectedBufferReadSize = entry->headerSize; + entry->state = nextState; + } +} + + // // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure @@ -858,35 +939,40 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } // Read the message long int nbytes = 0; - bool validMsg = false; char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; + if (entry->state == READ_STATE_SYNC) { + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); + if (nbytes > 0) { + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } if (entry->state == READ_STATE_HEADER) { - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK); if (nbytes >= entry->headerSize) { // Check header message buffer if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { // Did not receive correct header // skip sync word and try to read next header - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); if (!entry->headerError) { L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); } entry->headerError = true; - entry->bufferReadSize = 0; + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC); } else { // Read header message from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); if ((nbytes > 0) && (nbytes == entry->headerSize)) { entry->headerError = false; - entry->state == READ_STATE_DATA; - // For headerless message, add header to bufferReadSize; - if (!entry->headerBufferSize) entry->bufferReadSize += nbytes; + entry->msgSizeReadSize = 0; + // For headerless message, add header to bufferReadSize; + if (!entry->headerBufferSize) entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); } } } } - if (nentry->state == READ_STATE_DATA) { - + if (entry->state == READ_STATE_PAYLOAD) { // Alloc message buffers if (entry->header.header.payloadSize > entry->bufferSize) { handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); @@ -896,6 +982,17 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { entry->bufferSize = handle->bufferSize; } + // Read payload data from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0); + if (nbytes > 0) { + if (nbytes >= entry->header.header.payloadPartSize) { + entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } + } + + if (entry->state == READ_STATE_META) { if (entry->header.header.metadataSize > entry->metaBufferSize) { if (entry->metaBuffer) { free(entry->metaBuffer); @@ -906,64 +1003,38 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } } - if (entry->header.header.payloadSize) { - unsigned int offset = entry->header.header.payloadOffset; - unsigned int size = entry->header.header.payloadPartSize; - // For header less messages adjust offset and msg size; - if (!entry->headerBufferSize) { - offset += entry->headerSize; - size -= offset; - } - // Read payload data from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0); - if (nbytes > 0) { - if (nbytes == size) { - entry->bufferReadSize += nbytes; - } else { - entry->bufferReadSize = 0; - L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); - } - } - } - if (nbytes > 0 && entry->header.header.metadataSize) { - // Read meta data from queue - unsigned int size = entry->header.header.metadataSize; - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0); - if ((nbytes > 0) && (nbytes != size)) { - L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); - } - } - // Check for end of message using, footer of message. Because of streaming protocol - if (nbytes > 0) { - if (entry->footerSize > 0) { - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0); - if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { - // valid footer, this means that the message is valid - validMsg = true; - } else { - // Did not receive correct header - L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); - entry->bufferReadSize = 0; - } - } else { - // No Footer, then complete message is received - validMsg = true; - } - } - } + // Read meta data from (queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0); + if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) { + entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); } } + if (entry->state == READ_STATE_FOOTER) { + // Check for end of message using, footer of message. Because of streaming protocol + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0); + if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { + // valid footer, this means that the message is valid + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } else { + // Did not receive correct header + L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + } + } + if (entry->state == READ_STATE_READY) { + // Complete message is received + pubsub_tcpHandler_decodePayload(handle, entry); + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + if (nbytes > 0) { entry->retryCount = 0; - // Check if complete message is received - if ((entry->bufferReadSize >= entry->header.header.payloadSize) && - validMsg && - entry->header.header.isLastSegment) { - entry->bufferReadSize = 0; - pubsub_tcpHandler_decodePayload(handle, entry); - } - } else { - if (entry->retryCount < handle->maxRcvRetryCount) { + } else if (nbytes < 0) { + if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { + // Non blocking interrupt + entry->retryCount = 0; + } else if (entry->retryCount < handle->maxRcvRetryCount) { entry->retryCount++; L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount); @@ -1143,7 +1214,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len; if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) break; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; } message->header.payloadPartSize = msgPartSize; message->header.payloadOffset = msgSize; @@ -1171,7 +1242,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message msg.msg_iovlen++; msg.msg_iov[msg.msg_iovlen].iov_base = footerData; msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; } void *headerData = NULL; @@ -1196,7 +1267,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message L_ERROR("[TCP Socket] No header buffer is generated"); msg.msg_iovlen = 0; } - nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); + nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags); // When a specific socket keeps reporting errors can indicate a subscriber // which is not active anymore, the connection will remain until the retry // counter exceeds the maximum retry count. @@ -1215,7 +1286,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message result = -1; //At least one connection failed sending } else if (msgPartSize) { entry->retryCount = 0; - if (nbytes != msgSize) { + if (nbytes != msgPartSize) { L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); } }
