This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch feature/tcp_admin_msg_segmentation
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 90f472e1c4be2135cb9a1ddb273216a92e456da3
Author: Roy Bulter <[email protected]>
AuthorDate: Sun Aug 23 20:56:35 2020 +0200

    - Add Message segmenation (maxSize / IOVEC)
    - Fix pre-allocated buffer handling
    - Solve message corruption because of wrong locking in write function
    - Add interceptors for tcp_admin
---
 CMakeLists.txt                                     |  11 +-
 .../src/pubsub_psa_tcp_constants.h                 |   5 +-
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 672 +++++++++++----------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |   7 +-
 .../src/pubsub_tcp_topic_receiver.c                |  77 ++-
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  11 +-
 .../pubsub/pubsub_utils/include/pubsub_utils_url.h |   8 +-
 bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c |  22 +-
 8 files changed, 425 insertions(+), 388 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 75fd9bd..a298fdd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,13 +32,6 @@ IF (${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION} EQUAL 3.3 
AND ${CMAKE_GENERATO
     message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not 
supported due to a bug in the Makefile Generator (see Bug 15696). Please change 
the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use 
a different generator (e.g. Ninja)." )
 ENDIF()
 
-if (ENABLE_TESTING)
-    find_package(GTest CONFIG QUIET)
-    if (NOT GTest_FOUND)
-        include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake)
-    endif()
-endif ()
-
 set(ENABLE_MORE_WARNINGS OFF)
 
 # Set C specific flags
@@ -123,6 +116,10 @@ set(DEFAULT_VERSION 1.0.0)
 # Options
 option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
 if (ENABLE_TESTING)
+    find_package(GTest CONFIG QUIET)
+    if (NOT GTest_FOUND)
+        include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/AddGTest.cmake)
+    endif()
     enable_testing()
 endif()
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 4284042..9f03d13 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,7 +23,7 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   
"PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
@@ -31,8 +31,7 @@
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 0cbd28f..be5a694 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -31,6 +31,7 @@
 #include <errno.h>
 #include <array_list.h>
 #include <pthread.h>
+#include <sys/ioctl.h>
 #if defined(__APPLE__)
 #include <sys/types.h>
 #include <sys/event.h>
@@ -78,17 +79,23 @@ typedef struct psa_tcp_connection_entry {
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
+    unsigned int maxMsgSize;
     unsigned int syncSize;
     unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no 
headerBuffer -> included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
+    unsigned int readHeaderBufferSize; // Size of headerBuffer, size = 0, no 
headerBuffer -> included in payload
+    void *readHeaderBuffer;
+    unsigned int writeHeaderBufferSize; // Size of headerBuffer, size = 0, no 
headerBuffer -> included in payload
+    void *writeHeaderBuffer;
+    unsigned int readFooterSize;
+    void *readFooterBuffer;
+    unsigned int writeFooterSize;
+    void *writeFooterBuffer;
     unsigned int bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
     unsigned int retryCount;
 } psa_tcp_connection_entry_t;
 
@@ -114,38 +121,25 @@ struct pubsub_tcpHandler {
     celix_log_helper_t *logHelper;
     pubsub_protocol_service_t *protocol;
     unsigned int bufferSize;
-    unsigned int maxNofBuffer;
+    unsigned int maxMsgSize;
     unsigned int maxSendRetryCount;
     unsigned int maxRcvRetryCount;
     double sendTimeout;
     double rcvTimeout;
     celix_thread_t thread;
     bool running;
+    bool isEndPoint;
 };
 
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t 
*handle, psa_tcp_connection_entry_t *entry, bool lock);
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t 
*handle, psa_tcp_connection_entry_t *entry);
-
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t 
*handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, 
char *external_url,
-                              struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* 
pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, 
char *external_url, struct sockaddr_in *addr);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t 
*entry);
-
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t 
*handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, 
unsigned int size, int flag );
-
 static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t 
*handle, psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t 
*handle, int fd);
-
 static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
 static void *pubsub_tcpHandler_thread(void *data);
 
 //
@@ -167,7 +161,6 @@ pubsub_tcpHandler_t 
*pubsub_tcpHandler_create(pubsub_protocol_service_t *protoco
         handle->logHelper = logHelper;
         handle->protocol = protocol;
         handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
-        handle->maxNofBuffer = 1; // Reserved for future Use;
         celixThreadRwlock_create(&handle->dbLock, 0);
         handle->running = true;
         celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, 
handle);
@@ -261,7 +254,7 @@ int pubsub_tcpHandler_open(pubsub_tcpHandler_t *handle, 
char *url) {
                 L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set 
send timeout: %s", strerror(errno));
             }
         }
-        struct sockaddr_in *addr = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if (addr) {
             rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc != 0) {
@@ -326,17 +319,25 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t 
*handle, int fd, char *url, ch
         handle->protocol->getHeaderSize(handle->protocol->handle, &size);
         entry->headerSize = size;
         handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size);
