This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/use_ser_hander_in_psa in repository https://gitbox.apache.org/repos/asf/celix.git
commit fea21167fcfc4c351f9d35a1bf72898bfbee89d5 Author: Pepijn Noltes <[email protected]> AuthorDate: Wed May 26 20:03:57 2021 +0200 Adds serializer handler to pubsub websocket. --- .../v2/src/pubsub_tcp_topic_receiver.c | 4 + .../v2/src/pubsub_tcp_topic_sender.c | 80 ++------ .../pubsub_admin_websocket/v2/src/psa_activator.c | 11 - .../v2/src/pubsub_websocket_admin.c | 227 +++++---------------- .../v2/src/pubsub_websocket_topic_receiver.c | 52 +++-- .../v2/src/pubsub_websocket_topic_receiver.h | 5 +- .../v2/src/pubsub_websocket_topic_sender.c | 102 +++------ .../v2/src/pubsub_websocket_topic_sender.h | 3 +- .../v2/src/pubsub_zmq_topic_receiver.c | 4 + .../v2/src/pubsub_zmq_topic_sender.c | 1 + .../pubsub_utils/src/pubsub_serializer_handler.c | 17 +- 11 files changed, 144 insertions(+), 362 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c index b02768a..f602be9 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c @@ -459,6 +459,10 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs //NOTE receiver->subscribers.mutex locked const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); + if (msgFqn == NULL) { + L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId); + return; + } void *deSerializedMsg = NULL; bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c index 11d73d9..47ef05a 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c @@ -67,6 +67,7 @@ struct pubsub_tcp_topic_sender { bool isPassive; bool verbose; unsigned long send_delay; + int seqNr; //atomic struct { long svcId; @@ -79,23 +80,10 @@ struct pubsub_tcp_topic_sender { } boundedServices; }; -typedef struct psa_tcp_send_msg_entry { - uint32_t type; //msg type id (hash of fqn) - const char *fqn; - uint8_t major; - uint8_t minor; - unsigned char originUUID[16]; - pubsub_protocol_service_t *protSer; - struct iovec *serializedIoVecOutput; - size_t serializedIoVecOutputLen; - unsigned int seqNr; -} psa_tcp_send_msg_entry_t; - typedef struct psa_tcp_bounded_service_entry { pubsub_tcp_topic_sender_t *parent; pubsub_publisher_t service; long bndId; - hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t int getCount; } psa_tcp_bounded_service_entry_t; @@ -272,18 +260,7 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); while (hashMapIterator_hasNext(&iter)) { psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter2)) { - psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2); - if (msgEntry->serializedIoVecOutput) - free(msgEntry->serializedIoVecOutput); - msgEntry->serializedIoVecOutput = NULL; - free(msgEntry); - } - hashMap_destroy(entry->msgEntries, false, false); - free(entry); - } + free(entry); } hashMap_destroy(sender->boundedServices.map, false, false); celixThreadMutex_unlock(&sender->boundedServices.mutex); @@ -349,7 +326,6 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req entry->getCount = 1; entry->parent = sender; entry->bndId = bndId; - entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL); entry->service.handle = entry; entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType; entry->service.send = psa_tcp_topicPublicationSend; @@ -373,16 +349,6 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re if (entry != NULL && entry->getCount == 0) { //free entry hashMap_remove(sender->boundedServices.map, (void *) bndId); - - hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter)) { - psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter); - if (msgEntry->serializedIoVecOutput) - free(msgEntry->serializedIoVecOutput); - msgEntry->serializedIoVecOutput = NULL; - free(msgEntry); - } - hashMap_destroy(entry->msgEntries, false, false); free(entry); } celixThreadMutex_unlock(&sender->boundedServices.mutex); @@ -391,27 +357,17 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re static int psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) { - int status = CELIX_SUCCESS; psa_tcp_bounded_service_entry_t *bound = handle; pubsub_tcp_topic_sender_t *sender = bound->parent; - - - psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId)); - - if (entry == NULL) { - const char* fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId); - if (fqn != NULL) { - entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t)); - entry->protSer = sender->protocol; - entry->type = msgTypeId; - entry->fqn = fqn; - entry->major = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId); - entry->minor = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId); - uuid_copy(entry->originUUID, sender->fwUUID); - hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry); - } else { - L_WARN("Cannot find message serialization for msg id %i", (int)msgTypeId); - } + const char* msgFqn; + int majorVersion; + int minorversion; + celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion); + + if (status != CELIX_SUCCESS) { + L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId, + pubsub_serializerHandler_getSerializationType(sender->serializerHandler)); + return status; } delay_first_send_for_late_joiners(sender); @@ -419,11 +375,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen; struct iovec *serializedIoVecOutput = NULL; status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen); - entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen); bool cont = false; if (status == CELIX_SUCCESS) /*ser ok*/ { - cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata); + cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata); } if (cont) { pubsub_protocol_message_t message; @@ -435,9 +390,9 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i message.payload.length = serializedIoVecOutput->iov_len; } message.header.msgId = msgTypeId; - message.header.seqNr = entry->seqNr; - message.header.msgMajorVersion = entry->major; - message.header.msgMinorVersion = entry->minor; + message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED); + message.header.msgMajorVersion = (uint16_t)majorVersion; + message.header.msgMinorVersion = (uint16_t)minorversion; message.header.payloadSize = 0; message.header.payloadPartSize = 0; message.header.payloadOffset = 0; @@ -445,7 +400,6 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i if (metadata != NULL) { message.metadata.metadata = metadata; } - entry->seqNr++; bool sendOk = true; { int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0); @@ -453,7 +407,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i status = -1; sendOk = false; } - pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata); + pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata); if (message.metadata.metadata) { celix_properties_destroy(message.metadata.metadata); } @@ -467,7 +421,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno)); } } else { - L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn, + L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic); } diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c index 159d8ed..33cc86f 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c @@ -53,17 +53,6 @@ int psa_websocket_start(psa_websocket_activator_t *act, celix_bundle_context_t * act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper); celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION; - //track serializers (only json) - if (status == CELIX_SUCCESS) { - celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; - opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME; - opts.filter.ignoreServiceLanguage = true; - opts.callbackHandle = act->admin; - opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc; - opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc; - act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - } - //register pubsub admin service if (status == CELIX_SUCCESS) { pubsub_admin_service_t *psaSvc = &act->adminService; diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c index 8950fda..bf74d38 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c @@ -18,18 +18,18 @@ */ #include <memory.h> -#include <pubsub_endpoint.h> -#include <pubsub_serializer.h> #include <ip_utils.h> -#include <pubsub_message_serialization_service.h> -#include <pubsub_matching.h> +#include "pubsub_endpoint.h" +#include "pubsub_serializer.h" +#include "pubsub_matching.h" #include "pubsub_utils.h" #include "pubsub_websocket_admin.h" #include "pubsub_psa_websocket_constants.h" #include "pubsub_websocket_topic_sender.h" #include "pubsub_websocket_topic_receiver.h" #include "pubsub_websocket_common.h" +#include "pubsub_serializer_handler.h" #define L_DEBUG(...) \ celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) @@ -52,11 +52,6 @@ struct pubsub_websocket_admin { bool verbose; struct { - celix_thread_rwlock_t mutex; - hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t* - } serializers; - - struct { celix_thread_mutex_t mutex; hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t* } topicSenders; @@ -71,6 +66,10 @@ struct pubsub_websocket_admin { hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint) } discoveredEndpoints; + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*. + } serializationHandlers; }; static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint); @@ -93,9 +92,6 @@ pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *c psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE); psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE); - celixThreadRwlock_create(&psa->serializers.mutex, NULL); - psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - celixThreadMutex_create(&psa->topicSenders.mutex, NULL); psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); @@ -105,6 +101,9 @@ pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *c celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL); psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL); + psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL); + return psa; } @@ -138,13 +137,13 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) { } celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); - celixThreadRwlock_writeLock(&psa->serializers.mutex); - iter = hashMapIterator_construct(psa->serializers.map); + celixThreadMutex_lock(&psa->serializationHandlers.mutex); + iter = hashMapIterator_construct(psa->serializationHandlers.map); while (hashMapIterator_hasNext(&iter)) { - psa_websocket_serializer_entry_t *entry = hashMapIterator_nextValue(&iter); - free(entry); + pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter); + pubsub_serializerHandler_destroy(entry); } - celixThreadRwlock_unlock(&psa->serializers.mutex); + celixThreadMutex_unlock(&psa->serializationHandlers.mutex); celixThreadMutex_destroy(&psa->topicSenders.mutex); hashMap_destroy(psa->topicSenders.map, true, false); @@ -155,112 +154,12 @@ void pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) { celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex); hashMap_destroy(psa->discoveredEndpoints.map, false, false); - celixThreadRwlock_destroy(&psa->serializers.mutex); - hashMap_destroy(psa->serializers.map, false, false); + celixThreadMutex_destroy(&psa->serializationHandlers.mutex); + hashMap_destroy(psa->serializationHandlers.map, false, false); free(psa); } -void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) { - pubsub_websocket_admin_t *psa = handle; - - const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL); - long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L); - const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL); - const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0"); - - if (serType == NULL || msgId == -1L || msgFqn == NULL) { - L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of the following properties: %s or %s or %s", - PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY); - - L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn); - return; - } - L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn); - - celixThreadRwlock_writeLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType); - if(typeEntries == NULL) { - typeEntries = hashMap_create(NULL, NULL, NULL, NULL); - hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries); - L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s", psa->serializers.map, serType); - } - psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId); - if (entry == NULL) { - entry = calloc(1, sizeof(psa_websocket_serializer_entry_t)); - entry->svc = svc; - entry->fqn = celix_utils_strdup(msgFqn); - entry->version = celix_utils_strdup(msgVersion); - hashMap_put(typeEntries, (void*)msgId, entry); - L_INFO("[PSA_WEBSOCKET_V2] entry added"); - } - celixThreadRwlock_unlock(&psa->serializers.mutex); -} - -void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) { - pubsub_websocket_admin_t *psa = handle; - const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL); - long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L); - - if(serType == NULL || msgId == -1) { - L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i", serType, msgId); - return; - } - - //remove serializer - // 1) First find entry and - // 2) loop and destroy all topic sender using the serializer and - // 3) loop and destroy all topic receivers using the serializer - // Note that it is the responsibility of the topology manager to create new topic senders/receivers - - celixThreadRwlock_writeLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType); - if(typeEntries != NULL) { - psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId); - free((void*)entry->fqn); - free((void*)entry->version); - free(entry); - - // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type. - if(hashMap_size(typeEntries) == 0) { - hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false); - celixThreadRwlock_unlock(&psa->serializers.mutex); - - celixThreadMutex_lock(&psa->topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry); - if (sender != NULL && strncmp(serType, pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) { - char *key = hashMapEntry_getKey(senderEntry); - hashMapIterator_remove(&iter); - pubsub_websocketTopicSender_destroy(sender); - free(key); - } - } - celixThreadMutex_unlock(&psa->topicSenders.mutex); - - celixThreadMutex_lock(&psa->topicReceivers.mutex); - iter = hashMapIterator_construct(psa->topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry); - if (receiver != NULL && strncmp(serType, pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) { - char *key = hashMapEntry_getKey(senderEntry); - hashMapIterator_remove(&iter); - pubsub_websocketTopicReceiver_destroy(receiver); - free(key); - } - } - celixThreadMutex_unlock(&psa->topicReceivers.mutex); - } else { - celixThreadRwlock_unlock(&psa->serializers.mutex); - } - } else { - celixThreadRwlock_unlock(&psa->serializers.mutex); - } -} - celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) { pubsub_websocket_admin_t *psa = handle; L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher"); @@ -297,14 +196,36 @@ celix_status_t pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const return status; } +static pubsub_serializer_handler_t* pubsub_websocketAdmin_getSerializationHandler(pubsub_websocket_admin_t* psa, long msgSerializationMarkerSvcId) { + pubsub_serializer_handler_t* handler = NULL; + celixThreadMutex_lock(&psa->serializationHandlers.mutex); + handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId); + if (handler == NULL) { + handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log); + if (handler != NULL) { + hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler); + } + } + celixThreadMutex_unlock(&psa->serializationHandlers.mutex); + return handler; +} + + celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) { pubsub_websocket_admin_t *psa = handle; celix_status_t status = CELIX_SUCCESS; - //1) Create TopicSender - //2) Store TopicSender - //3) Connect existing endpoints - //4) set outPublisherEndpoint + //1) Get serialization handler + //2) Create TopicSender + //3) Store TopicSender + //4) Connect existing endpoints + //5) set outPublisherEndpoint + + pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId); + if (handler == NULL) { + L_ERROR("Cannot create topic sender without serialization handler"); + return CELIX_ILLEGAL_STATE; + } celix_properties_t *newEndpoint = NULL; @@ -324,7 +245,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char * celixThreadMutex_lock(&psa->topicSenders.mutex); pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key); if (sender == NULL) { - sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serType, psa); + sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, handler, psa); if (sender != NULL) { const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE; newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, @@ -382,9 +303,14 @@ celix_status_t pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) { pubsub_websocket_admin_t *psa = handle; - celix_properties_t *newEndpoint = NULL; + pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId); + if (handler == NULL) { + L_ERROR("Cannot create topic receiver without serialization handler"); + return CELIX_ILLEGAL_STATE; + } + //get serializer type const char *serType = NULL; celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS; @@ -400,7 +326,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char celixThreadMutex_lock(&psa->topicReceivers.mutex); pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); if (receiver == NULL) { - receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, psa); + receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, psa); if (receiver != NULL) { const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE; newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, @@ -578,57 +504,11 @@ celix_status_t pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons return status; } -psa_websocket_serializer_entry_t* pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) { - pubsub_websocket_admin_t *psa = handle; - psa_websocket_serializer_entry_t *serializer = NULL; - - celixThreadRwlock_readLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType); - if(typeEntries != NULL) { - serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId); - } - - return serializer; -} - -void pubsub_websocketAdmin_releaseSerializer(void *handle, psa_websocket_serializer_entry_t* serializer) { - pubsub_websocket_admin_t *psa = handle; - celixThreadRwlock_unlock(&psa->serializers.mutex); -} - -int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) { - pubsub_websocket_admin_t *psa = handle; - int64_t id = -1L; - - celixThreadRwlock_readLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType); - if(typeEntries != NULL) { - hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries); - while(hashMapIterator_hasNext(&iterator)) { - void *key = hashMapIterator_nextKey(&iterator); - psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, key); - L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn); - if(strncmp(fqn, entry->fqn, 1024*1024) == 0) { - id = (uint32_t)(uintptr_t)key; - break; - } - } - } else { - L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn); - } - celixThreadRwlock_unlock(&psa->serializers.mutex); - - L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id); - - return id; -} - bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) { pubsub_websocket_admin_t *psa = handle; fprintf(out, "\n"); fprintf(out, "Topic Senders:\n"); - celixThreadRwlock_writeLock(&psa->serializers.mutex); celixThreadMutex_lock(&psa->topicSenders.mutex); hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); while (hashMapIterator_hasNext(&iter)) { @@ -642,11 +522,9 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine fprintf(out, " |- url = %s\n", url); } celixThreadMutex_unlock(&psa->topicSenders.mutex); - celixThreadRwlock_unlock(&psa->serializers.mutex); fprintf(out, "\n"); fprintf(out, "\nTopic Receivers:\n"); - celixThreadRwlock_writeLock(&psa->serializers.mutex); celixThreadMutex_lock(&psa->topicReceivers.mutex); iter = hashMapIterator_construct(psa->topicReceivers.map); while (hashMapIterator_hasNext(&iter)) { @@ -677,7 +555,6 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine celix_arrayList_destroy(unconnected); } celixThreadMutex_unlock(&psa->topicReceivers.mutex); - celixThreadRwlock_unlock(&psa->serializers.mutex); fprintf(out, "\n"); return true; diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c index c93f078..ea997e9 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c @@ -68,10 +68,11 @@ struct pubsub_websocket_topic_receiver { void *admin; char *scope; char *topic; - char *serType; char scopeAndTopicFilter[5]; char *uri; + pubsub_serializer_handler_t* serializerHandler; + celix_websocket_service_t sockSvc; long svcId; @@ -131,12 +132,12 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu const char *scope, const char *topic, const celix_properties_t *topicProperties, - const char *serType, + pubsub_serializer_handler_t* serializerHandler, void *admin) { pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); receiver->ctx = ctx; receiver->logHelper = logHelper; - receiver->serType = celix_utils_strdup(serType); + receiver->serializerHandler = serializerHandler; receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); receiver->admin = admin; @@ -309,7 +310,6 @@ void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re free(receiver->uri); free(receiver->scope); free(receiver->topic); - free(receiver->serType); } free(receiver); } @@ -325,7 +325,7 @@ const char* pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t } const char *pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t *receiver) { - return receiver->serType; + return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler); } void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) { @@ -451,58 +451,52 @@ static void pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) { //NOTE receiver->subscribers.mutex locked - int64_t msgTypeId = pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin, receiver->serType, hdr->id); - - if(msgTypeId < 0) { - L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); - return; - } - - psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, msgTypeId); + uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id); - if(serializer == NULL) { - pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer); - L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + if (msgId == 0) { + L_WARN("Cannot find msg id for msg fqn %s", hdr->id); return; } void *deSerializedMsg = NULL; - - celix_version_t* version = celix_version_createVersionFromString(serializer->version); - bool validVersion = psa_websocket_checkVersion(version, hdr); - celix_version_destroy(version); + bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor); if (validVersion) { struct iovec deSerializeBuffer; deSerializeBuffer.iov_base = (void *)payload; deSerializeBuffer.iov_len = payloadSize; - celix_status_t status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg); - + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg); if (status == CELIX_SUCCESS) { hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); bool release = true; while (hashMapIterator_hasNext(&iter)) { pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle, serializer->fqn, msgTypeId, deSerializedMsg, NULL, &release); + svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release); if (!release && hashMapIterator_hasNext(&iter)) { //receive function has taken ownership and still more receive function to come .. //deserialize again for new message - status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg); + status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg); if (status != CELIX_SUCCESS) { - L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); break; } release = true; } } if (release) { - serializer->svc->freeDeserializedMsg(serializer->svc->handle, deSerializedMsg); + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg); } } else { - L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); } + } else { + L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x", + hdr->id, + pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), + (int)hdr->major, + (int)hdr->minor, + pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId), + pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId)); } - - pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer); } static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) { diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h index 55d5255..f5edda5 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h @@ -20,8 +20,9 @@ #ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H #define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H -#include <pubsub_admin_metrics.h> +#include "pubsub_admin_metrics.h" #include "celix_bundle_context.h" +#include "pubsub_serializer_handler.h" typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t; @@ -30,7 +31,7 @@ pubsub_websocket_topic_receiver_t* pubsub_websocketTopicReceiver_create(celix_bu const char *scope, const char *topic, const celix_properties_t *topicProperties, - const char *serType, + pubsub_serializer_handler_t* serializerHandler, void *admin); void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver); diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c index 98a1ad7..28a8af1 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c @@ -52,10 +52,13 @@ struct pubsub_websocket_topic_sender { void *admin; char *scope; char *topic; - char *serType; char scopeAndTopicFilter[5]; char *uri; + pubsub_serializer_handler_t* serializerHandler; + + int seqNr; //atomic + celix_websocket_service_t websockSvc; long websockSvcId; struct mg_connection *sockConnection; @@ -71,16 +74,10 @@ struct pubsub_websocket_topic_sender { } boundedServices; }; -typedef struct psa_websocket_send_msg_entry { - pubsub_websocket_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send) - uint32_t type; //msg type id (hash of fqn) -} psa_websocket_send_msg_entry_t; - typedef struct psa_websocket_bounded_service_entry { pubsub_websocket_topic_sender_t *parent; pubsub_publisher_t service; long bndId; - hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t int getCount; } psa_websocket_bounded_service_entry_t; @@ -99,18 +96,12 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create( celix_log_helper_t *logHelper, const char *scope, const char *topic, - const char *serType, + pubsub_serializer_handler_t* serializerHandler, void *admin) { pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender)); sender->ctx = ctx; sender->logHelper = logHelper; - sender->serType = celix_utils_strdup(serType); - - if(sender->serType == NULL) { - L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType"); - free(sender); - return NULL; - } + sender->serializerHandler = serializerHandler; psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter); sender->uri = psa_websocket_createURI(scope, topic); @@ -174,17 +165,7 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map); while (hashMapIterator_hasNext(&iter)) { psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter); - if (entry != NULL) { - hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter2)) { - psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2); - free(msgEntry); - - } - hashMap_destroy(entry->msgEntries, false, false); - - free(entry); - } + free(entry); } hashMap_destroy(sender->boundedServices.map, false, false); celixThreadMutex_unlock(&sender->boundedServices.mutex); @@ -198,7 +179,6 @@ void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender } free(sender->topic); free(sender->uri); - free(sender->serType); free(sender); } } @@ -216,16 +196,17 @@ const char* pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen } const char* pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t *sender) { - return sender->serType; + return pubsub_serializerHandler_getSerializationType(sender->serializerHandler); } static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) { psa_websocket_bounded_service_entry_t *entry = (psa_websocket_bounded_service_entry_t *) handle; - int64_t rc = pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serType, msgType); - if(rc >= 0) { - *msgTypeId = (unsigned int)rc; + uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType); + if (msgId != 0) { + *msgTypeId = msgId; + return 0; } - return 0; + return -1; } static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { @@ -241,7 +222,6 @@ static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_ entry->getCount = 1; entry->parent = sender; entry->bndId = bndId; - entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL); entry->service.handle = entry; entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType; entry->service.send = psa_websocket_topicPublicationSend; @@ -264,60 +244,40 @@ static void psa_websocket_ungetPublisherService(void *handle, const celix_bundle if (entry != NULL && entry->getCount == 0) { //free entry hashMap_remove(sender->boundedServices.map, (void*)bndId); - - hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries); - while (hashMapIterator_hasNext(&iter)) { - psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter); - free(msgEntry); - } - hashMap_destroy(entry->msgEntries, false, false); - free(entry); } celixThreadMutex_unlock(&sender->boundedServices.mutex); } static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) { - int status = CELIX_SERVICE_EXCEPTION; psa_websocket_bounded_service_entry_t *bound = handle; pubsub_websocket_topic_sender_t *sender = bound->parent; - psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin, sender->serType, msgTypeId); + const char* msgFqn; + int majorVersion; + int minorVersion; + celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion); - if(serializer == NULL) { - pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer); - L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic); - return CELIX_SERVICE_EXCEPTION; - } - - psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId)); - - if(entry == NULL) { - entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t)); - entry->type = msgTypeId; - entry->header.id = serializer->fqn; - celix_version_t* version = celix_version_createVersionFromString(serializer->version); - entry->header.major = (uint8_t)celix_version_getMajor(version); - entry->header.minor = (uint8_t)celix_version_getMinor(version); - entry->header.seqNr = 0; - celix_version_destroy(version); - hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry); + + if (status != CELIX_SUCCESS) { + L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId, + pubsub_serializerHandler_getSerializationType(sender->serializerHandler)); + return status; } if (sender->sockConnection != NULL) { delay_first_send_for_late_joiners(sender); size_t serializedOutputLen = 0; struct iovec* serializedOutput = NULL; - status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen); - + status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen); if (status == CELIX_SUCCESS /*ser ok*/) { json_error_t jsError; json_t *jsMsg = json_object(); - json_object_set_new_nocheck(jsMsg, "id", json_string(entry->header.id)); - json_object_set_new_nocheck(jsMsg, "major", json_integer(entry->header.major)); - json_object_set_new_nocheck(jsMsg, "minor", json_integer(entry->header.minor)); - uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1, __ATOMIC_RELAXED); + json_object_set_new_nocheck(jsMsg, "id", json_string(msgFqn)); + json_object_set_new_nocheck(jsMsg, "major", json_integer(majorVersion)); + json_object_set_new_nocheck(jsMsg, "minor", json_integer(minorVersion)); + uint32_t seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED); json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr)); json_t *jsData; @@ -338,17 +298,15 @@ static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgType } json_decref(jsMsg); //Decrease ref count means freeing the object - serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen); + pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen); } else { - L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s", - entry->header.id, sender->scope == NULL ? "(null)" : sender->scope, sender->topic); + L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %u for scope/topic %s/%s", + msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic); } } else { // when (sender->sockConnection == NULL) we dont have a client, but we do have a valid entry status = CELIX_SUCCESS; // Not an error, just nothing to do } - pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer); - return status; } diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h index 8f8cebf..6b42500 100644 --- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h @@ -22,6 +22,7 @@ #include "celix_bundle_context.h" #include "pubsub_admin_metrics.h" +#include "pubsub_serializer_handler.h" typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t; @@ -30,7 +31,7 @@ pubsub_websocket_topic_sender_t* pubsub_websocketTopicSender_create( celix_log_helper_t *logHelper, const char *scope, const char *topic, - const char *serType, + pubsub_serializer_handler_t* serializerHandler, void *admin); void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender); diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c index a00ce34..9d2070f 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c @@ -452,6 +452,10 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec int updateSerError = 0; const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); + if (msgFqn == NULL) { + L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId); + return; + } void *deserializedMsg = NULL; bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c index 8ae4489..afc8e54 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c @@ -498,6 +498,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co pubsub_zmq_topic_sender_t *sender = bound->parent; bool monitor = sender->metricsEnabled; + //TODO remove use of entry, so that one less lock is needed and drop metrics stuff psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId)); //metrics updates diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c index 418b4b5..7d3d0f3 100644 --- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c +++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c @@ -42,7 +42,7 @@ typedef struct pubsub_serialization_service_entry { const celix_properties_t *properties; uint32_t msgId; celix_version_t* msgVersion; - char* msgFqn; + const char* msgFqn; pubsub_message_serialization_service_t* svc; } pubsub_serialization_service_entry_t; @@ -185,7 +185,6 @@ static void pubsub_serializerHandler_destroyCallback(void* data) { celix_array_list_t *entries = hashMapIterator_nextValue(&iter); for (int i = 0; i < celix_arrayList_size(entries); ++i) { pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i); - free(entry->msgFqn); celix_version_destroy(entry->msgVersion); free(entry); } @@ -239,6 +238,12 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_ } if (valid) { + char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId); + if (fqn == NULL) { + fqn = celix_utils_strdup(msgFqn); + hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, fqn); + } + celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId); if (entries == NULL) { entries = celix_arrayList_create(); @@ -246,7 +251,7 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_ pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry)); entry->svcId = svcId; entry->properties = svcProperties; - entry->msgFqn = celix_utils_strdup(msgFqn); + entry->msgFqn = fqn; entry->msgId = msgId; entry->msgVersion = msgVersion; entry->svc = svc; @@ -258,11 +263,6 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_ celix_version_destroy(msgVersion); } - char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId); - if (fqn == NULL) { - hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, celix_utils_strdup(msgFqn)); - } - celixThreadRwlock_unlock(&handler->lock); } @@ -288,7 +288,6 @@ void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handl } } if (found != NULL) { - free(found->msgFqn); celix_version_destroy(found->msgVersion); free(found); }
