This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/make_tcp_admin_msg_sending_robust_when_tcp_send_timeout_expires in repository https://gitbox.apache.org/repos/asf/celix.git
commit 9cdf93bf575aa14323ee4ec45fae9b1d02c18492 Author: Roy Bulter <[email protected]> AuthorDate: Fri Jun 5 15:02:05 2020 +0200 refactor read and write function --- .../src/pubsub_psa_tcp_constants.h | 2 + .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 368 +++++++++++---------- .../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 3 +- .../src/pubsub_tcp_topic_receiver.c | 9 +- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 5 +- 5 files changed, 208 insertions(+), 179 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h index 3e7a7b3..6026212 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h @@ -61,9 +61,11 @@ //Time-out settings are only for BLOCKING connections #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY "PUBSUB_TCP_PUBLISHER_SEND_TIMEOUT" #define PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT 5.0 +#define PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT 0.0 #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY "PUBSUB_TCP_SUBSCRIBER_RCV_TIMEOUT" #define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT 5.0 +#define PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT 0.0 #define PUBSUB_TCP_PSA_IP_KEY "PSA_IP" #define PUBSUB_TCP_ADMIN_TYPE "tcp" diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 93f0358..7b13b6f 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -76,25 +76,28 @@ typedef struct psa_tcp_connection_entry { struct sockaddr_in addr; socklen_t len; bool connected; + bool headerError; pubsub_protocol_message_t header; unsigned int syncSize; unsigned int headerSize; unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload void *headerBuffer; + void *footerBuffer; unsigned int bufferSize; void *buffer; + unsigned int bufferReadSize; unsigned int metaBufferSize; void *metaBuffer; struct msghdr msg; size_t msg_iovlen; /* Number of elements in the vector. */ unsigned int retryCount; + unsigned int seqNr; } psa_tcp_connection_entry_t; // // Handle administration // struct pubsub_tcpHandler { - unsigned int readSeqNr; celix_thread_rwlock_t dbLock; unsigned int timeout; hash_map_t *connection_url_map; @@ -137,9 +140,9 @@ static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index); -static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd); +static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ); -static inline void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd); +static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd); @@ -337,6 +340,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize; entry->msg_iovlen++; } + entry->footerBuffer = calloc(sizeof(char), entry->headerSize); entry->buffer = calloc(sizeof(char), entry->bufferSize); entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize; @@ -378,6 +382,12 @@ pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) { entry->headerBuffer = NULL; entry->headerBufferSize = 0; } + + if (entry->footerBuffer) { + free(entry->footerBuffer); + entry->footerBuffer = NULL; + } + if (entry->metaBuffer) { free(entry->metaBuffer); entry->metaBuffer = NULL; @@ -438,7 +448,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { #else struct epoll_event event; bzero(&event, sizeof(struct epoll_event)); // zero the struct - event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLOUT; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; event.data.fd = entry->fd; rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); #endif @@ -520,7 +530,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { int rc = 0; if (handle != NULL && entry != NULL) { - fprintf(stdout, "[TCP Socket] Close interface url: %s: \n", entry->url); + L_INFO("[TCP Socket] Close interface url: %s: \n", entry->url); hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd); if ((handle->efd >= 0)) { #if defined(__APPLE__) @@ -546,8 +556,7 @@ pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, // // Make accept file descriptor non blocking // -static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, - int fd) { +static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd) { int rc = 0; int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) @@ -714,13 +723,9 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, sch.sched_priority = prio; pthread_setschedparam(handle->thread.thread, policy, &sch); } else { - printf("Skipping configuration of thread prio to %i and thread " + L_INFO("Skipping configuration of thread prio to %i and thread " "scheduling to %s. No permission\n", (int) prio, sched); - celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_INFO, - "Skipping configuration of thread prio to %i and thread " - "scheduling to %s. No permission\n", - (int) prio, sched); } celixThreadRwlock_unlock(&handle->dbLock); } @@ -759,14 +764,51 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } } +static inline +int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) { + int expectedReadSize = size; + int nbytes = size; + int msgSize = 0; + char* buffer = (char*)_buffer; + while (nbytes > 0 && expectedReadSize > 0) { + // Read the message header + nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL); + // Update buffer administration + offset += nbytes; + expectedReadSize -= nbytes; + msgSize += nbytes; + } + if (nbytes <=0) msgSize = nbytes; + return msgSize; +} + + +static inline +void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { + + if (entry->header.header.payloadSize > 0) { + handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); + } + if (entry->header.header.metadataSize > 0) { + handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer, + entry->header.header.metadataSize, &entry->header); + } + if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) { + struct timespec receiveTime; + clock_gettime(CLOCK_REALTIME, &receiveTime); + bool releaseEntryBuffer = false; + handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime); + if (releaseEntryBuffer) pubsub_tcpHandler_releaseEntryBuffer(handle, entry->fd, 0); + } +} + + // // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure // If the message is completely reassembled true is returned and the index and size have valid values // -int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, bool *readMsg) { +int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { celixThreadRwlock_writeLock(&handle->dbLock); - *index = 0; - *readMsg = false; psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); if (entry == NULL) entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); @@ -784,93 +826,105 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne // Message buffer is to small, reallocate to make it bigger if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); - char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); - if (buffer) { - entry->buffer = buffer; + if (entry->buffer) free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); entry->bufferSize = handle->bufferSize; } - } - // Read the message - entry->msg.msg_iovlen = 0; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = (entry->headerBufferSize) ? entry->headerBuffer - : entry->buffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize; - entry->msg.msg_iovlen++; - int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL); + bool validMsg = false; + char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; + int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); if (nbytes > 0) { - entry->msg.msg_iovlen = 0; - if (entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len != nbytes) { - celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; - - } else if (handle->protocol->decodeHeader(handle->protocol->handle, - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base, - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len, &entry->header) != - CELIX_SUCCESS) { - entry->msg.msg_iov[0].iov_len = entry->syncSize; - nbytes = recvmsg(fd, &entry->msg, 0); - if (nbytes > 0) - entry->retryCount = 0; - celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; - } - if (entry->header.header.payloadSize > entry->bufferSize) { - handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); - char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); - if (buffer) { - entry->buffer = buffer; - entry->bufferSize = handle->bufferSize; - } - } - if (entry->header.header.metadataSize > entry->metaBufferSize) { - char *buffer = realloc(entry->metaBuffer, (size_t) entry->header.header.metadataSize); - if (buffer) { - entry->metaBuffer = buffer; - entry->metaBufferSize = entry->header.header.metadataSize; - L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, - entry->url, entry->metaBufferSize, entry->header.header.metadataSize); - } - } - - if (entry->headerBufferSize) - entry->msg.msg_iovlen++; - if (entry->header.header.payloadSize) { - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.payloadSize; - entry->msg.msg_iovlen++; - } - if (entry->header.header.metadataSize) { - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->metaBuffer; - entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->header.header.metadataSize; - entry->msg.msg_iovlen++; - } - nbytes = recvmsg(fd, &entry->msg, MSG_WAITALL | MSG_NOSIGNAL); - } else { - if (entry->retryCount < handle->maxRcvRetryCount) { - entry->retryCount++; - L_WARN("[TCP Socket] Failed to receive message header (fd: %d), error: %s. Retry count %u of %u,", - entry->fd, strerror(errno), entry->retryCount, handle->maxRcvRetryCount); + // Check header message buffer + if (handle->protocol->decodeHeader(handle->protocol->handle, + header_buffer, + entry->headerSize, + &entry->header) != CELIX_SUCCESS) { + // Did not receive correct header + // skip sync word and try to read next header + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); + if (!entry->headerError) L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); + entry->headerError = true; + entry->bufferReadSize = 0; } else { - L_ERROR( - "[TCP Socket] Failed to receive message header (fd: %d) after %u retries! Closing connection... Error: %s", - entry->fd, - handle->maxRcvRetryCount, - strerror(errno)); - nbytes = 0; //Return 0 as indicator to close the connection + // Read header message from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); + if ((nbytes > 0) && (nbytes == entry->headerSize)) { + entry->headerError = false; + // For headerless message, add header to bufferReadSize; + if (!entry->headerBufferSize) + entry->bufferReadSize += nbytes; + // Alloc message buffers + if (entry->header.header.payloadSize > entry->bufferSize) { + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); + if (entry->buffer) + free(entry->buffer); + entry->buffer = malloc((size_t) handle->bufferSize); + entry->bufferSize = handle->bufferSize; + } + if (entry->header.header.metadataSize > entry->metaBufferSize) { + if (entry->metaBuffer) { + free(entry->metaBuffer); + entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); + entry->bufferSize = handle->bufferSize; + L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, + entry->url, entry->metaBufferSize, entry->header.header.metadataSize); + } + } + + if (entry->header.header.payloadSize) { + unsigned int offset = entry->header.header.payloadOffset; + unsigned int size = entry->header.header.payloadPartSize; + // For header less messages adjust offset and msg size; + if (!entry->headerBufferSize) { + offset = entry->headerSize; + size -= offset; + } + // Read payload data from queue + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0); + if (nbytes > 0) { + if (nbytes == size) { + entry->bufferReadSize += nbytes; + } else { + entry->bufferReadSize = 0; + L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); + } + } + } + if (nbytes > 0 && entry->header.header.metadataSize) { + // Read meta data from queue + unsigned int size = entry->header.header.metadataSize; + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0); + if ((nbytes > 0) && (nbytes != size)) { + L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); + } + } + // Check for end of message using, header of next message. Because of streaming protocol + // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header) + if (nbytes > 0) { + pubsub_protocol_message_t header; + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK); + if (handle->protocol->decodeHeader(handle->protocol->handle, + entry->footerBuffer, + entry->headerSize, + &header) == CELIX_SUCCESS) { + // valid header for next buffer, this means that the message is valid + validMsg = true; + } else { + // Did not receive correct header + L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); + entry->bufferReadSize = 0; + } + } + } } } if (nbytes > 0) { entry->retryCount = 0; - unsigned int msgSize = 0; - for (int i = 0; i < entry->msg.msg_iovlen; i++) { - msgSize += entry->msg.msg_iov[i].iov_len; - } - if (nbytes == msgSize) { - *readMsg = true; - } else { - L_ERROR("[TCP Socket] Failed to receive complete message (fd: %d) nbytes : %d = msgSize %d", entry->fd, - nbytes, msgSize); + // Check if complete message is received + if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) { + entry->bufferReadSize = 0; + pubsub_tcpHandler_decodePayload(handle, entry); } } else { if (entry->retryCount < handle->maxRcvRetryCount) { @@ -887,34 +941,6 @@ int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigne return nbytes; } -// -// Read out the message which is indicated available by the largeUdp_dataAvailable function -// -int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index __attribute__ ((__unused__)), - pubsub_protocol_message_t **header) { - int result = 0; - celixThreadRwlock_readLock(&handle->dbLock); - psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);; - if (entry == NULL) - entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); - if (entry == NULL) - result = -1; - if (entry) - result = (!entry->connected) ? -1 : result; - if (!result) { - if (entry->header.header.payloadSize > 0) { - handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, - &entry->header); - } - if (entry->header.header.metadataSize > 0) { - handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer, - entry->header.header.metadataSize, &entry->header); - } - *header = &entry->header; - } - celixThreadRwlock_unlock(&handle->dbLock); - return result; -} int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload, pubsub_tcpHandler_processMessage_callback_t processMessageCallback) { @@ -950,6 +976,44 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v return result; } +static inline +int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) { + int nbytes = 0; + int msgSize = 0; + if (entry->fd >= 0 && size && msg->msg_iovlen) { + int expectedReadSize = size; + unsigned int offset = 0; + nbytes = size; + while (nbytes > 0 && expectedReadSize > 0) { + // Read the message header + nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL); + // Update admin + expectedReadSize -= nbytes; + msgSize += nbytes; + // Not all written + if (expectedReadSize && nbytes > 0) { + unsigned int readSize = 0; + unsigned int readIndex = 0; + unsigned int i = 0; + for (i = 0; i < msg->msg_iovlen; i++) { + if (nbytes < msg->msg_iov[i].iov_len) { + readIndex = i; + break; + } + readSize+= msg->msg_iov[i].iov_len; + } + msg->msg_iov = &msg->msg_iov[readIndex]; + msg->msg_iovlen -= readIndex; + char* buffer = (char*)msg->msg_iov->iov_base; + offset = nbytes - readSize; + msg->msg_iov->iov_base = &buffer[offset]; + msg->msg_iov->iov_len = msg->msg_iov->iov_len - offset; + } + } + } + if (nbytes <=0) msgSize = nbytes; + return msgSize; +} // // Write large data to TCP. . // @@ -963,6 +1027,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (!entry->connected) continue; void *payloadData = NULL; size_t payloadSize = 0; if (msg_iov_len == 1) { @@ -973,6 +1038,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message } } + message->header.seqNr = entry->seqNr; message->header.payloadSize = payloadSize; message->header.payloadPartSize = payloadSize; message->header.payloadOffset = 0; @@ -987,15 +1053,12 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message message->header.metadataSize = metadataSize; size_t msgSize = 0; - long int nbytes = 0; struct msghdr msg; struct iovec msg_iov[IOV_MAX]; + memset(&msg, 0x00, sizeof(struct msghdr)); msg.msg_name = &entry->addr; msg.msg_namelen = entry->len; msg.msg_flags = flags; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_iovlen = 0; msg.msg_iov = msg_iov; // Write generic seralized payload in vector buffer @@ -1044,10 +1107,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message L_ERROR("[TCP Socket] No header buffer is generated"); msg.msg_iovlen = 0; } - nbytes = 0; - if (entry->fd >= 0 && msgSize && msg.msg_iovlen) { - nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL); - } + long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); // When a specific socket keeps reporting errors can indicate a subscriber // which is not active anymore, the connection will remain until the retry // counter exceeds the maximum retry count. @@ -1068,7 +1128,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message } else if (msgSize) { entry->retryCount = 0; if (nbytes != msgSize) { - L_ERROR("[TCP Socket] MsgSize not correct: %d != %d\n", msgSize, nbytes); + L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", entry->seqNr, msgSize, nbytes, strerror(errno)); } } // Release data @@ -1082,6 +1142,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message if (metadataData) { free(metadataData); } + entry->seqNr++; } } celixThreadRwlock_unlock(&handle->dbLock); @@ -1168,51 +1229,10 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect } // -// Handle sockets reads (blocking) -// -static inline -void pubsub_tcpHandler_readHandler(pubsub_tcpHandler_t *handle, int fd) { - unsigned int index = 0; - bool readMsg = false; - int rc = pubsub_tcpHandler_dataAvailable(handle, fd, &index, &readMsg); - if (rc <= 0) { - // close connection. - if (rc == 0) - pubsub_tcpHandler_close(handle, fd); - return; - } - if (readMsg) { - // Handle data - pubsub_protocol_message_t *header = NULL; - rc = pubsub_tcpHandler_read(handle, fd, index, &header); - if (rc < 0) - return; - celixThreadRwlock_readLock(&handle->dbLock); - if (handle->processMessageCallback && header != NULL && header->payload.payload != NULL && - header->payload.length) { - struct timespec receiveTime; - clock_gettime(CLOCK_REALTIME, &receiveTime); - bool releaseEntryBuffer = false; - handle->processMessageCallback(handle->processMessagePayload, header, &releaseEntryBuffer, &receiveTime); - if (releaseEntryBuffer) - pubsub_tcpHandler_releaseEntryBuffer(handle, fd, index); - } - celixThreadRwlock_unlock(&handle->dbLock); - } -} - -// // Handle sockets connection (sender) // static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) { - int err = 0; - socklen_t len = sizeof(int); - int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len); - if (rc != 0) { - L_ERROR("[TCP Socket]:EPOLLOUT ERROR read from socket %s\n", strerror(errno)); - return; - } celixThreadRwlock_readLock(&handle->dbLock); psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); if (entry) @@ -1255,7 +1275,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) { int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry); pubsub_tcpHandler_connectionHandler(handle, fd); } else if (events[i].filter & EVFILT_READ) { - pubsub_tcpHandler_readHandler(handle, events[i].ident); + int rc = pubsub_tcpHandler_read(handle, events[i].data.fd); + if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd); } else if (events[i].flags & EV_EOF) { int err = 0; socklen_t len = sizeof(int); @@ -1304,7 +1325,8 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) { int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry); pubsub_tcpHandler_connectionHandler(handle, fd); } else if (events[i].events & EPOLLIN) { - pubsub_tcpHandler_readHandler(handle, events[i].data.fd); + rc = pubsub_tcpHandler_read(handle, events[i].data.fd); + if (rc == 0) pubsub_tcpHandler_close(handle, events[i].data.fd); } else if (events[i].events & EPOLLRDHUP) { int err = 0; socklen_t len = sizeof(int); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h index 260edc1..ed4581c 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h @@ -68,8 +68,7 @@ void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout); void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout); -int pubsub_tcpHandler_dataAvailable(pubsub_tcpHandler_t *handle, int fd, unsigned int *index, bool *readMsg); -int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd, unsigned int index, pubsub_protocol_message_t **header); +int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd); int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msg_iovec, diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 8cbf8fc..0bd51c5 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -157,6 +157,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context receiver->protocol = protocol; receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); + bool isEndpoint = false; bool isServerEndPoint = false; /* Check if it's a static endpoint */ @@ -167,6 +168,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL); const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL); if (endPointType != NULL) { + isEndpoint = true; if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) { staticClientEndPointUrls = staticConnectUrls; @@ -207,8 +209,9 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL); long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT); - double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, - PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT); + double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, + (!isEndpoint) ? PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT : + PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_ENDPOINT_DEFAULT); long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, PSA_TCP_DEFAULT_MAX_RECV_SESSIONS); long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, @@ -789,4 +792,4 @@ static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t } return check; -} \ No newline at end of file +} diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 69c862a..47dc888 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -145,6 +145,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( } sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); + bool isEndpoint = false; char *urls = NULL; const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL); const char *discUrl = NULL; @@ -155,6 +156,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( /* Check if it's a static endpoint */ const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL); if (endPointType != NULL) { + isEndpoint = true; if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) { staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL); @@ -192,7 +194,8 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT); double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, - PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT); + (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT : + PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT); pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope); pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched); pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