-        entry->headerBufferSize = size;
+        entry->readHeaderBufferSize = size;
+        entry->writeHeaderBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
         handle->protocol->getFooterSize(handle->protocol->handle, &size);
-        entry->footerSize = size;
+        entry->readFooterSize = size;
+        entry->writeFooterSize = size;
         entry->bufferSize = handle->bufferSize;
         entry->connected = false;
-        if (entry->headerBufferSize) {
-            entry->headerBuffer = calloc(sizeof(char), entry->headerSize);
+        unsigned minimalMsgSize = entry->writeHeaderBufferSize + 
entry->writeFooterSize;
+        if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) {
+            L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize 
(%d): %s\n", handle->maxMsgSize, minimalMsgSize);
+        } else {
+            entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : 
UINT32_MAX;
         }
-        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), 
entry->footerSize);
+        if (entry->readHeaderBufferSize) entry->readHeaderBuffer = 
calloc(sizeof(char), entry->readHeaderBufferSize);
+        if (entry->writeHeaderBufferSize) entry->writeHeaderBuffer = 
calloc(sizeof(char), entry->writeHeaderBufferSize);
+        if (entry->readFooterSize) entry->readFooterBuffer = 
calloc(sizeof(char), entry->readFooterSize);
+        if (entry->writeFooterSize) entry->writeFooterBuffer = 
calloc(sizeof(char), entry->writeFooterSize);
         if (entry->bufferSize) entry->buffer = calloc(sizeof(char), 
entry->bufferSize);
     }
     return entry;
@@ -348,40 +349,16 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t 
*handle, int fd, char *url, ch
 static inline void
 pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
     if (entry) {
-        if (entry->url) {
-            free(entry->url);
-            entry->url = NULL;
-        }
-        if (entry->interface_url) {
-            free(entry->interface_url);
-            entry->interface_url = NULL;
-        }
-        if (entry->fd >= 0) {
-            close(entry->fd);
-            entry->fd = -1;
-        }
-        if (entry->buffer) {
-            free(entry->buffer);
-            entry->buffer = NULL;
-            entry->bufferSize = 0;
-        }
-        if (entry->headerBuffer) {
-            free(entry->headerBuffer);
-            entry->headerBuffer = NULL;
-            entry->headerBufferSize = 0;
-        }
-
-        if (entry->footerBuffer) {
-            free(entry->footerBuffer);
-            entry->footerBuffer = NULL;
-        }
-
-        if (entry->metaBuffer) {
-            free(entry->metaBuffer);
-            entry->metaBuffer = NULL;
-            entry->metaBufferSize = 0;
-        }
-        entry->connected = false;
+        if (entry->url) free(entry->url);
+        if (entry->interface_url) free(entry->interface_url);
+        if (entry->fd >= 0) close(entry->fd);
+        if (entry->buffer) free(entry->buffer);
+        if (entry->readHeaderBuffer) free(entry->readHeaderBuffer);
+        if (entry->writeHeaderBuffer) free(entry->writeHeaderBuffer);
+        if (entry->readFooterBuffer) free(entry->readFooterBuffer);
+        if (entry->writeFooterBuffer) free(entry->writeFooterBuffer);
+        if (entry->readMetaBuffer) free(entry->readMetaBuffer);
+        if (entry->writeMetaBuffer) free(entry->writeMetaBuffer);
         free(entry);
     }
 }
@@ -403,8 +380,7 @@ pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t 
*handle, int fd, unsign
 //
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
     int rc = 0;
-    psa_tcp_connection_entry_t *entry =
-        hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+    psa_tcp_connection_entry_t *entry = 
hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
     if (entry == NULL) {
         pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
         int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
@@ -414,11 +390,11 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t 
*handle, char *url) {
         socklen_t len = sizeof(sin);
         getsockname(fd, (struct sockaddr *) &sin, &len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
-        struct sockaddr_in *addr = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if ((rc >= 0) && addr) {
             rc = connect(fd, (struct sockaddr *) addr, sizeof(struct 
sockaddr));
             if (rc < 0 && errno != EINPROGRESS) {
-                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: 
%s\n", url_info->hostname, url_info->portnr, interface_url,
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s 
err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno,
                         strerror(errno));
                 close(fd);
             } else {
@@ -445,6 +421,12 @@ int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, 
char *url) {
                 L_ERROR("[TCP Socket] Cannot create poll event %s\n", 
strerror(errno));
                 entry = NULL;
             }
+            rc = pubsub_tcpHandler_makeNonBlocking(handle, entry->fd);
+            if (rc < 0) {
+                pubsub_tcpHandler_freeEntry(entry);
+                L_ERROR("[TCP Socket] Cannot make not blocking %s\n", 
strerror(errno));
+                entry = NULL;
+            }
         }
         if ((rc >= 0) && (entry)) {
             celixThreadRwlock_writeLock(&handle->dbLock);
@@ -552,7 +534,7 @@ static inline int 
pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle,
     else {
         rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
         if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", 
strerror(errno));
+            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", 
strerror(errno));
         }
     }
     return rc;
