This is an automated email from the ASF dual-hosted git repository.

rbulter pushed a commit to branch 
feature/add_msg_segemenation_to_tcp_admin_with_wire_v2
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 7b8608b30f97b87d4df76a9e3d2ca7a4bb02d424
Author: Roy Bulter <[email protected]>
AuthorDate: Wed Apr 29 21:25:31 2020 +0200

    Merge
---
 .../src/pubsub_psa_tcp_constants.h                 |  5 ++---
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.c      | 26 ++++++++++++++++------
 .../pubsub_admin_tcp/src/pubsub_tcp_handler.h      |  6 ++---
 .../src/pubsub_tcp_topic_receiver.c                |  7 ++----
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  2 ++
 5 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 3e7a7b3..1ca4ff1 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,7 +23,7 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   
"PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
@@ -31,8 +31,7 @@
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        0
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 6e2072e..b63e45b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -330,6 +330,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, 
int fd, char *url, ch
         entry->headerBufferSize = size;
         handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
         entry->syncSize = size;
+        entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : 
UINT32_MAX;
         entry->bufferSize = handle->bufferSize;
         entry->connected = false;
         entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
@@ -628,22 +629,32 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, 
char *url) {
 }
 
 //
-// Setup buffer sizes
+// Setup receive buffer size
 //
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers
-                                               __attribute__((__unused__)),
-                                               unsigned int bufferSize) {
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, 
unsigned int size) {
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
-        handle->bufferSize = bufferSize;
-        handle->maxNofBuffer = maxNofBuffers;
+        handle->bufferSize = size;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
 }
 
 //
+// Setup maximum message size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int 
size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxMsgSize = size;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+    return 0;
+}
+
+
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int 
maxMsgSize);
+//
 // Setup thread timeout
 //
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle,
@@ -959,6 +970,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, 
pubsub_protocol_message
     int nofConnToClose = 0;
     if (handle) {
         hash_map_iterator_t iter = 
hashMapIterator_construct(handle->connection_fd_map);
+        size_t max_msg_iov_len = IOV_MAX - 2;
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             void *payloadData = NULL;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index 6f69cd2..34d1838 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,10 +58,8 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int 
fd);
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers,
-                                               unsigned int bufferSize);
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, 
unsigned int size);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int 
size);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int 
timeout);
 void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned 
int count);
 void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, 
unsigned int count);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index fca41d2..18d17f4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -209,14 +209,11 @@ pubsub_tcp_topic_receiver_t 
*pubsub_tcpTopicReceiver_create(celix_bundle_context
                                                    
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
         double rcvTimeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY,
                                                          
PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
-        long sessions = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_MAX_RECV_SESSIONS,
-                                                              
PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
-        long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_RECV_BUFFER_SIZE,
+        long bufferSize = celix_properties_getAsLong(topicProperties, 
PSA_TCP_RECV_BUFFER_SIZE,
                                                                  
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
         long timeout = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
         pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
-        pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, 
(unsigned int) sessions,
-                                                   (unsigned int) buffer_size);
+        pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, 
(unsigned int) bufferSize);
         pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) 
timeout);
         pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, 
processMsg);
         
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, 
receiver, psa_tcp_connectHandler,
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index 7d0f5f7..0f3a83f 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -191,10 +191,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create(
                                                    
PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
         double timeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
                                                       
PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, 
PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, 
sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned 
int) retryCnt);
         pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setReceiveBufferSize(sender->socketHandler, 
(unsigned int) maxMsgSize);
     }
 
     //setting up tcp socket for TCP TopicSender

Reply via email to