This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/fix_tcp_endpoint_handling
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 79884e3ced271df9c882360e727bb58cb83839e9
Author: Roy Bulter <[email protected]>
AuthorDate: Wed Apr 29 11:28:54 2020 +0200

    Fix TCP endpoint handling
---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 36 ++++++++++++++--------
 .../src/pubsub_tcp_topic_receiver.c                | 15 +++++----
 2 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 5144e01..3ac3c04 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -412,23 +412,23 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t 
*handle, char *url) {
         int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
         rc = fd;
         // Connect to sender
+        struct sockaddr_in sin;
+        socklen_t len = sizeof(sin);
+        getsockname(fd, (struct sockaddr *) &sin, &len);
+        char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
         struct sockaddr_in *addr = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
         if ((rc >= 0) && addr) {
             rc = connect(fd, (struct sockaddr *) addr, sizeof(struct 
sockaddr));
             if (rc < 0 && errno != EINPROGRESS) {
-                L_ERROR("[TCP Socket] Cannot connect to %s:%d: err: %s\n", 
url_info->hostname, url_info->portnr,
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: 
%s\n", url_info->hostname, url_info->portnr, interface_url,
                         strerror(errno));
                 close(fd);
             } else {
-                struct sockaddr_in sin;
-                socklen_t len = sizeof(sin);
-                rc = getsockname(fd, (struct sockaddr *) &sin, &len);
-                char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
                 entry = pubsub_tcpHandler_createEntry(handle, fd, url, 
interface_url, &sin);
-                free(interface_url);
             }
             free(addr);
         }
+        free(interface_url);
         // Subscribe File Descriptor to epoll
         if ((rc >= 0) && (entry)) {
 #if defined(__APPLE__)
@@ -454,6 +454,7 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, 
char *url) {
             hashMap_put(handle->connection_fd_map, (void *) (intptr_t) 
entry->fd, entry);
             celixThreadRwlock_unlock(&handle->dbLock);
             pubsub_tcpHandler_connectionHandler(handle, fd);
+            L_INFO("[TCP Socket] Connect to %s using; %s err: %s\n", 
entry->url, entry->interface_url);
         }
         pubsub_utils_url_free(url_info);
     }
@@ -1018,19 +1019,28 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t 
*handle, pubsub_protocol_message
 
             void *headerData = NULL;
             size_t headerSize = 0;
-            // Encode the header, with payload size and metadata size
-            handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                           &headerData,
-                                           &headerSize);
-            // Write header in 1st vector buffer item
-            if (headerSize && headerData) {
+            // Encode a separate header only when it is not part of the
+            // payload (headerBufferSize == 0 means it is part of the payload)
+            if (entry->headerBufferSize) {
+              // Encode the header, with payload size and metadata size
+              handle->protocol->encodeHeader(handle->protocol->handle, message,
+                                             &headerData,
+                                             &headerSize);
+            }
+            if (!entry->headerBufferSize) {
+              // Skip the header buffer when the header is part of the payload
+              msg.msg_iov = &msg_iov[1];
+            } else if (headerSize && headerData) {
+              // Write header in 1st vector buffer item
                 msg.msg_iov[0].iov_base = headerData;
                 msg.msg_iov[0].iov_len = headerSize;
                 msgSize += msg.msg_iov[0].iov_len;
                 msg.msg_iovlen++;
+            } else {
+              L_ERROR("[TCP Socket] No header buffer is generated");
+              msg.msg_iovlen = 0;
             }
             nbytes = 0;
-            if (entry->fd >= 0 && msgSize && headerData)
+            if (entry->fd >= 0 && msgSize && msg.msg_iovlen)
                 nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
             //  When a specific socket keeps reporting errors can indicate a 
subscriber
             //  which is not active anymore, the connection will remain until 
the retry
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index e065d05..57affc9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -157,6 +157,7 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
+    bool isServerEndPoint = false;
 
     /* Check if it's a static endpoint */
     const char *staticClientEndPointUrls = NULL;
@@ -173,6 +174,7 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
             if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
                         strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
                 staticServerEndPointUrls = 
celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL);
+                isServerEndPoint = true;
             }
         }
     }
@@ -249,7 +251,7 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
         free(urlsCopy);
     }
 
-    if (receiver->socketHandler != NULL) {
+    if (receiver->socketHandler != NULL && (!isServerEndPoint)) {
         // Configure Receiver thread
         receiver->thread.running = true;
         celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, 
receiver);
@@ -691,11 +693,12 @@ static void 
psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t
         hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-            if (!entry->connected) {
-                int rc = 
pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
-                if (rc < 0) {
-                    //L_WARN("[PSA_TCP] Error connecting to tcp url %s. (%s)", 
entry->url, strerror(errno));
-                    allConnected = false;
+            if (entry) {
+                if (!entry->connected) {
+                  int rc = 
pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
+                  if (rc < 0) {
+                      allConnected = false;
+                  }
                 }
             }
         }

Reply via email to