@@ -628,16 +610,24 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, 
char *url) {
 }
 
 //
-// Setup buffer sizes
+// Setup receive buffer size
 //
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers
-                                               __attribute__((__unused__)),
-                                               unsigned int bufferSize) {
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, 
unsigned int size) {
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
-        handle->bufferSize = bufferSize;
-        handle->maxNofBuffer = maxNofBuffers;
+        handle->bufferSize = size;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+    return 0;
+}
+
+//
+// Set Maximum message size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int 
size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxMsgSize = size;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -708,7 +698,7 @@ void 
pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio,
             if (prio > 0 && prio < 100) {
                 struct sched_param sch;
                 bzero(&sch, sizeof(struct sched_param));
-                sch.sched_priority = prio;
+                sch.sched_priority = (int)prio;
                 pthread_setschedparam(handle->thread.thread, policy, &sch);
             } else {
                 L_INFO("Skipping configuration of thread prio to %i and thread 
"
@@ -752,35 +742,23 @@ void 
pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double tim
     }
 }
 
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, 
unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
-    char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
-        // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | 
MSG_NOSIGNAL);
-        // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
+void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle,bool 
isEndPoint) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->isEndPoint = isEndPoint;
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    if (nbytes <=0)  msgSize = nbytes;
-    return msgSize;
 }
 
-
 static inline
 void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *entry) {
 
   if (entry->header.header.payloadSize > 0) {
-    handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, 
entry->header.header.payloadSize, &entry->header);
+      handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, 
entry->header.header.payloadSize, &entry->header);
   }
   if (entry->header.header.metadataSize > 0) {
-    handle->protocol->decodeMetadata(handle->protocol->handle, 
entry->metaBuffer,
-                                     entry->header.header.metadataSize, 
&entry->header);
-    entry->metaBufferSize = entry->header.header.metadataSize;
+      handle->protocol->decodeMetadata(handle->protocol->handle, 
entry->readMetaBuffer,
+                                       entry->header.header.metadataSize, 
&entry->header);
   }
   if (handle->processMessageCallback && entry->header.payload.payload != NULL 
&& entry->header.payload.length) {
     struct timespec receiveTime;
@@ -791,7 +769,6 @@ void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t 
*handle, psa_tcp_connec
   }
 }
 
-
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) 
and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and 
size have valid values
@@ -812,113 +789,147 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, 
int fd) {
         return -1;
     }
 
+    if (entry->readHeaderBufferSize && entry->readHeaderBuffer) {
+        entry->readHeaderBuffer = malloc(entry->readHeaderBufferSize);
+    }
+
     // Message buffer is to small, reallocate to make it bigger
-    if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) 
{
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
-        if (entry->buffer) free(entry->buffer);
-            entry->buffer = malloc((size_t) handle->bufferSize);
+    if ((!entry->readHeaderBufferSize) && (entry->headerSize > 
entry->bufferSize)) {
+        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize);
+        char *buffer = realloc(entry->buffer, (size_t) handle->bufferSize);
+        if (buffer) {
+            entry->buffer = buffer;
             entry->bufferSize = handle->bufferSize;
         }
+    }
+    struct msghdr msg;
+    struct iovec msg_iov[IOV_MAX];
+    memset(&msg, 0x00, sizeof(struct msghdr));
+    msg.msg_iov = msg_iov;
+
     // Read the message
