This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/tcp_admin_msg_segmentation in repository https://gitbox.apache.org/repos/asf/celix.git
commit 90f472e1c4be2135cb9a1ddb273216a92e456da3 Author: Roy Bulter <[email protected]> AuthorDate: Sun Aug 23 20:56:35 2020 +0200 - Add Message segmentation (maxSize / IOVEC) - Fix pre-allocated buffer handling - Solve message corruption because of wrong locking in write function - Add interceptors for tcp_admin --- CMakeLists.txt | 11 +- .../src/pubsub_psa_tcp_constants.h | 5 +- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 672 +++++++++++---------- .../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 7 +- .../src/pubsub_tcp_topic_receiver.c | 77 ++- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 11 +- .../pubsub/pubsub_utils/include/pubsub_utils_url.h | 8 +- bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c | 22 +- 8 files changed, 425 insertions(+), 388 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 75fd9bd..a298fdd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,13 +32,6 @@ IF (${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION} EQUAL 3.3 AND ${CMAKE_GENERATO message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not supported due to a bug in the Makefile Generator (see Bug 15696). Please change the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use a different generator (e.g. Ninja)." 
) ENDIF() -if (ENABLE_TESTING) - find_package(GTest CONFIG QUIET) - if (NOT GTest_FOUND) - include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake) - endif() -endif () - set(ENABLE_MORE_WARNINGS OFF) # Set C specific flags @@ -123,6 +116,10 @@ set(DEFAULT_VERSION 1.0.0) # Options option(ENABLE_TESTING "Enables unit/bundle testing" FALSE) if (ENABLE_TESTING) + find_package(GTest CONFIG QUIET) + if (NOT GTest_FOUND) + include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake) + endif() enable_testing() endif() diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h index 4284042..9f03d13 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h @@ -23,7 +23,7 @@ #define PSA_TCP_BASE_PORT "PSA_TCP_BASE_PORT" #define PSA_TCP_MAX_PORT "PSA_TCP_MAX_PORT" -#define PSA_TCP_MAX_RECV_SESSIONS "PSA_TCP_MAX_RECV_SESSIONS" +#define PSA_TCP_MAX_MESSAGE_SIZE "PSA_TCP_MAX_MESSAGE_SIZE" #define PSA_TCP_RECV_BUFFER_SIZE "PSA_TCP_RECV_BUFFER_SIZE" #define PSA_TCP_TIMEOUT "PSA_TCP_TIMEOUT" #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT" @@ -31,8 +31,7 @@ #define PSA_TCP_DEFAULT_BASE_PORT 5501 #define PSA_TCP_DEFAULT_MAX_PORT 6000 -#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 1 - +#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE UINT32_MAX #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 65 * 1024 #define PSA_TCP_DEFAULT_TIMEOUT 2000 // 2 seconds #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 0cbd28f..be5a694 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -31,6 +31,7 @@ #include <errno.h> #include <array_list.h> #include <pthread.h> +#include <sys/ioctl.h> 
#if defined(__APPLE__) #include <sys/types.h> #include <sys/event.h> @@ -78,17 +79,23 @@ typedef struct psa_tcp_connection_entry { bool connected; bool headerError; pubsub_protocol_message_t header; + unsigned int maxMsgSize; unsigned int syncSize; unsigned int headerSize; - unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload - void *headerBuffer; - unsigned int footerSize; - void *footerBuffer; + unsigned int readHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload + void *readHeaderBuffer; + unsigned int writeHeaderBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload + void *writeHeaderBuffer; + unsigned int readFooterSize; + void *readFooterBuffer; + unsigned int writeFooterSize; + void *writeFooterBuffer; unsigned int bufferSize; void *buffer; - unsigned int bufferReadSize; - unsigned int metaBufferSize; - void *metaBuffer; + size_t readMetaBufferSize; + void *readMetaBuffer; + size_t writeMetaBufferSize; + void *writeMetaBuffer; unsigned int retryCount; } psa_tcp_connection_entry_t; @@ -114,38 +121,25 @@ struct pubsub_tcpHandler { celix_log_helper_t *logHelper; pubsub_protocol_service_t *protocol; unsigned int bufferSize; - unsigned int maxNofBuffer; + unsigned int maxMsgSize; unsigned int maxSendRetryCount; unsigned int maxRcvRetryCount; double sendTimeout; double rcvTimeout; celix_thread_t thread; bool running; + bool isEndPoint; }; -static inline int -pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); - +static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock); static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); - static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd); - -static inline 
psa_tcp_connection_entry_t * -pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, - struct sockaddr_in *addr); - +static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr); static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry); - static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index); - -static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ); - static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); - static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd); - static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle); - static void *pubsub_tcpHandler_thread(void *data); // @@ -167,7 +161,6 @@ pubsub_tcpHandler_t *pubsub_tcpHandler_create(pubsub_protocol_service_t *protoco handle->logHelper = logHelper; handle->protocol = protocol; handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE; - handle->maxNofBuffer = 1; // Reserved for future Use; celixThreadRwlock_create(&handle->dbLock, 0); handle->running = true; celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, handle); @@ -261,7 +254,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno)); } } - struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); + struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); if (addr) { rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); if (rc != 0) { @@ -326,17 +319,25 @@ 
pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch handle->protocol->getHeaderSize(handle->protocol->handle, &size); entry->headerSize = size; handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size); - entry->headerBufferSize = size; + entry->readHeaderBufferSize = size; + entry->writeHeaderBufferSize = size; handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size); entry->syncSize = size; handle->protocol->getFooterSize(handle->protocol->handle, &size); - entry->footerSize = size; + entry->readFooterSize = size; + entry->writeFooterSize = size; entry->bufferSize = handle->bufferSize; entry->connected = false; - if (entry->headerBufferSize) { - entry->headerBuffer = calloc(sizeof(char), entry->headerSize); + unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterSize; + if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) { + L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize); + } else { + entry->maxMsgSize = (handle->maxMsgSize) ? 
handle->maxMsgSize : UINT32_MAX; } - if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize); + if (entry->readHeaderBufferSize) entry->readHeaderBuffer = calloc(sizeof(char), entry->readHeaderBufferSize); + if (entry->writeHeaderBufferSize) entry->writeHeaderBuffer = calloc(sizeof(char), entry->writeHeaderBufferSize); + if (entry->readFooterSize) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterSize); + if (entry->writeFooterSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterSize); if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); } return entry; @@ -348,40 +349,16 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) { if (entry) { - if (entry->url) { - free(entry->url); - entry->url = NULL; - } - if (entry->interface_url) { - free(entry->interface_url); - entry->interface_url = NULL; - } - if (entry->fd >= 0) { - close(entry->fd); - entry->fd = -1; - } - if (entry->buffer) { - free(entry->buffer); - entry->buffer = NULL; - entry->bufferSize = 0; - } - if (entry->headerBuffer) { - free(entry->headerBuffer); - entry->headerBuffer = NULL; - entry->headerBufferSize = 0; - } - - if (entry->footerBuffer) { - free(entry->footerBuffer); - entry->footerBuffer = NULL; - } - - if (entry->metaBuffer) { - free(entry->metaBuffer); - entry->metaBuffer = NULL; - entry->metaBufferSize = 0; - } - entry->connected = false; + if (entry->url) free(entry->url); + if (entry->interface_url) free(entry->interface_url); + if (entry->fd >= 0) close(entry->fd); + if (entry->buffer) free(entry->buffer); + if (entry->readHeaderBuffer) free(entry->readHeaderBuffer); + if (entry->writeHeaderBuffer) free(entry->writeHeaderBuffer); + if (entry->readFooterBuffer) free(entry->readFooterBuffer); + if (entry->writeFooterBuffer) free(entry->writeFooterBuffer); + if (entry->readMetaBuffer) 
free(entry->readMetaBuffer); + if (entry->writeMetaBuffer) free(entry->writeMetaBuffer); free(entry); } } @@ -403,8 +380,7 @@ pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsign // int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { int rc = 0; - psa_tcp_connection_entry_t *entry = - hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); + psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); if (entry == NULL) { pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); int fd = pubsub_tcpHandler_open(handle, url_info->interface_url); @@ -414,11 +390,11 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { socklen_t len = sizeof(sin); getsockname(fd, (struct sockaddr *) &sin, &len); char *interface_url = pubsub_utils_url_get_url(&sin, NULL); - struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); + struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); if ((rc >= 0) && addr) { rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); if (rc < 0 && errno != EINPROGRESS) { - L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url, + L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno)); close(fd); } else { @@ -445,6 +421,12 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } + rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + if (rc < 0) { + pubsub_tcpHandler_freeEntry(entry); + L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); + entry = NULL; + } } if ((rc >= 0) && (entry)) { celixThreadRwlock_writeLock(&handle->dbLock); @@ -552,7 +534,7 @@ static 
inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, else { rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); if (rc < 0) { - L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno)); + L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno)); } } return rc; @@ -628,16 +610,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) { } // -// Setup buffer sizes +// Setup receive buffer size // -int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, - unsigned int maxNofBuffers - __attribute__((__unused__)), - unsigned int bufferSize) { +int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) { if (handle != NULL) { celixThreadRwlock_writeLock(&handle->dbLock); - handle->bufferSize = bufferSize; - handle->maxNofBuffer = maxNofBuffers; + handle->bufferSize = size; + celixThreadRwlock_unlock(&handle->dbLock); + } + return 0; +} + +// +// Set Maximum message size +// +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->maxMsgSize = size; celixThreadRwlock_unlock(&handle->dbLock); } return 0; @@ -708,7 +698,7 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, if (prio > 0 && prio < 100) { struct sched_param sch; bzero(&sch, sizeof(struct sched_param)); - sch.sched_priority = prio; + sch.sched_priority = (int)prio; pthread_setschedparam(handle->thread.thread, policy, &sch); } else { L_INFO("Skipping configuration of thread prio to %i and thread " @@ -752,35 +742,23 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } } -static inline -int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) { - int expectedReadSize = size; - int nbytes = size; - int 
msgSize = 0; - char* buffer = (char*)_buffer; - while (nbytes > 0 && expectedReadSize > 0) { - // Read the message header - nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL); - // Update buffer administration - offset += nbytes; - expectedReadSize -= nbytes; - msgSize += nbytes; +void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool isEndPoint) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->isEndPoint = isEndPoint; + celixThreadRwlock_unlock(&handle->dbLock); } - if (nbytes <=0) msgSize = nbytes; - return msgSize; } - static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { if (entry->header.header.payloadSize > 0) { - handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); + handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); } if (entry->header.header.metadataSize > 0) { - handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer, - entry->header.header.metadataSize, &entry->header); - entry->metaBufferSize = entry->header.header.metadataSize; + handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer, + entry->header.header.metadataSize, &entry->header); } if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) { struct timespec receiveTime; @@ -791,7 +769,6 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec } } - // // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure // If the message is completely reassembled true is returned and the index and size have valid values @@ -812,113 +789,147 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { return -1; } + if (entry->readHeaderBufferSize 
&& entry->readHeaderBuffer) { + entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize); + } + // Message buffer is to small, reallocate to make it bigger - if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { - handle->bufferSize = MAX(handle->bufferSize, entry->headerSize ); - if (entry->buffer) free(entry->buffer); - entry->buffer = malloc((size_t) handle->bufferSize); + if ((!entry->readHeaderBufferSize) && (entry->headerSize > entry->bufferSize)) { + handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); + char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); + if (buffer) { + entry->buffer = buffer; entry->bufferSize = handle->bufferSize; } + } + struct msghdr msg; + struct iovec msg_iov[IOV_MAX]; + memset(&msg, 0x00, sizeof(struct msghdr)); + msg.msg_iov = msg_iov; + // Read the message - bool validMsg = false; - char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; - int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); - if (nbytes > 0) { - // Check header message buffer - if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { + msg.msg_iovlen = 0; + msg.msg_iov[msg.msg_iovlen].iov_base = (entry->readHeaderBufferSize) ? 
entry->readHeaderBuffer : entry->buffer; + msg.msg_iov[msg.msg_iovlen].iov_len = entry->headerSize; + msg.msg_iovlen++; + long int msgSize = 0; + long int nbytes = recvmsg(fd, &msg, MSG_PEEK | MSG_NOSIGNAL); + if (nbytes >= entry->headerSize) { + msg.msg_iovlen--; + if (handle->protocol->decodeHeader(handle->protocol->handle, + msg.msg_iov[msg.msg_iovlen].iov_base, + msg.msg_iov[msg.msg_iovlen].iov_len, + &entry->header) != CELIX_SUCCESS) { // Did not receive correct header // skip sync word and try to read next header - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); if (!entry->headerError) { L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); } entry->headerError = true; - entry->bufferReadSize = 0; + msg.msg_iov[msg.msg_iovlen].iov_len = entry->syncSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msg.msg_iovlen++; } else { - // Read header message from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); - if ((nbytes > 0) && (nbytes == entry->headerSize)) { - entry->headerError = false; - // For headerless message, add header to bufferReadSize; - if (!entry->headerBufferSize) - entry->bufferReadSize += nbytes; - // Alloc message buffers - if (entry->header.header.payloadSize > entry->bufferSize) { - handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); - if (entry->buffer) - free(entry->buffer); - entry->buffer = malloc((size_t) handle->bufferSize); + // Alloc message buffers + if (entry->header.header.payloadSize > entry->bufferSize) { + handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); + char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize); + if (buffer) { + entry->buffer = buffer; entry->bufferSize = handle->bufferSize; } - if (entry->header.header.metadataSize > entry->metaBufferSize) { - if (entry->metaBuffer) { - 
free(entry->metaBuffer); - } - entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); - entry->metaBufferSize = entry->header.header.metadataSize; - L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, - entry->url, entry->metaBufferSize, entry->header.header.metadataSize); - } + } - if (entry->header.header.payloadSize) { - unsigned int offset = entry->header.header.payloadOffset; - unsigned int size = entry->header.header.payloadPartSize; - // For header less messages adjust offset and msg size; - if (!entry->headerBufferSize) { - offset = entry->headerSize; - size -= offset; - } - // Read payload data from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0); - if (nbytes > 0) { - if (nbytes == size) { - entry->bufferReadSize += nbytes; - } else { - entry->bufferReadSize = 0; - L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); - } - } - } - if (nbytes > 0 && entry->header.header.metadataSize) { - // Read meta data from queue - unsigned int size = entry->header.header.metadataSize; - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0); - if ((nbytes > 0) && (nbytes != size)) { - L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); - } - } - // Check for end of message using, footer of message. 
Because of streaming protocol - if (nbytes > 0) { - if (entry->footerSize > 0) { - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0); - if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { - // valid footer, this means that the message is valid - validMsg = true; - } else { - // Did not receive correct header - L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); - entry->bufferReadSize = 0; - } - } else { - // No Footer, then complete message is received - validMsg = true; +#ifdef USE_TCP_PUB_SUB_BUFFER_MEMSET + // For Debugging + if ((entry->header.header.payloadOffset == 0 ) && (msgSize == entry->headerSize)) { + memset(entry->buffer, 0x00, entry->bufferSize); + } +#endif + entry->headerError = false; + if (entry->readHeaderBufferSize) { + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msg.msg_iovlen++; + } + if (entry->header.header.payloadPartSize) { + char* buffer = entry->buffer; + msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset]; + msg.msg_iov[msg.msg_iovlen].iov_len = entry->header.header.payloadPartSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msg.msg_iovlen++; + } + if (entry->header.header.metadataSize) { + if (entry->header.header.metadataSize > entry->readMetaBufferSize) { + char *buffer = realloc(entry->readMetaBuffer, (size_t) entry->header.header.metadataSize); + if (buffer) { + entry->readMetaBuffer = buffer; + entry->readMetaBufferSize = entry->header.header.metadataSize; + L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, + entry->url, entry->readMetaBufferSize, entry->header.header.metadataSize); } } + msg.msg_iov[msg.msg_iovlen].iov_base = entry->readMetaBuffer; + 
msg.msg_iov[msg.msg_iovlen].iov_len = entry->header.header.metadataSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msg.msg_iovlen++; } + if (entry->readFooterSize) { + msg.msg_iov[msg.msg_iovlen].iov_base = entry->readFooterBuffer; + msg.msg_iov[msg.msg_iovlen].iov_len = entry->readFooterSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + msg.msg_iovlen++; + } + } + int nofBytesInReadBuffer = 0; + if (ioctl(fd, FIONREAD, &nofBytesInReadBuffer)) { + L_ERROR("[TCP Socket] socket: %d, url: %s, cannot read nof bytes in socket read buffer \n", entry->fd, entry->url); + } + if (nofBytesInReadBuffer >= msgSize) { + nbytes = recvmsg(fd, &msg, MSG_NOSIGNAL); + } else { + // Not enough to read return the amount in the socket read buffer + nbytes = nofBytesInReadBuffer; } } - if (nbytes > 0) { - entry->retryCount = 0; - // Check if complete message is received - if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) { - entry->bufferReadSize = 0; + if ((nbytes >= msgSize)&&(!entry->headerError)) { + bool valid = true; + if (entry->readFooterSize) { + if (handle->protocol->decodeFooter(handle->protocol->handle, + entry->readFooterBuffer, + entry->readFooterSize, + &entry->header) != CELIX_SUCCESS) { + + // Did not receive correct footer + L_ERROR( + "[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) 
(fd: %d) (url: %s)", + entry->header.header.seqNr, + entry->fd, + entry->url); + valid = false; + } + } + if (!entry->header.header.isLastSegment) { + // Not last Segment of message + valid = false; + } + + if (valid) { + // Complete message is received pubsub_tcpHandler_decodePayload(handle, entry); } - } else { - if (entry->retryCount < handle->maxRcvRetryCount) { + } + + if (nbytes > 0) { + entry->retryCount = 0; + } else if (nbytes < 0) { + if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { + // Non blocking interrupt + entry->retryCount = 0; + } else if (entry->retryCount < handle->maxRcvRetryCount) { entry->retryCount++; - L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, - strerror(errno), entry->retryCount, handle->maxRcvRetryCount); + L_WARN( + "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.", + entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount); } else { L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... 
Error: %s", entry->fd, handle->maxRcvRetryCount, strerror(errno)); @@ -926,10 +937,9 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { } } celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; + return (int)nbytes; } - int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload, pubsub_tcpHandler_processMessage_callback_t processMessageCallback) { int result = 0; @@ -964,55 +974,19 @@ int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v return result; } -static inline -int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) { - int nbytes = 0; - int msgSize = 0; - if (entry->fd >= 0 && size && msg->msg_iovlen) { - int expectedReadSize = size; - unsigned int offset = 0; - nbytes = size; - while (nbytes > 0 && expectedReadSize > 0) { - // Read the message header - nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL); - // Update admin - expectedReadSize -= nbytes; - msgSize += nbytes; - // Not all written - if (expectedReadSize && nbytes > 0) { - unsigned int readSize = 0; - unsigned int readIndex = 0; - unsigned int i = 0; - for (i = 0; i < msg->msg_iovlen; i++) { - if (nbytes < msg->msg_iov[i].iov_len) { - readIndex = i; - break; - } - readSize+= msg->msg_iov[i].iov_len; - } - msg->msg_iov = &msg->msg_iov[readIndex]; - msg->msg_iovlen -= readIndex; - char* buffer = (char*)msg->msg_iov->iov_base; - offset = nbytes - readSize; - msg->msg_iov->iov_base = &buffer[offset]; - msg->msg_iov->iov_len = msg->msg_iov->iov_len - offset; - } - } - } - if (nbytes <=0) msgSize = nbytes; - return msgSize; -} + // // Write large data to TCP. . 
// int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec, size_t msg_iov_len, int flags) { - celixThreadRwlock_readLock(&handle->dbLock); int result = 0; int connFdCloseQueue[hashMap_size(handle->connection_fd_map)]; int nofConnToClose = 0; if (handle) { + celixThreadRwlock_writeLock(&handle->dbLock); hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); + size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); if (!entry->connected) continue; @@ -1025,6 +999,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message payloadSize += msgIoVec[i].iov_len; } } + // When maxMsgSize is zero then payloadSize is disabled + if (!entry->maxMsgSize) payloadSize = 0; + message->header.convertEndianess = 0; message->header.payloadSize = payloadSize; message->header.payloadPartSize = payloadSize; @@ -1034,130 +1011,172 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message void *metadataData = NULL; size_t metadataSize = 0; if (message->metadata.metadata) { - metadataData = entry->metaBuffer; + metadataSize = entry->writeMetaBufferSize; + metadataData = entry->writeMetaBuffer; handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize); - entry->metaBufferSize = metadataSize; } + // When maxMsgSize is smaller then meta data is disabled + if (!entry->maxMsgSize || (metadataSize > entry->maxMsgSize)) metadataSize = 0; + message->header.metadataSize = metadataSize; + size_t totalMessageSize = payloadSize + metadataSize; + + bool isMessageSegmentationSupported = false; + handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported); + if (((!isMessageSegmentationSupported) && (msg_iov_len > max_msg_iov_len)) || + 
((!isMessageSegmentationSupported) && (totalMessageSize > entry->maxMsgSize))) { + L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n", + entry->fd); + continue; + } void *footerData = NULL; size_t footerDataSize = 0; - if (entry->footerSize) { - footerData = entry->footerBuffer; + if (entry->writeFooterSize) { + footerDataSize = entry->writeFooterSize; + footerData = entry->writeFooterBuffer; handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerDataSize); - entry->footerSize = footerDataSize; + entry->writeFooterSize = MAX(footerDataSize, entry->writeFooterSize); + if (footerData && entry->writeFooterBuffer != footerData) entry->writeFooterBuffer = footerData; } size_t msgSize = 0; - struct msghdr msg; - struct iovec msg_iov[IOV_MAX]; - memset(&msg, 0x00, sizeof(struct msghdr)); - msg.msg_name = &entry->addr; - msg.msg_namelen = entry->len; - msg.msg_flags = flags; - msg.msg_iov = msg_iov; - - // Write generic seralized payload in vector buffer - if (payloadSize && payloadData) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = payloadData; - msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } else { - // copy serialized vector into vector buffer - for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base; - msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + size_t msgPayloadSize = 0; + size_t msgMetaDataSize = 0; + size_t msgIovLen = 0; + long int nbytes = UINT32_MAX; + while (msgSize < totalMessageSize && nbytes > 0) { + struct msghdr msg; + struct iovec msg_iov[IOV_MAX]; + memset(&msg, 0x00, sizeof(struct msghdr)); + msg.msg_name = &entry->addr; + msg.msg_namelen = entry->len; + msg.msg_flags = flags; + msg.msg_iov = msg_iov; + size_t msgPartSize = 0; + 
message->header.payloadPartSize = 0; + message->header.payloadOffset = 0; + message->header.metadataSize = 0; + message->header.isLastSegment = 0; + + // Write generic seralized payload in vector buffer + if (msgPayloadSize < payloadSize) { + if (payloadSize && payloadData && entry->maxMsgSize) { + char *buffer = payloadData; + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadSize]; + msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadSize), entry->maxMsgSize); + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgPayloadSize; + msgPayloadSize += message->header.payloadPartSize; + msgSize = msgPayloadSize; + } else { + // copy serialized vector into vector buffer + for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base; + msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len; + if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) { + msg.msg_iovlen--; + break; + } + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } + msgIovLen += msg.msg_iovlen; + message->header.payloadOffset = msgPayloadSize; + message->header.payloadPartSize = msgPartSize; + msgPayloadSize += message->header.payloadPartSize; + msgSize = msgPayloadSize; + } } - } - // Write optional metadata in vector buffer - if (metadataSize && metadataData) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; - msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } + // Write optional metadata in vector buffer + if ((msgPayloadSize >= payloadSize) && + (msgMetaDataSize < metadataSize) && + (msgPartSize < entry->maxMsgSize) && + (msg.msg_iovlen+1 < max_msg_iov_len-1) && + (metadataSize && metadataData)) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base 
= metadataData; + msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.metadataSize = metadataSize; + msgSize += metadataSize; + } + if (msgSize >= totalMessageSize) { + message->header.isLastSegment = 0x1; + } - // Write optional footerData in vector buffer - if (footerData && footerDataSize) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = footerData; - msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } + // Write optional footerData in vector buffer + if (footerData && footerDataSize) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = footerData; + msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } - void *headerData = NULL; - size_t headerSize = 0; - // check if header is not part of the payload (=> headerBufferSize = 0)s - if (entry->headerBufferSize) { - headerData = entry->headerBuffer; - // Encode the header, with payload size and metadata size - handle->protocol->encodeHeader(handle->protocol->handle, message, - &headerData, - &headerSize); - entry->headerBufferSize = headerSize; - } - if (!entry->headerBufferSize) { - // Skip header buffer, when header is part of payload; - msg.msg_iov = &msg_iov[1]; - } else if (headerSize && headerData) { - // Write header in 1st vector buffer item - msg.msg_iov[0].iov_base = headerData; - msg.msg_iov[0].iov_len = headerSize; - msgSize += msg.msg_iov[0].iov_len; - msg.msg_iovlen++; - } else { - L_ERROR("[TCP Socket] No header buffer is generated"); - msg.msg_iovlen = 0; - } - long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); - // When a specific socket keeps reporting errors can indicate a subscriber - // which is not active anymore, the connection will remain until the retry - // counter exceeds the maximum retry count. 
- // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. - if (nbytes == -1) { - if (entry->retryCount < handle->maxSendRetryCount) { - entry->retryCount++; - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", - entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + void *headerData = NULL; + size_t headerSize = entry->writeHeaderBufferSize; + // check if header is not part of the payload (=> headerBufferSize = 0)s + if (entry->writeHeaderBufferSize) { + headerSize = entry->writeHeaderBufferSize; + headerData = entry->writeHeaderBuffer; + // Encode the header, with payload size and metadata size + handle->protocol->encodeHeader(handle->protocol->handle, message, + &headerData, + &headerSize); + entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize); + if (headerData && entry->writeHeaderBuffer != headerData) entry->writeHeaderBuffer = headerData; + } + if (!entry->writeHeaderBufferSize) { + // Skip header buffer, when header is part of payload; + msg.msg_iov = &msg_iov[1]; + } else if (headerSize && headerData) { + // Write header in 1st vector buffer item + msg.msg_iov[0].iov_base = headerData; + msg.msg_iov[0].iov_len = headerSize; + msgPartSize += msg.msg_iov[0].iov_len; + msg.msg_iovlen++; } else { - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... 
Error: %s", - entry->fd, handle->maxSendRetryCount, strerror(errno)); - connFdCloseQueue[nofConnToClose++] = entry->fd; + L_ERROR("[TCP Socket] No header buffer is generated"); + msg.msg_iovlen = 0; } - result = -1; //At least one connection failed sending - } else if (msgSize) { - entry->retryCount = 0; - if (nbytes != msgSize) { - L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); + nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL); + // When a specific socket keeps reporting errors can indicate a subscriber + // which is not active anymore, the connection will remain until the retry + // counter exceeds the maximum retry count. + // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. + if (nbytes == -1) { + if (entry->retryCount < handle->maxSendRetryCount) { + entry->retryCount++; + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.", + entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno)); + } else { + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... 
Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno)); + connFdCloseQueue[nofConnToClose++] = entry->fd; + } + result = -1; //At least one connection failed sending + } else if (msgPartSize) { + entry->retryCount = 0; + if (nbytes != msgPartSize) { + L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); + } + } + // Note: serialized Payload is deleted by serializer + if (payloadData && (payloadData != message->payload.payload)) { + free(payloadData); } - } - // Release data - if (headerData && headerData != entry->headerBuffer) { - free(headerData); - } - // Note: serialized Payload is deleted by serializer - if (payloadData && (payloadData != message->payload.payload)) { - free(payloadData); - } - if (metadataData && metadataData != entry->metaBuffer) { - free(metadataData); - } - if (footerData && footerData != entry->footerBuffer) { - free(footerData); } } + celixThreadRwlock_unlock(&handle->dbLock); } - celixThreadRwlock_unlock(&handle->dbLock); //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock for (int i = 0; i < nofConnToClose; i++) { pubsub_tcpHandler_close(handle, connFdCloseQueue[i]); @@ -1216,7 +1235,8 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t *handle, psa_tcp_connect #else struct epoll_event event; bzero(&event, sizeof(event)); // zero the struct - event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; + event.events = EPOLLRDHUP | EPOLLERR; + if (handle->isEndPoint) event.events |= EPOLLIN; event.data.fd = entry->fd; // Register Read to epoll rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h index ed4581c..a08911c 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h @@ -58,15 
+58,14 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd); int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url); int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url); int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url); - -int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, - unsigned int maxNofBuffers, - unsigned int bufferSize); +int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size); +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size); void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout); void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count); void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count); void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout); void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout); +void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool isEndPoint); int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd); int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 4fa4586..533e773 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -38,6 +38,7 @@ #include <uuid/uuid.h> #include <pubsub_admin_metrics.h> #include <pubsub_utils.h> +#include "pubsub_interceptors_handler.h" #include <celix_api.h> #define MAX_EPOLL_EVENTS 16 @@ -67,6 +68,7 @@ struct pubsub_tcp_topic_receiver { bool metricsEnabled; pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; + pubsub_interceptors_handler_t *interceptorsHandler; struct { celix_thread_t thread; @@ 
-164,6 +166,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context const char *staticServerEndPointUrls = NULL; const char *staticConnectUrls = NULL; + pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope); if (topicProperties != NULL) { @@ -213,14 +216,11 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT); double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT); - long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, - PSA_TCP_DEFAULT_MAX_RECV_SESSIONS); - long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, + long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE); long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT); pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope); - pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions, - (unsigned int) buffer_size); + pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize); pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout); pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg); pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler, @@ -559,31 +559,48 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs } if (status == CELIX_SUCCESS) { - hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices); + const char *msgType = msgSer->msgName; + uint32_t msgId = message->header.msgId; + celix_properties_t *metadata = message->metadata.metadata; + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata); bool release = true; - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, - &release); - if (!release && hashMapIterator_hasNext(&iter)) { - //receive function has taken ownership and still more receive function to come .. - //deserialize again for new message - status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); - if (status != CELIX_SUCCESS) { - L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, - receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); - break; + if (cont) { + hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); + while (hashMapIterator_hasNext(&iter)) { + pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); + svc->receive(svc->handle, + msgSer->msgName, + msgSer->msgId, + deSerializedMsg, + message->metadata.metadata, + &release); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, + msgType, + msgId, + deSerializedMsg, + metadata); + if (!release && hashMapIterator_hasNext(&iter)) { + //receive function has taken ownership and still more receive function to come .. + //deserialize again for new message + status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg); + if (status != CELIX_SUCCESS) { + L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", + msgSer->msgName, + receiver->scope == NULL ? 
"(null)" : receiver->scope, + receiver->topic); + break; + } + release = true; } - release = true; } + if (release) { + msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); + } + if (message->metadata.metadata) { + celix_properties_destroy(message->metadata.metadata); + } + updateReceiveCount += 1; } - if (release) { - msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg); - } - if (message->metadata.metadata) { - celix_properties_destroy(message->metadata.metadata); - } - updateReceiveCount += 1; } else { updateSerError += 1; L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName, @@ -650,10 +667,7 @@ static void *psa_tcp_recvThread(void *data) { pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) { pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, - PUBSUB_AMDIN_METRICS_NAME_MAX, - "%s", - receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? 
PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); int msgTypesCount = 0; @@ -677,8 +691,7 @@ pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics); while (hashMapIterator_hasNext(&iter2)) { hash_map_t *origins = hashMapIterator_nextValue(&iter2); - result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), - sizeof(*(result->msgTypes[i].origins))); + result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins))); result->msgTypes[i].nrOfOrigins = hashMap_size(origins); int k = 0; hash_map_iterator_t iter3 = hashMapIterator_construct(origins); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index dbb5e26..f7598f9 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -35,6 +35,7 @@ #include "celix_constants.h" #include <signal.h> #include <pubsub_utils.h> +#include "pubsub_interceptors_handler.h" #define FIRST_SEND_DELAY_IN_SECONDS 2 #define TCP_BIND_MAX_RETRY 10 @@ -59,6 +60,7 @@ struct pubsub_tcp_topic_sender { bool metricsEnabled; pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; + pubsub_interceptors_handler_t *interceptorsHandler; char *scope; char *topic; @@ -146,6 +148,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( } sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED); + pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler); bool isEndpoint = false; char *urls = NULL; const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL); @@ -202,10 +205,13 @@ pubsub_tcp_topic_sender_t 
*pubsub_tcpTopicSender_create( double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT : PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT); + long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE); pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope); pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched); pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt); pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout); + pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize); + pubsub_tcpHandler_setEndPoint(sender->socketHandler, isEndpoint); } //setting up tcp socket for TCP TopicSender @@ -312,6 +318,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { celixThreadMutex_unlock(&sender->boundedServices.mutex); celixThreadMutex_destroy(&sender->boundedServices.mutex); + pubsubInterceptorsHandler_destroy(sender->interceptorsHandler); if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) { pubsub_tcpHandler_destroy(sender->socketHandler); sender->socketHandler = NULL; @@ -533,7 +540,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i clock_gettime(CLOCK_REALTIME, &serializationEnd); } - if (status == CELIX_SUCCESS /*ser ok*/) { + bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata); + if (status == CELIX_SUCCESS /*ser ok*/ && cont) { pubsub_protocol_message_t message; message.metadata.metadata = NULL; message.payload.payload = NULL; @@ -561,6 +569,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i status = -1; sendOk = false; } + pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, 
inMsg, metadata); if (message.metadata.metadata) celix_properties_destroy(message.metadata.metadata); if (serializedIoVecOutput) { diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h index 87d4263..b10863c 100644 --- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h +++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h @@ -28,16 +28,16 @@ typedef struct pubsub_utils_url { char *url; char *protocol; char *hostname; - unsigned int portnr; + unsigned int port_nr; char *uri; char *interface; - unsigned int interface_portnr; + unsigned int interface_port_nr; char *interface_url; } pubsub_utils_url_t; struct sockaddr_in *pubsub_utils_url_from_fd(int fd); -struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port); -char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol); +struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port); +char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol); char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol); bool pubsub_utils_url_is_multicast(char *hostname); char *pubsub_utils_url_get_multicast_ip(char *hostname); diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c index d8d518c..65a1ff2 100644 --- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c +++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c @@ -56,7 +56,7 @@ struct sockaddr_in *pubsub_utils_url_from_fd(int fd) { return inp; } -struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) { +struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) { struct hostent *hp; struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in)); bzero(inp, sizeof(struct sockaddr_in)); // zero the struct @@ -220,11 +220,11 @@ void 
pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) { maxPortnr += 1; unsigned int minDigits = (unsigned int) atoi(portnr); unsigned int maxDigits = (unsigned int) atoi(maxPortnr); - url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits); + url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits); } else { unsigned int portDigits = (unsigned int) atoi(portnr); if (portDigits != 0) - url_info->portnr = portDigits; + url_info->port_nr = portDigits; uri = strstr(port, "/"); if ((uri) && (!url_info->uri)) url_info->uri = celix_utils_strdup(uri); @@ -256,11 +256,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) { maxPortnr += 1; unsigned int minDigits = (unsigned int) atoi(portnr); unsigned int maxDigits = (unsigned int) atoi(maxPortnr); - url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits); + url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits); } else { unsigned int portDigits = (unsigned int) atoi(portnr); if (portDigits != 0) - url_info->interface_portnr = portDigits; + url_info->interface_port_nr = portDigits; uri = strstr(port, "/"); if ((uri) && (!url_info->uri)) url_info->uri = celix_utils_strdup(uri); @@ -289,13 +289,13 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) { free(url_info->interface); url_info->interface = ip; } - struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr); + struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr); url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL); free(m_sin); pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info); free(url_info->interface); url_info->interface = interface_url_info.hostname; - url_info->interface_portnr = interface_url_info.portnr; + url_info->interface_port_nr = interface_url_info.port_nr; } if (url_info->hostname) { @@ -306,11 +306,11 @@ 
pubsub_utils_url_t *pubsub_utils_url_parse(char *url) { free(url_info->hostname); url_info->hostname = ip; } - struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); + struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol); free(url_info->hostname); free(sin); - url_info->portnr = 0; + url_info->port_nr = 0; url_info->hostname = NULL; pubsub_utils_url_parse_url(url_info->url, url_info); } @@ -338,7 +338,7 @@ void pubsub_utils_url_free(pubsub_utils_url_t *url_info) { url_info->hostname = NULL; url_info->protocol = NULL; url_info->interface = NULL; - url_info->portnr = 0; - url_info->interface_portnr = 0; + url_info->port_nr = 0; + url_info->interface_port_nr = 0; free(url_info); }
