This is an automated email from the ASF dual-hosted git repository. rbulter pushed a commit to branch feature/proposal_protocol_footer in repository https://gitbox.apache.org/repos/asf/celix.git
commit 9a94bd6ca9c245eea19e707dc87a582de71e9372 Author: Roy Bulter <[email protected]> AuthorDate: Mon Jun 8 20:16:33 2020 +0200 Add Footer --- .../pubsub_admin_tcp/src/pubsub_tcp_handler.c | 45 ++++++++++++----- .../src/ps_wire_protocol_activator.c | 3 ++ .../src/pubsub_wire_protocol_common.h | 1 + .../src/pubsub_wire_protocol_impl.c | 58 ++++++++++++++++++++-- .../src/pubsub_wire_protocol_impl.h | 3 ++ .../pubsub/pubsub_spi/include/pubsub_protocol.h | 35 +++++++++++++ libs/framework/src/celix_log.c | 1 + 7 files changed, 130 insertions(+), 16 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c index 91eb97a..3d647f8 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c @@ -82,6 +82,7 @@ typedef struct psa_tcp_connection_entry { unsigned int headerSize; unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload void *headerBuffer; + unsigned int footerSize; void *footerBuffer; unsigned int bufferSize; void *buffer; @@ -331,6 +332,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch entry->headerBufferSize = size; handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size); entry->syncSize = size; + handle->protocol->getFooterSize(handle->protocol->handle, &size); + entry->footerSize = size; entry->bufferSize = handle->bufferSize; entry->connected = false; entry->msg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX); @@ -340,8 +343,8 @@ pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, ch entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->headerSize; entry->msg_iovlen++; } - entry->footerBuffer = calloc(sizeof(char), entry->headerSize); - entry->buffer = calloc(sizeof(char), entry->bufferSize); + if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize); + if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); entry->msg.msg_iov[entry->msg.msg_iovlen].iov_base = entry->buffer; entry->msg.msg_iov[entry->msg.msg_iovlen].iov_len = entry->bufferSize; entry->msg_iovlen++; @@ -825,7 +828,7 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { // Message buffer is to small, reallocate to make it bigger if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) { - handle->bufferSize = MAX(handle->bufferSize, entry->headerSize); + handle->bufferSize = MAX(handle->bufferSize, entry->headerSize ); if (entry->buffer) free(entry->buffer); entry->buffer = malloc((size_t) handle->bufferSize); entry->bufferSize = handle->bufferSize; @@ -899,20 +902,19 @@ int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) { L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size); } } - // Check for end of message using, header of next message. Because of streaming protocol - // TODO: Add to protocol service to decode/EncodeFooter with unique sync word(different then header) + // Check for end of message using, footer of message. Because of streaming protocol if (nbytes > 0) { - pubsub_protocol_message_t header; - nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->headerSize, MSG_PEEK); - if (handle->protocol->decodeHeader(handle->protocol->handle, + validMsg = true; + nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer, 0, entry->footerSize, 0); + if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, - entry->headerSize, - &header) == CELIX_SUCCESS) { - // valid header for next buffer, this means that the message is valid + entry->footerSize, + &entry->header) == CELIX_SUCCESS) { + // valid footer, this means that the message is valid validMsg = true; } else { // Did not receive correct header - L_ERROR("[TCP Socket] Failed to decode next message header seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); + L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); entry->bufferReadSize = 0; } } @@ -1052,6 +1054,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message } message->header.metadataSize = metadataSize; + void *footerData = NULL; + size_t footerDataSize = 0; + if (entry->footerSize) { + handle->protocol->encodeFooter(handle->protocol->handle, message, + &footerData, + &footerDataSize); + } + size_t msgSize = 0; struct msghdr msg; struct iovec msg_iov[IOV_MAX]; @@ -1085,6 +1095,14 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; } + // Write optional footerData in vector buffer + if (footerData && footerDataSize) { + msg.msg_iovlen++; + msg.msg_iov[msg.msg_iovlen].iov_base = footerData; + msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize; + msgSize += msg.msg_iov[msg.msg_iovlen].iov_len; + } + void *headerData = NULL; size_t headerSize = 0; // check if header is not part of the payload (=> headerBufferSize = 0)s @@ -1142,6 +1160,9 @@ int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message if (metadataData) { free(metadataData); } + if (footerData) { + free(footerData); + } entry->seqNr++; } } diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c index 359ea8f..78ae2a2 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c +++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/ps_wire_protocol_activator.c @@ -43,15 +43,18 @@ static int ps_wp_start(ps_wp_activator_t *act, celix_bundle_context_t *ctx) { act->protocolSvc.getHeaderBufferSize = pubsubProtocol_getHeaderBufferSize; act->protocolSvc.getSyncHeaderSize = pubsubProtocol_getSyncHeaderSize; act->protocolSvc.getSyncHeader = pubsubProtocol_getSyncHeader; + act->protocolSvc.getFooterSize = pubsubProtocol_getFooterSize; act->protocolSvc.isMessageSegmentationSupported = pubsubProtocol_isMessageSegmentationSupported; act->protocolSvc.encodeHeader = pubsubProtocol_encodeHeader; act->protocolSvc.encodePayload = pubsubProtocol_encodePayload; act->protocolSvc.encodeMetadata = pubsubProtocol_encodeMetadata; + act->protocolSvc.encodeFooter = pubsubProtocol_encodeFooter; act->protocolSvc.decodeHeader = pubsubProtocol_decodeHeader; act->protocolSvc.decodePayload = pubsubProtocol_decodePayload; act->protocolSvc.decodeMetadata = pubsubProtocol_decodeMetadata; + act->protocolSvc.decodeFooter = pubsubProtocol_decodeFooter; act->wireProtocolSvcId = celix_bundleContext_registerService(ctx, &act->protocolSvc, PUBSUB_PROTOCOL_SERVICE_NAME, props); } diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h index f5befd2..ec946a8 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h +++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_common.h @@ -27,6 +27,7 @@ extern "C" { #endif static const unsigned int PROTOCOL_WIRE_SYNC = 0xABBABAAB; +static const unsigned int PROTOCOL_WIRE_SYNC_FOOTER = 0xBAABABBA; static const unsigned int PROTOCOL_WIRE_ENVELOPE_VERSION = 1; int readShort(const unsigned char *data, int offset, uint16_t *val); diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c index 5e265d9..97d7fdd 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c +++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c @@ -59,15 +59,12 @@ celix_status_t pubsubProtocol_create(pubsub_protocol_wire_v1_t **protocol) { celix_status_t pubsubProtocol_destroy(pubsub_protocol_wire_v1_t* protocol) { celix_status_t status = CELIX_SUCCESS; - free(protocol); - return status; } celix_status_t pubsubProtocol_getHeaderSize(void* handle, size_t *length) { *length = sizeof(int) * 5 + sizeof(short) * 2; // header + sync + version = 24 - return CELIX_SUCCESS; } @@ -85,6 +82,11 @@ celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader) { return CELIX_SUCCESS; } +celix_status_t pubsubProtocol_getFooterSize(void* handle, size_t *length) { + *length = sizeof(int); + return CELIX_SUCCESS; +} + celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported) { *isSupported = false; return CELIX_SUCCESS; @@ -95,6 +97,10 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message size_t headerSize = 0; pubsubProtocol_getHeaderSize(handle, &headerSize); + // Get HeaderSize + size_t footerSize = 0; + pubsubProtocol_getFooterSize(handle, &footerSize); + if (*outBuffer == NULL) { *outBuffer = calloc(1, headerSize); *outLength = headerSize; @@ -110,7 +116,7 @@ celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message idx = writeShort(*outBuffer, idx, message->header.msgMinorVersion); idx = writeInt(*outBuffer, idx, message->header.payloadSize); idx = writeInt(*outBuffer, idx, message->header.metadataSize); - + idx = writeInt(*outBuffer, idx, footerSize); *outLength = idx; } @@ -175,6 +181,27 @@ celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_messa return status; } +celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength) { + celix_status_t status = CELIX_SUCCESS; + // Get HeaderSize + size_t footerSize = 0; + pubsubProtocol_getFooterSize(handle, &footerSize); + + if (*outBuffer == NULL) { + *outBuffer = calloc(1, footerSize); + *outLength = footerSize; + } + if (*outBuffer == NULL) { + status = CELIX_ENOMEM; + } else { + int idx = 0; + idx = writeInt(*outBuffer, idx, PROTOCOL_WIRE_SYNC_FOOTER); + *outLength = idx; + } + + return status; +} + celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) { celix_status_t status = CELIX_SUCCESS; @@ -253,6 +280,29 @@ celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t le return status; } +celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message) { + celix_status_t status = CELIX_SUCCESS; + + int idx = 0; + size_t footerSize = 0; + pubsubProtocol_getFooterSize(handle, &footerSize); + if (length == footerSize) { + unsigned int footerSync; + unsigned int footerSyncValue = PROTOCOL_WIRE_SYNC_FOOTER; + unsigned int headerSyncValue = PROTOCOL_WIRE_SYNC; + idx = readInt(data, idx, &footerSync); + if (footerSync != footerSyncValue) { + status = CELIX_ILLEGAL_ARGUMENT; + } + if (footerSync != headerSyncValue) { + status = CELIX_ILLEGAL_ARGUMENT; + } + } else { + status = CELIX_ILLEGAL_ARGUMENT; + } + return status; +} + static celix_status_t pubsubProtocol_createNetstring(const char* string, char** netstringOut) { celix_status_t status = CELIX_SUCCESS; diff --git a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h index 06c4dcd..1693d09 100644 --- a/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h +++ b/bundles/pubsub/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.h @@ -37,15 +37,18 @@ celix_status_t pubsubProtocol_getHeaderSize(void *handle, size_t *length); celix_status_t pubsubProtocol_getHeaderBufferSize(void *handle, size_t *length); celix_status_t pubsubProtocol_getSyncHeaderSize(void *handle, size_t *length); celix_status_t pubsubProtocol_getSyncHeader(void* handle, void *syncHeader); +celix_status_t pubsubProtocol_getFooterSize(void* handle, size_t *length); celix_status_t pubsubProtocol_isMessageSegmentationSupported(void* handle, bool *isSupported); celix_status_t pubsubProtocol_encodeHeader(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength); celix_status_t pubsubProtocol_encodePayload(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength); celix_status_t pubsubProtocol_encodeMetadata(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength); +celix_status_t pubsubProtocol_encodeFooter(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength); celix_status_t pubsubProtocol_decodeHeader(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); celix_status_t pubsubProtocol_decodePayload(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); celix_status_t pubsubProtocol_decodeMetadata(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); +celix_status_t pubsubProtocol_decodeFooter(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); #ifdef __cplusplus } diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h index 2ed8f81..17ca6dd 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_protocol.h @@ -112,6 +112,18 @@ typedef struct pubsub_protocol_service { * @return status code indicating failure or success */ celix_status_t (*getSyncHeader)(void *handle, void *sync); + + /** + * Returns the size of the footer. + * Is used by the receiver to configure the expected size of the footer. + * The receiver reads the footer to know if the complete message including paylaod is received. + * + * @param handle handle for service + * @param length output param for footer size + * @return status code indicating failure or success + */ + celix_status_t (*getFooterSize)(void *handle, size_t *length); + /** * Returns the if the protocol service supports the message segmentation attributes that is used by the underlying protocol. * @@ -157,6 +169,18 @@ typedef struct pubsub_protocol_service { celix_status_t (*encodeMetadata)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength); /** + * Encodes the footer + * + * @param handle handle for service + * @param message message to use footer from + * @param outBuffer byte array containing the encoded footer + * @param outLength length of the byte array + * @return status code indicating failure or success + */ + celix_status_t (*encodeFooter)(void *handle, pubsub_protocol_message_t *message, void **outBuffer, size_t *outLength); + + + /** * Decodes the given data into message.header. * * @param handle handle for service @@ -190,6 +214,17 @@ typedef struct pubsub_protocol_service { * @return status code indicating failure or success */ celix_status_t (*decodeMetadata)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); + + /** + * Decodes the given data into message.header. + * + * @param handle handle for service + * @param data incoming byte array to decode + * @param length length of the byte array + * @param message pointer to message to be filled in with decoded footer + * @return status code indicating failure or success + */ + celix_status_t (*decodeFooter)(void* handle, void *data, size_t length, pubsub_protocol_message_t *message); } pubsub_protocol_service_t; #endif /* PUBSUB_PROTOCOL_SERVICE_H_ */ diff --git a/libs/framework/src/celix_log.c b/libs/framework/src/celix_log.c index 587189b..b2c074e 100644 --- a/libs/framework/src/celix_log.c +++ b/libs/framework/src/celix_log.c @@ -26,6 +26,7 @@ #include "celix_log.h" #include "celix_threads.h" #include "celix_array_list.h" +#include "memstream/open_memstream.h" #define LOG_NAME "celix_framework"
