This is an automated email from the ASF dual-hosted git repository.
rbulter pushed a commit to branch
feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to
refs/heads/feature/add_msg_segemenation_to_tcp_admin_with_wire_v2_add_make_non_blocking_v3
by this push:
new fe4cef0 FIx
fe4cef0 is described below
commit fe4cef0c141de6ec0575f5dec1a5040a6916b91d
Author: Roy Bulter <[email protected]>
AuthorDate: Mon Oct 12 21:16:21 2020 +0200
FIx
---
CMakeLists.txt | 2 +-
.../publisher/private/src/pubsub_publisher.c | 24 ++-
.../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 194 ++++++++++-----------
3 files changed, 108 insertions(+), 112 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 788200f..598600b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -43,7 +43,7 @@ set(ENABLE_MORE_WARNINGS OFF)
# Set C specific flags
set(CMAKE_C_FLAGS "-D_GNU_SOURCE -std=gnu99 -fPIC ${CMAKE_C_FLAGS}")
-set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
+#set(CMAKE_C_FLAGS "-Wall -Werror ${CMAKE_C_FLAGS}")
# Set C++ specific flags
set(CMAKE_CXX_FLAGS "-std=c++11 -fno-rtti ${CMAKE_CXX_FLAGS}")
diff --git
a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
index 92e0e65..03857dd 100644
--- a/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
+++ b/bundles/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c
@@ -54,16 +54,15 @@ static void* send_thread(void* arg) {
//poi_t point = calloc(1,sizeof(*point));
location_t place = calloc(1, sizeof(*place));
- char *desc = calloc(1, sizeof(char)); //calloc(64, sizeof(char));
- //snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned
long)pthread_self());
+ char *desc = calloc(64, sizeof(char));
+ snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned
long)pthread_self());
- char *name = calloc(1, sizeof(char));//calloc(64, sizeof(char));
- //snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
+ char *name = calloc(64, sizeof(char));
+ snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
place->name = name;
place->description = desc;
- //place->extra = "extra value";
- place->extra = calloc(1, 1);
+ place->extra = "extra value";
printf("TOPIC : %s\n", st_struct->topic);
unsigned int msgId = 0;
@@ -78,21 +77,18 @@ static void* send_thread(void* arg) {
if (msgId > 0) {
place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
place->position.lon = randCoordinate(MIN_LON, MAX_LON);
- //int nr_char = (int) randCoordinate(5, 100000);
- int nr_char = 1;//(int) randCoordinate(5, 20);
+ int nr_char = (int) randCoordinate(5, 100000);
place->data = calloc(nr_char, 1);
for (int i = 0; i < (nr_char - 1); i++) {
place->data[i] = i % 10 + '0';
}
place->data[nr_char - 1] = '\0';
if (publish_svc->send) {
- celix_properties_t *metadata = NULL;
- //celix_properties_t *metadata = celix_properties_create();
- //celix_properties_set(metadata, "Key", "Value");
-
+ celix_properties_t *metadata = celix_properties_create();
+ celix_properties_set(metadata, "Key", "Value");
if (publish_svc->send(publish_svc->handle, msgId, place,
metadata) == 0) {
- // printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",
st_struct->topic,
- // place->position.lat, place->position.lon,
place->name, place->description, nr_char);
+ printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",
st_struct->topic,
+ place->position.lat, place->position.lon,
place->name, place->description, nr_char);
}
} else {
printf("No send for %s\n", st_struct->topic);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 2749794..6827e52 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -640,7 +640,7 @@ int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t
*handle, unsigned int si
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
handle->maxMsgSize = size;
- handle->maxMsgSize = 16;
+ handle->maxMsgSize = 4;
celixThreadRwlock_unlock(&handle->dbLock);
}
return 0;
@@ -887,7 +887,7 @@ static inline void
pubsub_tcpHandler_setNextStateReadStateMachine(pubsub_tcpHand
// Reads data from the filedescriptor which has date (determined by epoll())
and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and
size have valid values
//
-int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
+int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
celixThreadRwlock_writeLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map,
(void *) (intptr_t) fd);
if (entry == NULL)
@@ -914,116 +914,110 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t
*handle, int fd) {
entry->bufferSize = handle->bufferSize;
}
}
+ struct msghdr msg;
+ struct iovec msg_iov[IOV_MAX];
+ memset(&msg, 0x00, sizeof(struct msghdr));
+ msg.msg_iov = msg_iov;
+
// Read the message
- // Read the message
- entry->msg.msg_iovlen = 0;
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base =
(entry->headerBufferSize) ? entry->headerBuffer
-
: entry->buffer;
- entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize;
- entry->msg.msg_iovlen++;
- int nbytes = recvmsg(fd, &entry->msg, MSG_PEEK | MSG_NOSIGNAL);
- long int nbytes = UINT32_MAX;
- char *header_buffer = (entry->readHeaderBufferSize) ?
entry->readHeaderBuffer : entry->buffer;
- if (entry->state == READ_STATE_SYNC) {
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd,
header_buffer, 0);
- nbytes = recv(fd, header_buffer, entry->expectedBufferReadSize, flag |
MSG_NOSIGNAL);
- if (nbytes && (entry->expectedBufferReadSize <= 0)) {
- pubsub_tcpHandler_setReadStateMachine(handle, entry);
- }
- }
- if (entry->state == READ_STATE_HEADER) {
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd,
header_buffer, MSG_PEEK);
- if (nbytes && (entry->expectedBufferReadSize <= 0)) { // Check header
message buffer
- if (handle->protocol->decodeHeader(handle->protocol->handle,
- header_buffer,
- entry->headerSize,
- &entry->header) !=
CELIX_SUCCESS) {
- // Did not receive correct header
- // skip sync word and try to read next header
- if (!entry->headerError) {
- L_WARN("[TCP Socket] Failed to decode message header (fd:
%d) (url: %s)", entry->fd, entry->url);
- }
- entry->headerError = true;
- pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry,
READ_STATE_SYNC);
- } else {
- // Read header message from queue
- pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry,
READ_STATE_HEADER);
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd,
header_buffer, 0);
- if ((nbytes > 0) && (nbytes == entry->headerSize)) {
- entry->headerError = false;
- entry->msgSizeReadSize = 0;
- // For headerless message, add header to bufferReadSize;
- if (!entry->readHeaderBufferSize) entry->msgSizeReadSize
+= nbytes;
- pubsub_tcpHandler_setReadStateMachine(handle, entry);
+ msg.msg_iovlen = 0;
+ msg.msg_iov[msg.msg_iovlen].iov_base = (entry->readHeaderBufferSize) ?
entry->readHeaderBuffer : entry->buffer;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->headerSize;
+ msg.msg_iovlen++;
+ long int msgSize = 0;
+ long int nbytes = recvmsg(fd, &msg, MSG_PEEK | MSG_NOSIGNAL);
+ if (nbytes >= entry->headerSize) {
+ msg.msg_iovlen--;
+ if (handle->protocol->decodeHeader(handle->protocol->handle,
+
msg.msg_iov[msg.msg_iovlen].iov_base,
+ msg.msg_iov[msg.msg_iovlen].iov_len,
+ &entry->header) != CELIX_SUCCESS) {
+ // Did not receive correct header
+ // skip sync word and try to read next header
+ if (!entry->headerError) {
+ L_WARN("[TCP Socket] Failed to decode message header (fd: %d)
(url: %s)", entry->fd, entry->url);
+ }
+ entry->headerError = true;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->syncSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ } else {
+ //printf("PayRR[%d]: Offset %d, %d total: %d uel %s\n",
entry->header.header.seqNr, (int)entry->header.header.payloadOffset,
(int)entry->header.header.payloadPartSize,
(int)entry->header.header.payloadSize, entry->url);
+ // Alloc message buffers
+ if (entry->header.header.payloadSize > entry->bufferSize) {
+ handle->bufferSize = MAX(handle->bufferSize,
entry->header.header.payloadSize);
+ char *buffer = realloc(entry->buffer, (size_t)
handle->bufferSize);
+ if (buffer) {
+ entry->buffer = buffer;
+ entry->bufferSize = handle->bufferSize;
}
}
- }
- }
-
- if (entry->state == READ_STATE_PAYLOAD) {
- // Alloc message buffers
- if (entry->header.header.payloadSize > entry->bufferSize) {
- handle->bufferSize = MAX(handle->bufferSize,
entry->header.header.payloadSize + PADDING_BUFFER_SIZE);
- if (entry->buffer) {
- free(entry->buffer);
+ if (entry->header.header.payloadOffset ==0 ){
+ memset(entry->buffer, 0x00, entry->bufferSize);
}
- entry->buffer = malloc((size_t) handle->bufferSize);
- entry->bufferSize = handle->bufferSize;
- }
-
- //if (entry->header.header.isLastSegment)
entry->expectedBufferReadSize+=4;
- // Read payload data from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd,
entry->buffer, 0);
- if (nbytes && (entry->expectedBufferReadSize <= 0)) {
- entry->msgSizeReadSize += nbytes;
- pubsub_tcpHandler_setReadStateMachine(handle, entry);
- }
- }
- if (entry->state == READ_STATE_META) {
- if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
- if (entry->readMetaBuffer) {
- free(entry->readMetaBuffer);
- L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta
buffer: (%d, %d) \n", entry->fd,
- entry->url, entry->readMetaBufferSize,
entry->header.header.metadataSize);
+ entry->headerError = false;
+ if (entry->readHeaderBufferSize) {
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ if (entry->header.header.payloadPartSize) {
+ char* buffer = entry->buffer;
+ msg.msg_iov[msg.msg_iovlen].iov_base =
&buffer[entry->header.header.payloadOffset];
+ msg.msg_iov[msg.msg_iovlen].iov_len =
entry->header.header.payloadPartSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ if (entry->header.header.metadataSize) {
+ if (entry->header.header.metadataSize >
entry->readMetaBufferSize) {
+ char *buffer = realloc(entry->readMetaBuffer, (size_t)
entry->header.header.metadataSize);
+ if (buffer) {
+ entry->readMetaBuffer = buffer;
+ entry->readMetaBufferSize =
entry->header.header.metadataSize;
+ L_WARN("[TCP Socket] socket: %d, url: %s, realloc
read meta buffer: (%d, %d) \n", entry->fd,
+ entry->url, entry->readMetaBufferSize,
entry->header.header.metadataSize);
+ }
+ }
+ msg.msg_iov[msg.msg_iovlen].iov_base = entry->readMetaBuffer;
+ msg.msg_iov[msg.msg_iovlen].iov_len =
entry->header.header.metadataSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ if (entry->readFooterSize) {
+ msg.msg_iov[msg.msg_iovlen].iov_base = entry->readFooterBuffer;
+ msg.msg_iov[msg.msg_iovlen].iov_len = entry->readFooterSize;
+ msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
}
- entry->readMetaBufferSize = entry->header.header.metadataSize +
PADDING_BUFFER_SIZE;
- entry->readMetaBuffer = malloc((size_t)
entry->readMetaBufferSize);
- }
-
- // Read meta data from (queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd,
entry->readMetaBuffer,0);
- if (nbytes && (entry->expectedBufferReadSize <= 0)) {
- entry->msgSizeReadSize += nbytes;
- pubsub_tcpHandler_setReadStateMachine(handle, entry);
}
+ nbytes = recvmsg(fd, &msg, MSG_NOSIGNAL);
}
- if (entry->state == READ_STATE_FOOTER) {
- // Check for end of message using, footer of message. Because of
streaming protocol
- if (!entry->readFooterBuffer) entry->readFooterBuffer =
malloc(entry->readFooterSize);
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd,
entry->readFooterBuffer, 0);
- if (nbytes && (entry->expectedBufferReadSize <= 0)) {
+ if ((nbytes >= msgSize)&&(!entry->headerError)) {
+ bool valid = true;
+ if (entry->readFooterSize) {
if (handle->protocol->decodeFooter(handle->protocol->handle,
entry->readFooterBuffer,
entry->readFooterSize,
- &entry->header) ==
CELIX_SUCCESS) {
- // valid footer, this means that the message is valid
- pubsub_tcpHandler_setReadStateMachine(handle, entry);
- } else {
+ &entry->header) !=
CELIX_SUCCESS) {
+
// Did not receive correct footer
L_ERROR(
"[TCP Socket] Failed to decode message footer seq %d
(received corrupt message, transmit buffer full?) (fd: %d) (url: %s)",
entry->header.header.seqNr,
entry->fd,
entry->url);
- pubsub_tcpHandler_setNextStateReadStateMachine(handle, entry,
READ_STATE_HEADER);
+ valid = false;
}
}
- }
- if (entry->state == READ_STATE_READY) {
- // Complete message is received
- pubsub_tcpHandler_decodePayload(handle, entry);
- pubsub_tcpHandler_setReadStateMachine(handle, entry);
+ if (!entry->header.header.isLastSegment) {
+ // Not last Segment of message
+ valid = false;
+ }
+
+ if (valid) {
+ // Complete message is received
+ pubsub_tcpHandler_decodePayload(handle, entry);
+ }
}
if (nbytes > 0) {
@@ -1051,7 +1045,7 @@ int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle,
int fd) {
// Reads data from the filedescriptor which has date (determined by epoll())
and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and
size have valid values
//
-int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
+int pubsub_tcpHandler_read_(pubsub_tcpHandler_t *handle, int fd) {
celixThreadRwlock_writeLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map,
(void *) (intptr_t) fd);
if (entry == NULL)
@@ -1280,11 +1274,11 @@ int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t
*handle, psa_tcp_connectio
//
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
pubsub_protocol_message_t *message, struct iovec *msgIoVec,
size_t msg_iov_len, int flags) {
- celixThreadRwlock_readLock(&handle->dbLock);
int result = 0;
int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
int nofConnToClose = 0;
if (handle) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
hash_map_iterator_t iter =
hashMapIterator_construct(handle->connection_fd_map);
size_t max_msg_iov_len = IOV_MAX - 3; // header , footer, padding
while (hashMapIterator_hasNext(&iter)) {
@@ -1294,6 +1288,10 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
pubsub_protocol_message
size_t payloadSize = 0;
if (msg_iov_len == 1) {
handle->protocol->encodePayload(handle->protocol->handle,
message, &payloadData, &payloadSize);
+ //char* b = (char*)payloadData;
+ //for (int i = 0; i < payloadSize; i++) {
+ // b[i] = '0' + (i % 10);
+ //}
} else {
for (size_t i = 0; i < msg_iov_len; i++) {
payloadSize += msgIoVec[i].iov_len;
@@ -1371,7 +1369,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
pubsub_protocol_message
msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize
- msgPayloadSize), entry->maxMsgSize);
msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
message->header.payloadPartSize = msgPartSize;
- message->header.payloadOffset = msgPayloadSize;
+ message->header.payloadOffset = msgPayloadSize;
+ //printf("Pay[%d: MsgId: %d, Offset %d, %d (%d) url:
%s\n ", message->header.seqNr, message->header.msgId, (int)msgPayloadSize,
(int)msgPartSize, (int)message->header.payloadSize, entry->url);
msgPayloadSize += msg.msg_iov[msg.msg_iovlen].iov_len;
msgSize = msgPayloadSize;
} else {
@@ -1489,8 +1488,8 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
pubsub_protocol_message
}
}
}
+ celixThreadRwlock_unlock(&handle->dbLock);
}
- celixThreadRwlock_unlock(&handle->dbLock);
//Force close all connections that are queued in a list, done outside of
locking handle->dbLock to prevent deadlock
for (int i = 0; i < nofConnToClose; i++) {
pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1661,6 +1660,7 @@ void pubsub_tcpHandler_handler(pubsub_tcpHandler_t
*handle) {
L_ERROR("[TCP Socket] Cannot create epoll wait (%d) %s\n",
nof_events, strerror(errno));
}
for (int i = 0; i < nof_events; i++) {
+ size_t size = hashMap_size(handle->interface_fd_map);
hash_map_iterator_t iter =
hashMapIterator_construct(handle->interface_fd_map);
psa_tcp_connection_entry_t *pendingConnectionEntry = NULL;
while (hashMapIterator_hasNext(&iter)) {