-    bool validMsg = false;
-    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : 
entry->buffer;
-    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
header_buffer, 0, entry->headerSize, MSG_PEEK);
-    if (nbytes > 0) {
-        // Check header message buffer
-        if (handle->protocol->decodeHeader(handle->protocol->handle, 
header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
+    msg.msg_iovlen = 0;
+    msg.msg_iov[msg.msg_iovlen].iov_base = (entry->readHeaderBufferSize) ? 
entry->readHeaderBuffer : entry->buffer;
+    msg.msg_iov[msg.msg_iovlen].iov_len  = entry->headerSize;
+    msg.msg_iovlen++;
+    long int msgSize = 0;
+    long int nbytes = recvmsg(fd, &msg, MSG_PEEK | MSG_NOSIGNAL);
+    if (nbytes >= entry->headerSize) {
+        msg.msg_iovlen--;
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           
msg.msg_iov[msg.msg_iovlen].iov_base,
+                                           msg.msg_iov[msg.msg_iovlen].iov_len,
+                                           &entry->header) != CELIX_SUCCESS) {
             // Did not receive correct header
             // skip sync word and try to read next header
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
header_buffer, 0, entry->syncSize, 0);
             if (!entry->headerError) {
                 L_WARN("[TCP Socket] Failed to decode message header (fd: %d) 
(url: %s)", entry->fd, entry->url);
             }
             entry->headerError = true;
-            entry->bufferReadSize = 0;
+            msg.msg_iov[msg.msg_iovlen].iov_len = entry->syncSize;
+            msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            msg.msg_iovlen++;
         } else {
-            // Read header message from queue
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
header_buffer, 0, entry->headerSize, 0);
-            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                entry->headerError = false;
-                // For headerless message, add header to bufferReadSize;
-                if (!entry->headerBufferSize)
-                    entry->bufferReadSize += nbytes;
-                // Alloc message buffers
-                if (entry->header.header.payloadSize > entry->bufferSize) {
-                    handle->bufferSize = MAX(handle->bufferSize, 
entry->header.header.payloadSize);
-                    if (entry->buffer)
-                        free(entry->buffer);
-                    entry->buffer = malloc((size_t) handle->bufferSize);
+            // Alloc message buffers
+            if (entry->header.header.payloadSize > entry->bufferSize) {
+                handle->bufferSize = MAX(handle->bufferSize, 
entry->header.header.payloadSize);
+                char *buffer = realloc(entry->buffer, (size_t) 
handle->bufferSize);
+                if (buffer) {
+                    entry->buffer = buffer;
                     entry->bufferSize = handle->bufferSize;
                 }
-                if (entry->header.header.metadataSize > entry->metaBufferSize) 
{
-                    if (entry->metaBuffer) {
-                      free(entry->metaBuffer);
-                    }
-                    entry->metaBuffer = malloc((size_t) 
entry->header.header.metadataSize);
-                    entry->metaBufferSize = entry->header.header.metadataSize;
-                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read 
meta buffer: (%d, %d) \n", entry->fd,
-                      entry->url, entry->metaBufferSize, 
entry->header.header.metadataSize);
-                }
+            }
 
-                if (entry->header.header.payloadSize) {
-                    unsigned int offset = entry->header.header.payloadOffset;
-                    unsigned int size = entry->header.header.payloadPartSize;
-                    // For header less messages adjust offset and msg size;
-                    if (!entry->headerBufferSize) {
-                        offset = entry->headerSize;
-                        size -= offset;
-                    }
-                    // Read payload data from queue
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
entry->buffer, offset, size, 0);
-                    if (nbytes > 0) {
-                        if (nbytes == size) {
-                            entry->bufferReadSize += nbytes;
-                        } else {
-                            entry->bufferReadSize = 0;
-                            L_ERROR("[TCP Socket] Failed to receive complete 
payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                        }
-                    }
-                }
-                if (nbytes > 0 && entry->header.header.metadataSize) {
-                    // Read meta data from queue
-                    unsigned int size = entry->header.header.metadataSize;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, 
entry->metaBuffer,0, size,0);
-                    if ((nbytes > 0) && (nbytes != size)) {
-                        L_ERROR("[TCP Socket] Failed to receive complete 
payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                    }
-                }
-                // Check for end of message using, footer of message. Because 
of streaming protocol
-                if (nbytes > 0) {
-                    if (entry->footerSize > 0) {
-                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, 
fd, entry->footerBuffer,0, entry->footerSize,0);
-                        if 
(handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, 
entry->footerSize, &entry->header) == CELIX_SUCCESS) {
-                            // valid footer, this means that the message is 
valid
-                            validMsg = true;
-                        } else {
-                            // Did not receive correct header
-                            L_ERROR("[TCP Socket] Failed to decode message 
footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: 
%s)", entry->header.header.seqNr, entry->fd, entry->url);
-                            entry->bufferReadSize = 0;
-                        }
-                    } else {
-                        // No Footer, then complete message is received
-                        validMsg = true;
+#ifdef USE_TCP_PUB_SUB_BUFFER_MEMSET
+            // For Debugging
+            if ((entry->header.header.payloadOffset == 0 ) && (msgSize == 
entry->headerSize)) {
+                memset(entry->buffer, 0x00, entry->bufferSize);
+            }
+#endif
+            entry->headerError = false;
+            if (entry->readHeaderBufferSize) {
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->header.header.payloadPartSize) {
+                char* buffer = entry->buffer;
+                msg.msg_iov[msg.msg_iovlen].iov_base = 
&buffer[entry->header.header.payloadOffset];
+                msg.msg_iov[msg.msg_iovlen].iov_len = 
entry->header.header.payloadPartSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+            if (entry->header.header.metadataSize) {
+                if (entry->header.header.metadataSize > 
entry->readMetaBufferSize) {
+                    char *buffer = realloc(entry->readMetaBuffer, (size_t) 
entry->header.header.metadataSize);
+                    if (buffer) {
+                        entry->readMetaBuffer = buffer;
+                        entry->readMetaBufferSize = 
entry->header.header.metadataSize;
+                        L_WARN("[TCP Socket] socket: %d, url: %s,  realloc 
read meta buffer: (%d, %d) \n", entry->fd,
+                               entry->url, entry->readMetaBufferSize, 
entry->header.header.metadataSize);
                     }
                 }
+                msg.msg_iov[msg.msg_iovlen].iov_base = entry->readMetaBuffer;
+                msg.msg_iov[msg.msg_iovlen].iov_len  = 
entry->header.header.metadataSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
             }
+            if (entry->readFooterSize) {
+                msg.msg_iov[msg.msg_iovlen].iov_base = entry->readFooterBuffer;
+                msg.msg_iov[msg.msg_iovlen].iov_len = entry->readFooterSize;
+                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                msg.msg_iovlen++;
+            }
+        }
+        int nofBytesInReadBuffer = 0;
+        if (ioctl(fd, FIONREAD, &nofBytesInReadBuffer)) {
+            L_ERROR("[TCP Socket] socket: %d, url: %s, cannot  read nof bytes 
in socket read buffer \n", entry->fd, entry->url);
+        }
+        if (nofBytesInReadBuffer >= msgSize) {
+            nbytes = recvmsg(fd, &msg, MSG_NOSIGNAL);
+        } else {
+            // Not enough to read return the amount in the socket read buffer
+            nbytes = nofBytesInReadBuffer;
         }
     }
-    if (nbytes > 0) {
-        entry->retryCount = 0;
-        // Check if complete message is received
-        if ((entry->bufferReadSize >= entry->header.header.payloadSize) && 
validMsg) {
-            entry->bufferReadSize = 0;
+    if ((nbytes >= msgSize)&&(!entry->headerError))  {
+        bool valid = true;
+        if (entry->readFooterSize) {
+            if (handle->protocol->decodeFooter(handle->protocol->handle,
+                                               entry->readFooterBuffer,
+                                               entry->readFooterSize,
+                                               &entry->header) != 
CELIX_SUCCESS) {
+
+                // Did not receive correct footer
+                L_ERROR(
+                    "[TCP Socket] Failed to decode message footer seq %d 
(received corrupt message, transmit buffer full?) (fd: %d) (url: %s)",
+                    entry->header.header.seqNr,
+                    entry->fd,
+                    entry->url);
+                valid = false;
+            }
+        }
+        if (!entry->header.header.isLastSegment) {
+            // Not last Segment of message
+            valid = false;
+        }
+
+        if (valid) {
+            // Complete message is received
             pubsub_tcpHandler_decodePayload(handle, entry);
         }
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
+    }
+
+    if (nbytes > 0) {
+        entry->retryCount = 0;
+    } else if (nbytes < 0) {
+        if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+            // Non blocking interrupt
+            entry->retryCount = 0;
+        } else if (entry->retryCount < handle->maxRcvRetryCount) {
             entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: 
%s. Retry count %u of %u,", entry->fd,
-                   strerror(errno), entry->retryCount, 
handle->maxRcvRetryCount);
+            L_WARN(
+                "[TCP Socket] Failed to receive message (fd: %d), try again. 
error(%d): %s, Retry count %u of %u.",
+                entry->fd, errno, strerror(errno), entry->retryCount, 
handle->maxSendRetryCount);
         } else {
             L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u 
retries! Closing connection... Error: %s",
                     entry->fd, handle->maxRcvRetryCount, strerror(errno));
@@ -926,10 +937,9 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, 
int fd) {
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
-    return nbytes;
+    return (int)nbytes;
 }
 
-
 int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void 
*payload,
                                         
pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
     int result = 0;
@@ -964,55 +974,19 @@ int 
pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, v
     return result;
 }
 
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, 
psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int 
flag ) {
-  int nbytes = 0;
-  int msgSize = 0;
-  if (entry->fd >= 0 && size && msg->msg_iovlen) {
-    int expectedReadSize = size;
-    unsigned int offset = 0;
-    nbytes = size;
-    while (nbytes > 0 && expectedReadSize > 0) {
-      // Read the message header
-      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
-      // Update admin
-      expectedReadSize -= nbytes;
-      msgSize += nbytes;
-      // Not all written
-      if (expectedReadSize && nbytes > 0) {
-        unsigned int readSize = 0;
-        unsigned int readIndex = 0;
-        unsigned int i = 0;
-        for (i = 0; i < msg->msg_iovlen; i++) {
-          if (nbytes < msg->msg_iov[i].iov_len) {
-            readIndex = i;
-            break;
-          }
-          readSize+= msg->msg_iov[i].iov_len;
-        }
-        msg->msg_iov = &msg->msg_iov[readIndex];
-        msg->msg_iovlen -= readIndex;
-        char* buffer = (char*)msg->msg_iov->iov_base;
-        offset = nbytes - readSize;
-        msg->msg_iov->iov_base = &buffer[offset];
-        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
-      }
-    }
-  }
-  if (nbytes <=0)  msgSize = nbytes;
-  return msgSize;
-}
+
 //
 // Write large data to TCP. .
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, 
pubsub_protocol_message_t *message, struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
     int nofConnToClose = 0;
     if (handle) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
         hash_map_iterator_t iter = 
