This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit 4cfc6ecaf6a8055bd2c514cfc11f93426aac326b Author: Roy Bulter <[email protected]> AuthorDate: Mon Aug 3 16:52:22 2020 +0200 Fix --- .../src/pubsub_psa_tcp_constants.h | 2 +- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 369 ++++++--------------- .../pubsub/pubsub_utils/include/pubsub_utils_url.h | 8 +- bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c | 22 +- 4 files changed, 122 insertions(+), 279 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h index 9551c5b..9f03d13 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h @@ -31,7 +31,7 @@ #define PSA_TCP_DEFAULT_BASE_PORT 5501 #define PSA_TCP_DEFAULT_MAX_PORT 6000 -#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE 0 +#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE UINT32_MAX #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 65 * 1024 #define PSA_TCP_DEFAULT_TIMEOUT 2000 // 2 seconds #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index aae0406..411e9d2 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -32,17 +32,13 @@ #include <array_list.h> #include <pthread.h> #if defined(__APPLE__) -#include <sys/types.h> #include <sys/event.h> #include <sys/time.h> #else #include <sys/epoll.h> #endif #include <limits.h> -#include <assert.h> #include "ctype.h" -#include <netdb.h> -#include <signal.h> #include <fcntl.h> #include <arpa/inet.h> #include <netinet/tcp.h> @@ -53,12 +49,13 @@ #define MAX_EVENTS 64 #define MAX_DEFAULT_BUFFER_SIZE 4u -#define READ_STATE_HEADER 0u -#define READ_STATE_PAYLOAD 1u -#define READ_STATE_META 2u -#define READ_STATE_FOOTER 3u -#define READ_STATE_READY 4u -#define READ_STATE_SYNC 5u +#define READ_STATE_INIT 0u +#define READ_STATE_HEADER 1u +#define READ_STATE_PAYLOAD 2u +#define READ_STATE_META 3u +#define READ_STATE_FOOTER 4u +#define READ_STATE_READY 5u +#define READ_STATE_SYNC 6u #if defined(__APPLE__) #define MSG_NOSIGNAL (0) @@ -95,7 +92,6 @@ typedef struct psa_tcp_connection_entry { void *footerBuffer; unsigned int bufferSize; void *buffer; - unsigned int bufferReadSize; unsigned int bufferReadReadOffset; unsigned int expectedBufferReadSize; unsigned int msgSizeReadSize; @@ -142,8 +138,7 @@ static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url, struct sockaddr_in *addr); static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry); static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index); -static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag ); -static inline int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag ); +static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, int flag ); static inline void pubsub_tcpHandler_setReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int nextState); static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry); @@ -264,7 +259,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno)); } } - struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); + struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); if (addr) { rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); if (rc != 0) { @@ -342,6 +337,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch } if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize); if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); } return entry; } @@ -418,11 +414,11 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { socklen_t len = sizeof(sin); getsockname(fd, (struct sockaddr *) &sin, &len); char *interface_url = pubsub_utils_url_get_url(&sin, NULL); - struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); + struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); if ((rc >= 0) && addr) { rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); if (rc < 0 && errno != EINPROGRESS) { - L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url, + L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->port_nr, interface_url, strerror(errno)); close(fd); } else { @@ -449,13 +445,12 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { L_ERROR("[TCP Socket] Cannot create poll event %s\n", strerror(errno)); entry = NULL; } - //rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); - //if (rc < 0) { - // pubsub_tcpHandler_freeEntry(entry); - // L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); - // entry = NULL; - // } - + rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd); + if (rc < 0) { + pubsub_tcpHandler_freeEntry(entry); + L_ERROR("[TCP Socket] Cannot make not blocking %s\n", strerror(errno)); + entry = NULL; + } } if ((rc >= 0) && (entry)) { celixThreadRwlock_writeLock(&handle->dbLock); @@ -657,6 +652,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int si if (handle != NULL) { celixThreadRwlock_writeLock(&handle->dbLock); handle->maxMsgSize = size; + handle->maxMsgSize = 4; celixThreadRwlock_unlock(&handle->dbLock); } return 0; @@ -727,7 +723,7 @@ void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, if (prio > 0 && prio < 100) { struct sched_param sch; bzero(&sch, sizeof(struct sched_param)); - sch.sched_priority = prio; + sch.sched_priority = (int)prio; pthread_setschedparam(handle->thread.thread, policy, &sch); } else { L_INFO("Skipping configuration of thread prio to %i and thread " @@ -772,26 +768,8 @@ void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim } static inline -int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) { - int expectedReadSize = size; - int nbytes = size; - int msgSize = 0; - char* buffer = (char*)_buffer; - while (nbytes > 0 && expectedReadSize > 0) { - // Read the message header - nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL); - // Update buffer administration - offset += nbytes; - expectedReadSize -= nbytes; - msgSize += nbytes; - } - if (nbytes <=0) msgSize = nbytes; - return msgSize; -} - -static inline -int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) { - int nbytes = entry->expectedBufferReadSize; +int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, int flag ) { + int long nbytes = entry->expectedBufferReadSize; char* buffer = (char*)_buffer; while (nbytes > 0 && entry->expectedBufferReadSize > 0) { // Read the message header @@ -800,9 +778,6 @@ int pubsub_tcpHandler_readSocket_(pubsub_tcpHandler_t *handle, psa_tcp_connectio entry->bufferReadReadOffset += nbytes; entry->expectedBufferReadSize-= nbytes; } - if (nbytes == 0) { - L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, strerror(errno)); - } return nbytes; } @@ -837,7 +812,7 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connec if (entry->header.header.payloadSize) { entry->state = READ_STATE_PAYLOAD; entry->bufferReadReadOffset = entry->header.header.payloadOffset; - entry->expectedBufferReadSize = entry->header.header.payloadSize; + entry->expectedBufferReadSize = (entry->header.header.payloadPartSize) ? entry->header.header.payloadPartSize : entry->header.header.payloadSize; // For header less messages adjust offset and msg size; if (!entry->headerBufferSize) { entry->bufferReadReadOffset += entry->headerSize; @@ -910,6 +885,7 @@ static inline void pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHand + // // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure // If the message is completely reassembled true is returned and the index and size have valid values @@ -932,182 +908,49 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { // Message buffer is to small, reallocate to make it bigger if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { - handle->bufferSize = MAX(handle->bufferSize, entry->headerSize ); - if (entry->buffer) free(entry->buffer); + handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); + if (entry->buffer) + free(entry->buffer); entry->buffer = malloc((size_t) handle->bufferSize); entry->bufferSize = handle->bufferSize; } // Read the message - bool validMsg = false; - char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; - int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK); - if (nbytes > 0) { - // Check header message buffer - if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { - // Did not receive correct header - // skip sync word and try to read next header - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0); - if (!entry->headerError) { - L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); - } - entry->headerError = true; - entry->bufferReadSize = 0; - } else { - // Read header message from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0); - if ((nbytes > 0) && (nbytes == entry->headerSize)) { - entry->headerError = false; - // For headerless message, add header to bufferReadSize; - if (!entry->headerBufferSize) - entry->bufferReadSize += nbytes; - // Alloc message buffers - if (entry->header.header.payloadSize > entry->bufferSize) { - handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize); - if (entry->buffer) - free(entry->buffer); - entry->buffer = malloc((size_t) handle->bufferSize); - entry->bufferSize = handle->bufferSize; - } - if (entry->header.header.metadataSize > entry->metaBufferSize) { - if (entry->metaBuffer) { - free(entry->metaBuffer); - } - entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize); - entry->metaBufferSize = entry->header.header.metadataSize; - L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd, - entry->url, entry->metaBufferSize, entry->header.header.metadataSize); - } - - if (entry->header.header.payloadSize) { - unsigned int offset = entry->header.header.payloadOffset; - unsigned int size = entry->header.header.payloadPartSize; - // For header less messages adjust offset and msg size; - if (!entry->headerBufferSize) { - offset = entry->headerSize; - size -= offset; - } - // Read payload data from queue - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0); - if (nbytes > 0) { - if (nbytes == size) { - entry->bufferReadSize += nbytes; - } else { - entry->bufferReadSize = 0; - L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); - } - } - } - if (nbytes > 0 && entry->header.header.metadataSize) { - // Read meta data from queue - unsigned int size = entry->header.header.metadataSize; - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0); - if ((nbytes > 0) && (nbytes != size)) { - L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); - } - } - // Check for end of message using, footer of message. Because of streaming protocol - if (nbytes > 0) { - if (entry->footerSize > 0) { - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0); - if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { - // valid footer, this means that the message is valid - validMsg = true; - } else { - // Did not receive correct header - L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); - entry->bufferReadSize = 0; - } - } else { - // No Footer, then complete message is received - validMsg = true; - } - } - } - } - } - if (nbytes > 0) { - entry->retryCount = 0; - // Check if complete message is received - if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) { - entry->bufferReadSize = 0; - pubsub_tcpHandler_decodePayload(handle, entry); - } - } else { - if (entry->retryCount < handle->maxRcvRetryCount) { - entry->retryCount++; - L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, - strerror(errno), entry->retryCount, handle->maxRcvRetryCount); - } else { - L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s", - entry->fd, handle->maxRcvRetryCount, strerror(errno)); - nbytes = 0; //Return 0 as indicator to close the connection - } - } - celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; -} - - -// -// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure -// If the message is completely reassembled true is returned and the index and size have valid values -// -int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { - celixThreadRwlock_writeLock(&handle->dbLock); - psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); - if (entry == NULL) - entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); - // Find FD entry - if (entry == NULL) { - celixThreadRwlock_unlock(&handle->dbLock); - return -1; - } - // If it's not connected return from function - if (!entry->connected) { - celixThreadRwlock_unlock(&handle->dbLock); - return -1; - } - - // Message buffer is to small, reallocate to make it bigger - if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { - handle->bufferSize = MAX(handle->bufferSize, entry->headerSize ); - if (entry->buffer) free(entry->buffer); - entry->buffer = malloc((size_t) handle->bufferSize); - entry->bufferSize = handle->bufferSize; - } - // Read the message - long int nbytes = 0; - char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; + long int nbytes = UINT32_MAX; + char *header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer; if (entry->state == READ_STATE_SYNC) { - nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); if (nbytes > 0) { pubsub_tcpHandler_setReadStateMachine(handle, entry); } } if (entry->state == READ_STATE_HEADER) { - nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, MSG_PEEK); - if (nbytes >= entry->headerSize) { // Check header message buffer - if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) { - // Did not receive correct header - // skip sync word and try to read next header - if (!entry->headerError) { - L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); - } - entry->headerError = true; - pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC); - } else { - // Read header message from queue - pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); - nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, header_buffer, 0); - if ((nbytes > 0) && (nbytes == entry->headerSize)) { - entry->headerError = false; - entry->msgSizeReadSize = 0; - // For headerless message, add header to bufferReadSize; - if (!entry->headerBufferSize) entry->msgSizeReadSize += nbytes; - pubsub_tcpHandler_setReadStateMachine(handle, entry); - } - } - } + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, MSG_PEEK); + if (nbytes >= entry->headerSize) { // Check header message buffer + if (handle->protocol->decodeHeader(handle->protocol->handle, + header_buffer, + entry->headerSize, + &entry->header) != CELIX_SUCCESS) { + // Did not receive correct header + // skip sync word and try to read next header + if (!entry->headerError) { + L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); + } + entry->headerError = true; + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_SYNC); + } else { + // Read header message from queue + pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0); + if ((nbytes > 0) && (nbytes == entry->headerSize)) { + entry->headerError = false; + entry->msgSizeReadSize = 0; + // For headerless message, add header to bufferReadSize; + if (!entry->headerBufferSize) + entry->msgSizeReadSize += nbytes; + pubsub_tcpHandler_setReadStateMachine(handle, entry); + } + } + } } if (entry->state == READ_STATE_PAYLOAD) { @@ -1121,7 +964,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { } // Read payload data from queue - nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->buffer, 0); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, 0); if (nbytes > 0) { if (nbytes >= entry->header.header.payloadPartSize) { entry->msgSizeReadSize += nbytes; @@ -1142,7 +985,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { } // Read meta data from (queue - nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->metaBuffer,0); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0); if ((nbytes > 0) && (nbytes >= entry->header.header.metadataSize)) { entry->msgSizeReadSize += nbytes; pubsub_tcpHandler_setReadStateMachine(handle, entry); @@ -1150,7 +993,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { } if (entry->state == READ_STATE_FOOTER) { // Check for end of message using, footer of message. Because of streaming protocol - nbytes = pubsub_tcpHandler_readSocket_(handle, entry, fd, entry->footerBuffer, 0); + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0); if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) { // valid footer, this means that the message is valid pubsub_tcpHandler_setReadStateMachine(handle, entry); @@ -1174,8 +1017,9 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { entry->retryCount = 0; } else if (entry->retryCount < handle->maxRcvRetryCount) { entry->retryCount++; - L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd, - strerror(errno), entry->retryCount, handle->maxRcvRetryCount); + L_WARN( + "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.", + entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount); } else { L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxRcvRetryCount, strerror(errno)); @@ -1183,7 +1027,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) { } } celixThreadRwlock_unlock(&handle->dbLock); - return nbytes; + return (int)nbytes; } @@ -1421,8 +1265,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message if (entry->retryCount < handle->maxSendRetryCount) { entry->retryCount++; L_ERROR( - "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", - entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); + "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.", + entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno)); } else { L_ERROR( "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno)); @@ -1560,49 +1404,48 @@ void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd) { // static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle) { - int rc = 0; - if (handle->efd >= 0) { - int nof_events = 0; - // Wait for events. - struct kevent events[MAX_EVENTS]; - struct timespec ts = {handle->timeout / 1000, (handle->timeout % 1000) * 1000000}; - nof_events = kevent (handle->efd, NULL, 0, &events[0], MAX_EVENTS, handle->timeout ? &ts : NULL); - if (nof_events < 0) { - if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { - } else - L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno)); - } - for (int i = 0; i < nof_events; i++) { - hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map); - psa_tcp_connection_entry_t *pendingConnectionEntry = NULL; - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); - if (events[i].ident == entry->fd) - pendingConnectionEntry = entry; - } - if (pendingConnectionEntry) { - int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry); - pubsub_tcpHandler_connectionHandler(handle, fd); - } else if (events[i].filter & EVFILT_READ) { - int rc = pubsub_tcpHandler_read(handle, events[i].ident); - if (rc == 0) pubsub_tcpHandler_close(handle, events[i].ident); - } else if (events[i].flags & EV_EOF) { - int err = 0; - socklen_t len = sizeof(int); - rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len); - if (rc != 0) { - L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno)); - continue; + int rc = 0; + if (handle->efd >= 0) { + int nof_events = 0; + // Wait for events. + struct kevent events[MAX_EVENTS]; + struct timespec ts = {handle->timeout / 1000, (handle->timeout % 1000) * 1000000}; + nof_events = kevent (handle->efd, NULL, 0, &events[0], MAX_EVENTS, handle->timeout ? &ts : NULL); + if (nof_events < 0) { + if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { + } else + L_ERROR("[TCP Socket] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno)); + } + for (int i = 0; i < nof_events; i++) { + hash_map_iterator_t iter = hashMapIterator_construct(handle->interface_fd_map); + psa_tcp_connection_entry_t *pendingConnectionEntry = NULL; + while (hashMapIterator_hasNext(&iter)) { + psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); + if (events[i].ident == entry->fd) + pendingConnectionEntry = entry; + } + if (pendingConnectionEntry) { + int fd = pubsub_tcpHandler_acceptHandler(handle, pendingConnectionEntry); + pubsub_tcpHandler_connectionHandler(handle, fd); + } else if (events[i].filter & EVFILT_READ) { + rc = pubsub_tcpHandler_read(handle, events[i].ident); + if (rc == 0) pubsub_tcpHandler_close(handle, events[i].ident); + } else if (events[i].flags & EV_EOF) { + int err = 0; + socklen_t len = sizeof(int); + rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len); + if (rc != 0) { + L_ERROR("[TCP Socket]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno)); + continue; + } + pubsub_tcpHandler_close(handle, events[i].ident); + } else if (events[i].flags & EV_ERROR) { + L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n", strerror(errno)); + pubsub_tcpHandler_close(handle, events[i].ident); + continue; + } } - pubsub_tcpHandler_close(handle, events[i].ident); - } else if (events[i].flags & EV_ERROR) { - L_ERROR("[TCP Socket]:EPOLLERR ERROR read from socket %s\n", strerror(errno)); - pubsub_tcpHandler_close(handle, events[i].ident); - continue; - } } - } - return; } #else diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h index 87d4263..b10863c 100644 --- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h +++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h @@ -28,16 +28,16 @@ typedef struct pubsub_utils_url { char *url; char *protocol; char *hostname; - unsigned int portnr; + unsigned int port_nr; char *uri; char *interface; - unsigned int interface_portnr; + unsigned int interface_port_nr; char *interface_url; } pubsub_utils_url_t; struct sockaddr_in *pubsub_utils_url_from_fd(int fd); -struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port); -char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol); +struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port); +char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol); char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol); bool pubsub_utils_url_is_multicast(char *hostname); char *pubsub_utils_url_get_multicast_ip(char *hostname); diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c index d8d518c..65a1ff2 100644 --- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c +++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c @@ -56,7 +56,7 @@ struct sockaddr_in *pubsub_utils_url_from_fd(int fd) { return inp; } -struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) { +struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) { struct hostent *hp; struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in)); bzero(inp, sizeof(struct sockaddr_in)); // zero the struct @@ -220,11 +220,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) { maxPortnr += 1; unsigned int minDigits = (unsigned int) atoi(portnr); unsigned int maxDigits = (unsigned int) atoi(maxPortnr); - url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits); + url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits); } else { unsigned int portDigits = (unsigned int) atoi(portnr); if (portDigits != 0) - url_info->portnr = portDigits; + url_info->port_nr = portDigits; uri = strstr(port, "/"); if ((uri) && (!url_info->uri)) url_info->uri = celix_utils_strdup(uri); @@ -256,11 +256,11 @@ void pubsub_utils_url_parse_url(char *_url, pubsub_utils_url_t *url_info) { maxPortnr += 1; unsigned int minDigits = (unsigned int) atoi(portnr); unsigned int maxDigits = (unsigned int) atoi(maxPortnr); - url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits); + url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits); } else { unsigned int portDigits = (unsigned int) atoi(portnr); if (portDigits != 0) - url_info->interface_portnr = portDigits; + url_info->interface_port_nr = portDigits; uri = strstr(port, "/"); if ((uri) && (!url_info->uri)) url_info->uri = celix_utils_strdup(uri); @@ -289,13 +289,13 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) { free(url_info->interface); url_info->interface = ip; } - struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr); + struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr); url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL); free(m_sin); pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info); free(url_info->interface); url_info->interface = interface_url_info.hostname; - url_info->interface_portnr = interface_url_info.portnr; + url_info->interface_port_nr = interface_url_info.port_nr; } if (url_info->hostname) { @@ -306,11 +306,11 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) { free(url_info->hostname); url_info->hostname = ip; } - struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); + struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol); free(url_info->hostname); free(sin); - url_info->portnr = 0; + url_info->port_nr = 0; url_info->hostname = NULL; pubsub_utils_url_parse_url(url_info->url, url_info); } @@ -338,7 +338,7 @@ void pubsub_utils_url_free(pubsub_utils_url_t *url_info) { url_info->hostname = NULL; url_info->protocol = NULL; url_info->interface = NULL; - url_info->portnr = 0; - url_info->interface_portnr = 0; + url_info->port_nr = 0; + url_info->interface_port_nr = 0; free(url_info); }
