This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit 7b8608b30f97b87d4df76a9e3d2ca7a4bb02d424 Author: Roy Bulter <[email protected]> AuthorDate: Wed Apr 29 21:25:31 2020 +0200 Merge --- .../src/pubsub_psa_tcp_constants.h | 5 ++--- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 26 ++++++++++++++++------ .../pubsub_admin_tcp/src/pubsub_tcp_handler.h | 6 ++--- .../src/pubsub_tcp_topic_receiver.c | 7 ++---- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 2 ++ 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h index 3e7a7b3..1ca4ff1 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h @@ -23,7 +23,7 @@ #define PSA_TCP_BASE_PORT "PSA_TCP_BASE_PORT" #define PSA_TCP_MAX_PORT "PSA_TCP_MAX_PORT" -#define PSA_TCP_MAX_RECV_SESSIONS "PSA_TCP_MAX_RECV_SESSIONS" +#define PSA_TCP_MAX_MESSAGE_SIZE "PSA_TCP_MAX_MESSAGE_SIZE" #define PSA_TCP_RECV_BUFFER_SIZE "PSA_TCP_RECV_BUFFER_SIZE" #define PSA_TCP_TIMEOUT "PSA_TCP_TIMEOUT" #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT" @@ -31,8 +31,7 @@ #define PSA_TCP_DEFAULT_BASE_PORT 5501 #define PSA_TCP_DEFAULT_MAX_PORT 6000 -#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 1 - +#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE 0 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 65 * 1024 #define PSA_TCP_DEFAULT_TIMEOUT 2000 // 2 seconds #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 6e2072e..b63e45b 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -330,6 +330,7 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch entry->headerBufferSize = size; handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size); entry->syncSize = size; + entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : UINT32_MAX; entry->bufferSize = handle->bufferSize; entry->connected = false; entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX); @@ -628,22 +629,32 @@ int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url) { } // -// Setup buffer sizes +// Setup receive buffer size // -int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, - unsigned int maxNofBuffers - __attribute__((__unused__)), - unsigned int bufferSize) { +int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) { if (handle != NULL) { celixThreadRwlock_writeLock(&handle->dbLock); - handle->bufferSize = bufferSize; - handle->maxNofBuffer = maxNofBuffers; + handle->bufferSize = size; celixThreadRwlock_unlock(&handle->dbLock); } return 0; } // +// Setup receive buffer size +// +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->maxMsgSize = size; + celixThreadRwlock_unlock(&handle->dbLock); + } + return 0; +} + + +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int maxMsgSize); +// // Setup thread timeout // void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, @@ -959,6 +970,7 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message int nofConnToClose = 0; if (handle) { hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); + size_t max_msg_iov_len = IOV_MAX - 2; while (hashMapIterator_hasNext(&iter)) { psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter); void *payloadData = NULL; diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h index 6f69cd2..34d1838 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h @@ -58,10 +58,8 @@ int pubsub_tcpHandler_close(pubsub_tcpHandler_t *handle, int fd); int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url); int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url); int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url); - -int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, - unsigned int maxNofBuffers, - unsigned int bufferSize); +int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size); +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size); void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout); void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count); void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count); diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index fca41d2..18d17f4 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -209,14 +209,11 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT); double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT); - long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS, - PSA_TCP_DEFAULT_MAX_RECV_SESSIONS); - long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE, + long bufferSize = celix_properties_getAsLong(topicProperties, PSA_TCP_RECV_BUFFER_SIZE, PSA_TCP_DEFAULT_RECV_BUFFER_SIZE); long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT); pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope); - pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions, - (unsigned int) buffer_size); + pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize); pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout); pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg); pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler, diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 7d0f5f7..0f3a83f 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -191,10 +191,12 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT); double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT); + long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE); pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope); pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched); pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt); pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout); + pubsub_tcpHandler_setReceiveBufferSize(sender->socketHandler, (unsigned int) maxMsgSize); } //setting up tcp socket for TCP TopicSender