hashMapIterator_construct(handle->connection_fd_map);
+        size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             if (!entry->connected) continue;
@@ -1025,6 +999,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, 
pubsub_protocol_message
                     payloadSize += msgIoVec[i].iov_len;
                 }
             }
+            // When maxMsgSize is zero then payloadSize is disabled
+            if (!entry->maxMsgSize) payloadSize = 0;
+
             message->header.convertEndianess = 0;
             message->header.payloadSize = payloadSize;
             message->header.payloadPartSize = payloadSize;
@@ -1034,130 +1011,172 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t 
*handle, pubsub_protocol_message
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
                 handle->protocol->encodeMetadata(handle->protocol->handle, 
message,
                                                  &metadataData,
                                                  &metadataSize);
-                entry->metaBufferSize = metadataSize;
             }
+            // When maxMsgSize is smaller then meta data is disabled
+            if (!entry->maxMsgSize || (metadataSize > entry->maxMsgSize)) 
metadataSize = 0;
+
             message->header.metadataSize = metadataSize;
+            size_t totalMessageSize = payloadSize + metadataSize;
+
+            bool isMessageSegmentationSupported = false;
+            
handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, 
&isMessageSegmentationSupported);
+            if (((!isMessageSegmentationSupported) && (msg_iov_len > 
max_msg_iov_len)) ||
+                ((!isMessageSegmentationSupported) && (totalMessageSize > 
entry->maxMsgSize))) {
+                L_WARN("[TCP Socket] Failed to send message (fd: %d), Message 
segmentation is not supported\n",
+                       entry->fd);
+                continue;
+            }
 
             void *footerData = NULL;
             size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
