This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/fix_tcp_endpoint_handling in repository https://gitbox.apache.org/repos/asf/celix.git
commit 79884e3ced271df9c882360e727bb58cb83839e9 Author: Roy Bulter <[email protected]> AuthorDate: Wed Apr 29 11:28:54 2020 +0200 Fix TCP endpoint handling --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 36 ++++++++++++++-------- .../src/pubsub_tcp_topic_receiver.c | 15 +++++---- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 5144e01..3ac3c04 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -412,23 +412,23 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { int fd = pubsub_tcpHandler_open(handle, url_info->interface_url); rc = fd; // Connect to sender + struct sockaddr_in sin; + socklen_t len = sizeof(sin); + getsockname(fd, (struct sockaddr *) &sin, &len); + char *interface_url = pubsub_utils_url_get_url(&sin, NULL); struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr); if ((rc >= 0) && addr) { rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); if (rc < 0 && errno != EINPROGRESS) { - L_ERROR("[TCP Socket] Cannot connect to %s:%d: err: %s\n", url_info->hostname, url_info->portnr, + L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url, strerror(errno)); close(fd); } else { - struct sockaddr_in sin; - socklen_t len = sizeof(sin); - rc = getsockname(fd, (struct sockaddr *) &sin, &len); - char *interface_url = pubsub_utils_url_get_url(&sin, NULL); entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &sin); - free(interface_url); } free(addr); } + free(interface_url); // Subscribe File Descriptor to epoll if ((rc >= 0) && (entry)) { #if defined(__APPLE__) @@ -454,6 +454,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) { hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry); celixThreadRwlock_unlock(&handle->dbLock); pubsub_tcpHandler_connectionHandler(handle, fd); + L_INFO("[TCP Socket] Connect to %s using; %s err: %s\n", entry->url, entry->interface_url); } pubsub_utils_url_free(url_info); } @@ -1018,19 +1019,28 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message void *headerData = NULL; size_t headerSize = 0; - // Encode the header, with payload size and metadata size - handle->protocol->encodeHeader(handle->protocol->handle, message, - &headerData, - &headerSize); - // Write header in 1st vector buffer item - if (headerSize && headerData) { + // check if header is not part of the payload (=> headerBufferSize = 0)s + if (entry->headerBufferSize) { + // Encode the header, with payload size and metadata size + handle->protocol->encodeHeader(handle->protocol->handle, message, + &headerData, + &headerSize); + } + if (!entry->headerBufferSize) { + // Skip header buffer, when header is part of payload; + msg.msg_iov = &msg_iov[1]; + } else if (headerSize && headerData) { + // Write header in 1st vector buffer item msg.msg_iov[0].iov_base = headerData; msg.msg_iov[0].iov_len = headerSize; msgSize += msg.msg_iov[0].iov_len; msg.msg_iovlen++; + } else { + L_ERROR("[TCP Socket] No header buffer is generated"); + msg.msg_iovlen = 0; } nbytes = 0; - if (entry->fd >= 0 && msgSize && headerData) + if (entry->fd >= 0 && msgSize && msg.msg_iovlen) nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL); // When a specific socket keeps reporting errors can indicate a subscriber // which is not active anymore, the connection will remain until the retry diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index e065d05..57affc9 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -157,6 +157,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context receiver->protocol = protocol; receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); + bool isServerEndPoint = false; /* Check if it's a static endpoint */ const char *staticClientEndPointUrls = NULL; @@ -173,6 +174,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType, strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) { staticServerEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL); + isServerEndPoint = true; } } } @@ -249,7 +251,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context free(urlsCopy); } - if (receiver->socketHandler != NULL) { + if (receiver->socketHandler != NULL && (!isServerEndPoint)) { // Configure Receiver thread receiver->thread.running = true; celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver); @@ -691,11 +693,12 @@ static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); while (hashMapIterator_hasNext(&iter)) { psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); - if (!entry->connected) { - int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url); - if (rc < 0) { - //L_WARN("[PSA_TCP] Error connecting to tcp url %s. (%s)", entry->url, strerror(errno)); - allConnected = false; + if (entry) { + if (!entry->connected) { + int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url); + if (rc < 0) { + allConnected = false; + } } } }
