This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 in repository https://gitbox.apache.org/repos/asf/celix.git
commit a35435e8df3347151aa9db6b1cd2eb6731ff1a1c Merge: dbf6bdc 3be06db Author: Roy Bulter <[email protected]> AuthorDate: Mon Aug 3 12:43:47 2020 +0200 Merge branch 'master' into feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v2 # Conflicts: # bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c .github/workflows/coverity-scan.yml | 57 +++ CHANGES.md | 4 + bundles/deployment_admin/README.md | 4 + bundles/device_access/README.md | 4 + .../device_access/example/base_driver/README.md | 4 + .../example/consuming_driver/README.md | 4 + .../example/refining_driver/README.md | 4 + bundles/http_admin/README.md | 4 + bundles/http_admin/civetweb/src/civetweb.c | 2 +- bundles/logging/README.md | 4 + bundles/logging/log_writers/README.md | 4 + bundles/pubsub/README.md | 4 + bundles/pubsub/examples/keys/README.md | 4 + .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 16 +- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 251 ++++-------- .../src/pubsub_tcp_topic_receiver.c | 3 + bundles/pubsub/pubsub_admin_udp_mc/README.md | 4 + .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 15 +- .../src/pubsub_udpmc_topic_receiver.c | 5 +- .../src/pubsub_websocket_admin.c | 15 +- .../src/pubsub_websocket_topic_receiver.c | 3 + .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 35 +- .../src/pubsub_zmq_topic_receiver.c | 7 +- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 118 +++--- .../pubsub_protocol_lib/CMakeLists.txt | 8 +- .../pubsub_protocol_lib/gtest/CMakeLists.txt} | 15 +- .../gtest/src/PS_WP_common_tests.cc | 57 +++ .../pubsub_protocol_lib/gtest/src/main.cc | 26 ++ .../src/pubsub_wire_protocol_common.c | 93 +++-- bundles/pubsub/pubsub_spi/CMakeLists.txt | 7 +- .../gtest/CMakeLists.txt} | 12 +- .../gtest/src/PubSubEndpointUtilsTestSuite.cc | 47 +++ .../pubsub/pubsub_spi/include/pubsub_endpoint.h | 16 +- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 3 +- .../pubsub/pubsub_spi/src/pubsub_endpoint_match.c | 11 + .../src/pubsub_topology_manager.c | 452 +++++++++++++-------- .../src/pubsub_topology_manager.h | 16 +- bundles/pubsub/test/CMakeLists.txt | 142 ++++++- .../{ping.properties => deadlock.scope.properties} | 6 +- ...{ping.properties => deadlock.scope2.properties} | 6 +- bundles/pubsub/test/meta_data/ping.properties | 2 +- bundles/pubsub/test/meta_data/ping2.properties | 2 +- bundles/pubsub/test/meta_data/pong2.properties | 2 +- .../pubsub/test/pstm_deadlock_test/test_runner.cc | 142 +++++++ bundles/pubsub/test/test/test_endpoint_runner.cc | 4 +- bundles/pubsub/test/test/test_runner.cc | 92 ++--- bundles/remote_services/README.md | 6 +- bundles/remote_services/discovery_etcd/README.md | 4 + .../remote_service_admin_dfi/README.md | 4 + .../remote_services/remote_services_api/README.md | 4 + bundles/remote_services/rsa_spi/README.md | 4 + bundles/remote_services/topology_manager/README.md | 4 + bundles/shell/remote_shell/README.md | 4 + bundles/shell/shell/README.md | 4 + bundles/shell/shell_tui/README.md | 4 + bundles/shell/shell_wui/README.md | 4 + documents/building/README.md | 4 + documents/cmake_commands/README.md | 4 + documents/getting_started/README.md | 4 + .../getting_started/creating_a_simple_bundle.md | 6 +- documents/getting_started/using_services_with_c.md | 4 + .../getting_started/using_services_with_cxx.md | 4 + documents/intro/README.md | 6 +- documents/subprojects/README.md | 6 + examples/celix-examples/README.md | 4 + examples/celix-examples/http_example/README.md | 7 +- .../celix-examples/services_example_c/README.md | 4 + libs/dependency_manager/README.md | 4 + libs/dependency_manager/TODO.md | 6 +- libs/dependency_manager_cxx/README.md | 4 + libs/dependency_manager_cxx/TODO.md | 6 +- libs/etcdlib/README.md | 4 + libs/framework/src/dm_dependency_manager_impl.c | 44 +- libs/launcher/README.md | 4 + libs/utils/README.md | 4 + libs/utils/gtest/src/LogUtilsTestSuite.cc | 2 - misc/experimental/README.md | 4 + misc/experimental/bundles/config_admin/README.md | 4 + misc/experimental/promise/README.md | 4 + 79 files changed, 1351 insertions(+), 570 deletions(-) diff --cc bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 20eb401,0cbd28f..4734497 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@@ -95,13 -86,9 +93,11 @@@ typedef struct psa_tcp_connection_entr void *footerBuffer; unsigned int bufferSize; void *buffer; - unsigned int bufferReadReadOffset; - unsigned int expectedBufferReadSize; - unsigned int msgSizeReadSize; + unsigned int bufferReadSize; unsigned int metaBufferSize; void *metaBuffer; + struct msghdr msg; + size_t msg_iovlen; /* Number of elements in the vector. */ unsigned int retryCount; } psa_tcp_connection_entry_t; @@@ -343,13 -338,6 +348,9 @@@ pubsub_tcpHandler_createEntry(pubsub_tc } if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize); if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); + entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; + entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize; + entry->msg_iovlen++; - pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry, READ_STATE_HEADER); } return entry; } @@@ -652,25 -628,16 +641,24 @@@ int pubsub_tcpHandler_listen(pubsub_tcp } // -// Setup buffer sizes +// Setup receive buffer size // -int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle, - unsigned int maxNofBuffers - __attribute__((__unused__)), - unsigned int bufferSize) { +int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) { if (handle != NULL) { celixThreadRwlock_writeLock(&handle->dbLock); - handle->bufferSize = bufferSize; - handle->maxNofBuffer = maxNofBuffers; + handle->bufferSize = size; + celixThreadRwlock_unlock(&handle->dbLock); + } + return 0; +} + +// +// Setup receive buffer size +// +int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) { + if (handle != NULL) { + celixThreadRwlock_writeLock(&handle->dbLock); + handle->maxMsgSize = size; - handle->maxMsgSize = 4; celixThreadRwlock_unlock(&handle->dbLock); } return 0; @@@ -809,11 -775,12 +795,12 @@@ static inlin void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) { if (entry->header.header.payloadSize > 0) { -- handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); ++ handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); } if (entry->header.header.metadataSize > 0) { -- handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer, ++ handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer, entry->header.header.metadataSize, &entry->header); - entry->metaBufferSize = entry->header.header.metadataSize; ++ entry->metaBufferSize = entry->header.header.metadataSize; } if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) { struct timespec receiveTime; @@@ -1173,136 -1048,113 +1077,135 @@@ int pubsub_tcpHandler_write(pubsub_tcpH handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerDataSize); - entry->footerSize = footerDataSize; } + size_t msgSize = 0; - struct msghdr msg; - struct iovec msg_iov[IOV_MAX]; - memset(&msg, 0x00, sizeof(struct msghdr)); - msg.msg_name = &entry->addr; - msg.msg_namelen = entry->len; - msg.msg_flags = flags; - msg.msg_iov = msg_iov; - - // Write generic seralized payload in vector buffer - if (payloadSize && payloadData) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = payloadData; - msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } else { - // copy serialized vector into vector buffer - for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) { + size_t msgIovLen = 0; + long int nbytes = UINT32_MAX; + while (msgSize < totalMessageSize && nbytes > 0) { + struct msghdr msg; + struct iovec msg_iov[IOV_MAX]; + memset(&msg, 0x00, sizeof(struct msghdr)); + msg.msg_name = &entry->addr; + msg.msg_namelen = entry->len; + msg.msg_flags = flags; + msg.msg_iov = msg_iov; + size_t msgPartSize = 0; + message->header.payloadPartSize = 0; + message->header.payloadOffset = 0; + message->header.metadataSize = 0; + message->header.isLastSegment = 0; + + // Write generic seralized payload in vector buffer + if (payloadSize && payloadData) { + char *payloadDataBuffer = payloadData; msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base; - msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len; + msg.msg_iov[msg.msg_iovlen].iov_base = &payloadDataBuffer[msgSize]; + msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgSize), entry->maxMsgSize); + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgSize; msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } else { + // copy serialized vector into vector buffer + for (size_t i = 0; i < MIN(msg_iov_len, max_msg_iov_len); i++) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[msgIovLen + i].iov_base; + msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[msgIovLen + i].iov_len; + if ((msgPartSize + msg.msg_iov[msg.msg_iovlen].iov_len) > entry->maxMsgSize) + break; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } + message->header.payloadPartSize = msgPartSize; + message->header.payloadOffset = msgSize; + msgSize += msgPartSize; + msgIovLen += (msg.msg_iovlen - 1); } - } - // Write optional metadata in vector buffer - if (metadataSize && metadataData) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; - msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } + // Write optional metadata in vector buffer + if ((msgSize < (payloadSize + metadataSize)) && + (msgPartSize < entry->maxMsgSize) && + (metadataSize && metadataData)) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; + msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + message->header.metadataSize = metadataSize; + msgSize += metadataSize; + } + if (msgSize >= totalMessageSize) { + message->header.isLastSegment = 0x1; + } - // Write optional footerData in vector buffer - if (footerData && footerDataSize) { - msg.msg_iovlen++; - msg.msg_iov[msg.msg_iovlen].iov_base = footerData; - msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; - msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; - } + // Write optional footerData in vector buffer + if (footerData && footerDataSize) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = footerData; + msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; + msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } - void *headerData = NULL; - size_t headerSize = 0; - // check if header is not part of the payload (=> headerBufferSize = 0)s - if (entry->headerBufferSize) { - // Encode the header, with payload size and metadata size - handle->protocol->encodeHeader(handle->protocol->handle, message, - &headerData, - &headerSize); - } - if (!entry->headerBufferSize) { - // Skip header buffer, when header is part of payload; - msg.msg_iov = &msg_iov[1]; - } else if (headerSize && headerData) { - // Write header in 1st vector buffer item - msg.msg_iov[0].iov_base = headerData; - msg.msg_iov[0].iov_len = headerSize; - msgPartSize += msg.msg_iov[0].iov_len; - msg.msg_iovlen++; + void *headerData = NULL; + size_t headerSize = 0; + // check if header is not part of the payload (=> headerBufferSize = 0)s + if (entry->headerBufferSize) { - headerData = entry->headerBuffer; + // Encode the header, with payload size and metadata size + handle->protocol->encodeHeader(handle->protocol->handle, message, + &headerData, + &headerSize); - entry->headerBufferSize = headerSize; + } + if (!entry->headerBufferSize) { + // Skip header buffer, when header is part of payload; + msg.msg_iov = &msg_iov[1]; + } else if (headerSize && headerData) { + // Write header in 1st vector buffer item + msg.msg_iov[0].iov_base = headerData; + msg.msg_iov[0].iov_len = headerSize; + msgSize += msg.msg_iov[0].iov_len; + msg.msg_iovlen++; + } else { + L_ERROR("[TCP Socket] No header buffer is generated"); + msg.msg_iovlen = 0; + } + long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags); + // When a specific socket keeps reporting errors can indicate a subscriber + // which is not active anymore, the connection will remain until the retry + // counter exceeds the maximum retry count. + // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. + if (nbytes == -1) { + if (entry->retryCount < handle->maxSendRetryCount) { + entry->retryCount++; + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", + entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); } else { - L_ERROR("[TCP Socket] No header buffer is generated"); - msg.msg_iovlen = 0; - } - nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgPartSize, flags); - // When a specific socket keeps reporting errors can indicate a subscriber - // which is not active anymore, the connection will remain until the retry - // counter exceeds the maximum retry count. - // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. - if (nbytes == -1) { - if (entry->retryCount < handle->maxSendRetryCount) { - entry->retryCount++; - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ", - entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount); - } else { - L_ERROR( - "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno)); - connFdCloseQueue[nofConnToClose++] = entry->fd; - } - result = -1; //At least one connection failed sending - } else if (msgPartSize) { - entry->retryCount = 0; - if (nbytes != msgPartSize) { - L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); - } - } - // Release data - if (headerData) { - free(headerData); - } - // Note: serialized Payload is deleted by serializer - if (payloadData && (payloadData != message->payload.payload)) { - free(payloadData); + L_ERROR( + "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", + entry->fd, handle->maxSendRetryCount, strerror(errno)); + connFdCloseQueue[nofConnToClose++] = entry->fd; } - if (metadataData) { - free(metadataData); - } - if (footerData) { - free(footerData); + result = -1; //At least one connection failed sending + } else if (msgSize) { + entry->retryCount = 0; + if (nbytes != msgSize) { + L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno)); } } - + // Release data - if (headerData && headerData != entry->headerBuffer) { ++ if (headerData) { + free(headerData); + } + // Note: serialized Payload is deleted by serializer + if (payloadData && (payloadData != message->payload.payload)) { + free(payloadData); + } - if (metadataData && metadataData != entry->metaBuffer) { ++ if (metadataData) { + free(metadataData); + } - if (footerData && footerData != entry->footerBuffer) { ++ if (footerData) { + free(footerData); + } } } celixThreadRwlock_unlock(&handle->dbLock);