+            if (entry->writeFooterSize) {
+                footerDataSize = entry->writeFooterSize;
+                footerData = entry->writeFooterBuffer;
                 handle->protocol->encodeFooter(handle->protocol->handle, 
message,
                                                  &footerData,
                                                  &footerDataSize);
-                entry->footerSize = footerDataSize;
+                entry->writeFooterSize = MAX(footerDataSize, 
entry->writeFooterSize);
+                if (footerData && entry->writeFooterBuffer != footerData) 
entry->writeFooterBuffer = footerData;
             }
 
             size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
-
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
-                    msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = 
msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+            size_t msgPayloadSize = 0;
+            size_t msgMetaDataSize = 0;
+            size_t msgIovLen = 0;
+            long int nbytes = UINT32_MAX;
+            while (msgSize < totalMessageSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
+
+                // Write generic seralized payload in vector buffer
+                if (msgPayloadSize < payloadSize) {
+                    if (payloadSize && payloadData && entry->maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iovlen++;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = 
&buffer[msgPayloadSize];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize 
- msgPayloadSize), entry->maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        message->header.payloadPartSize = msgPartSize;
+                        message->header.payloadOffset   = msgPayloadSize;
+                        msgPayloadSize += message->header.payloadPartSize;
+                        msgSize = msgPayloadSize;
+                    } else {
+                        // copy serialized vector into vector buffer
+                        for (size_t i = 0; i < MIN(msg_iov_len, 
max_msg_iov_len); i++) {
+                            msg.msg_iovlen++;
+                            msg.msg_iov[msg.msg_iovlen].iov_base = 
msgIoVec[msgIovLen + i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = 
msgIoVec[msgIovLen + i].iov_len;
+                            if ((msgPartSize + 
msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) {
+                                msg.msg_iovlen--;
+                                break;
+                            }
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        }
+                        msgIovLen += msg.msg_iovlen;
+                        message->header.payloadOffset = msgPayloadSize;
+                        message->header.payloadPartSize = msgPartSize;
+                        msgPayloadSize  += message->header.payloadPartSize;
+                        msgSize = msgPayloadSize;
+                    }
                 }
-            }
 
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                // Write optional metadata in vector buffer
+                if ((msgPayloadSize >= payloadSize) &&
+                    (msgMetaDataSize < metadataSize) &&
+                    (msgPartSize < entry->maxMsgSize) &&
+                    (msg.msg_iovlen+1 < max_msg_iov_len-1) &&
+                    (metadataSize && metadataData)) {
+                    msg.msg_iovlen++;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                    message->header.metadataSize = metadataSize;
+                    msgSize += metadataSize;
+                }
+                if (msgSize >= totalMessageSize) {
+                    message->header.isLastSegment = 0x1;
+                }
 
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
+                // Write optional footerData in vector buffer
+                if (footerData && footerDataSize) {
+                    msg.msg_iovlen++;
+                    msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
+                    msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                }
 
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize 
= 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, 
&msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a 
subscriber
-            //  which is not active anymore, the connection will remain until 
the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in 
EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: 
%s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, 
handle->maxSendRetryCount);
+                void *headerData = NULL;
+                size_t headerSize = entry->writeHeaderBufferSize;
+                // check if header is not part of the payload (=> 
headerBufferSize = 0)s
+                if (entry->writeHeaderBufferSize) {
+                    headerSize = entry->writeHeaderBufferSize;
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, 
message,
+                                                   &headerData,
+                                                   &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, 
entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) 
entry->writeHeaderBuffer = headerData;
+                }
+                if (!entry->writeHeaderBufferSize) {
+                    // Skip header buffer, when header is part of payload;
+                    msg.msg_iov = &msg_iov[1];
+                } else if (headerSize && headerData) {
+                    // Write header in 1st vector buffer item
+                    msg.msg_iov[0].iov_base = headerData;
+                    msg.msg_iov[0].iov_len = headerSize;
+                    msgPartSize += msg.msg_iov[0].iov_len;
+                    msg.msg_iovlen++;
                 } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u 
retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    L_ERROR("[TCP Socket] No header buffer is generated");
+                    msg.msg_iovlen = 0;
                 }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != 
%d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+                //  When a specific socket keeps reporting errors can indicate 
a subscriber
+                //  which is not active anymore, the connection will remain 
until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result 
in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try 
again. Retry count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, 
handle->maxSendRetryCount, errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) 
after %u retries! Closing connection... Error: %s", entry->fd, 
handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d 
!= %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
                 }
-            }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
             }
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of 
locking handle->dbLock to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1216,7 +1235,8 @@ int pubsub_tcpHandler_acceptHandler(pubsub_tcpHandler_t 
*handle, psa_tcp_connect
 #else
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+        event.events = EPOLLRDHUP | EPOLLERR;
+        if (handle->isEndPoint) event.events |= EPOLLIN;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..a08911c 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,15 +58,14 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, 
int fd);
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers,
-                                               unsigned int bufferSize);
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, 
unsigned int size);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int 
size);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int 
timeout);
 void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned 
int count);
 void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, 
unsigned int count);
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double 
timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double 
timeout);
+void pubsub_tcpHandler_setEndPoint(pubsub_tcpHandler_t *handle, bool 
isEndPoint);
 
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 4fa4586..533e773 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -38,6 +38,7 @@
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
 
 #define MAX_EPOLL_EVENTS     16
@@ -67,6 +68,7 @@ struct pubsub_tcp_topic_receiver {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     struct {
         celix_thread_t thread;
@@ -164,6 +166,7 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
     const char *staticServerEndPointUrls = NULL;
     const char *staticConnectUrls = NULL;
 
+    pubsubInterceptorsHandler_create(ctx, scope, topic, 
&receiver->interceptorsHandler);
     staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
 
     if (topicProperties != NULL) {
@@ -213,14 +216,11 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
         long retryCnt = celix_properties_getAsLong(topicProperties, 
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
                                                    
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
         double rcvTimeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
-        long sessions = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_MAX_RECV_SESSIONS,
-                                                              
PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
-        long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_RECV_BUFFER_SIZE,
+        long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_RECV_BUFFER_SIZE,
                                                                  
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
         long timeout = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
         pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
-        pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, 
(unsigned int) sessions,
-                                                   (unsigned int) buffer_size);
+        pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, 
(unsigned int) bufferSize);
         pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) 
timeout);
         pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, 
processMsg);
         
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, 
receiver, psa_tcp_connectHandler,
@@ -559,31 +559,48 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t 
*receiver, psa_tcp_subs
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = 
pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, 
msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = 
hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more 
receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, 
&deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type 
%s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = 
hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle,
+                                     msgSer->msgName,
+                                     msgSer->msgId,
+                                     deSerializedMsg,
+                                     message->metadata.metadata,
+                                     &release);
+                        
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler,
+                                                                   msgType,
+                                                                   msgId,
+                                                                   
deSerializedMsg,
+                                                                   metadata);
+                        if (!release && hashMapIterator_hasNext(&iter)) {
+                            //receive function has taken ownership and still 
more receive function to come ..
+                            //deserialize again for new message
+                            status = msgSer->deserialize(msgSer->handle, 
&deSerializeBuffer, 1, &deSerializedMsg);
+                            if (status != CELIX_SUCCESS) {
+                                L_WARN("[PSA_TCP_TR] Cannot deserialize msg 
type %s for scope/topic %s/%s",
+                                       msgSer->msgName,
+                                       receiver->scope == NULL ? "(null)" : 
receiver->scope,
+                                       receiver->topic);
+                                break;
+                            }
+                            release = true;
                         }
-                        release = true;
                     }
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, 
deSerializedMsg);
+                    }
+                    if (message->metadata.metadata) {
+                        celix_properties_destroy(message->metadata.metadata);
+                    }
+                    updateReceiveCount += 1;
                 }
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, 
deSerializedMsg);
-                }
-                if (message->metadata.metadata) {
-                    celix_properties_destroy(message->metadata.metadata);
-                }
-                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgSer->msgName,
@@ -650,10 +667,7 @@ static void *psa_tcp_recvThread(void *data) {
 
 pubsub_admin_receiver_metrics_t 
*pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope,
-             PUBSUB_AMDIN_METRICS_NAME_MAX,
-             "%s",
-             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : 
receiver->scope);
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->topic);
 
     int msgTypesCount = 0;
@@ -677,8 +691,7 @@ pubsub_admin_receiver_metrics_t 
*pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topi
         hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
         while (hashMapIterator_hasNext(&iter2)) {
             hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-            result->msgTypes[i].origins = calloc((size_t) 
hashMap_size(origins),
-                                                 
sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].origins = calloc((size_t) 
hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
             result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
             int k = 0;
             hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index dbb5e26..f7598f9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -35,6 +35,7 @@
 #include "celix_constants.h"
 #include <signal.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 
 #define FIRST_SEND_DELAY_IN_SECONDS              2
 #define TCP_BIND_MAX_RETRY                      10
@@ -59,6 +60,7 @@ struct pubsub_tcp_topic_sender {
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;
     char *topic;
@@ -146,6 +148,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
     }
     sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_TCP_METRICS_ENABLED,
                                                                    
PSA_TCP_DEFAULT_METRICS_ENABLED);
+    pubsubInterceptorsHandler_create(ctx, scope, topic, 
&sender->interceptorsHandler);
     bool isEndpoint = false;
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, 
PUBSUB_TCP_PSA_IP_KEY, NULL);
@@ -202,10 +205,13 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
         double timeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
                                                                        
(!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
                                                                                
        PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, 
PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, 
sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned 
int) retryCnt);
         pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) 
maxMsgSize);
+        pubsub_tcpHandler_setEndPoint(sender->socketHandler, isEndpoint);
     }
 
     //setting up tcp socket for TCP TopicSender
@@ -312,6 +318,7 @@ void 
pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
             pubsub_tcpHandler_destroy(sender->socketHandler);
             sender->socketHandler = NULL;
@@ -533,7 +540,8 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = 
pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, 
entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+        if (status == CELIX_SUCCESS /*ser ok*/ && cont) {
             pubsub_protocol_message_t message;
             message.metadata.metadata = NULL;
             message.payload.payload = NULL;
@@ -561,6 +569,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
                     status = -1;
                     sendOk = false;
                 }
+                
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, 
entry->msgSer->msgName, msgTypeId, inMsg, metadata);
                 if (message.metadata.metadata)
                     celix_properties_destroy(message.metadata.metadata);
                 if (serializedIoVecOutput) {
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h 
b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index 87d4263..b10863c 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -28,16 +28,16 @@ typedef struct pubsub_utils_url {
   char *url;
   char *protocol;
   char *hostname;
-  unsigned int portnr;
+  unsigned int port_nr;
   char *uri;
   char *interface;
-  unsigned int interface_portnr;
+  unsigned int interface_port_nr;
   char *interface_url;
 } pubsub_utils_url_t;
 
 struct sockaddr_in *pubsub_utils_url_from_fd(int fd);
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port);
-char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char 
*protocol);
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned 
int port);
+char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char 
*protocol);
 char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
 bool pubsub_utils_url_is_multicast(char *hostname);
 char *pubsub_utils_url_get_multicast_ip(char *hostname);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c 
b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index d8d518c..65a1ff2 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -56,7 +56,7 @@ struct sockaddr_in *pubsub_utils_url_from_fd(int fd) {
     return inp;
 }
 
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) 
{
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned 
int port) {
     struct hostent *hp;
     struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in));
     bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
@@ -220,11 +220,11 @@ void pubsub_utils_url_parse_url(char *_url, 
pubsub_utils_url_t *url_info) {
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->portnr = pubsub_utils_url_rand_range(minDigits, 
maxDigits);
+                url_info->port_nr = pubsub_utils_url_rand_range(minDigits, 
maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->portnr = portDigits;
+                    url_info->port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -256,11 +256,11 @@ void pubsub_utils_url_parse_url(char *_url, 
pubsub_utils_url_t *url_info) {
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->interface_portnr = 
pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->interface_port_nr = 
pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->interface_portnr = portDigits;
+                    url_info->interface_port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -289,13 +289,13 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
             free(url_info->interface);
             url_info->interface = ip;
         }
-        struct sockaddr_in *m_sin = 
pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr);
+        struct sockaddr_in *m_sin = 
pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
         url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL);
         free(m_sin);
         pubsub_utils_url_parse_url(url_info->interface_url, 
&interface_url_info);
         free(url_info->interface);
         url_info->interface = interface_url_info.hostname;
-        url_info->interface_portnr = interface_url_info.portnr;
+        url_info->interface_port_nr = interface_url_info.port_nr;
     }
 
     if (url_info->hostname) {
@@ -306,11 +306,11 @@ pubsub_utils_url_t *pubsub_utils_url_parse(char *url) {
             free(url_info->hostname);
             url_info->hostname = ip;
         }
-        struct sockaddr_in *sin = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *sin = 
pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol);
         free(url_info->hostname);
         free(sin);
-        url_info->portnr = 0;
+        url_info->port_nr = 0;
         url_info->hostname = NULL;
         pubsub_utils_url_parse_url(url_info->url, url_info);
     }
@@ -338,7 +338,7 @@ void pubsub_utils_url_free(pubsub_utils_url_t *url_info) {
     url_info->hostname = NULL;
     url_info->protocol = NULL;
     url_info->interface = NULL;
-    url_info->portnr = 0;
-    url_info->interface_portnr = 0;
+    url_info->port_nr = 0;
+    url_info->interface_port_nr = 0;
     free(url_info);
 }

Reply via email to