This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/use_ser_hander_in_psa in repository https://gitbox.apache.org/repos/asf/celix.git
commit 14f3bc1a81ad4d4a672f899f360d93ee0603bfd4 Author: Pepijn Noltes <[email protected]> AuthorDate: Wed May 26 19:05:17 2021 +0200 Adds use of pubsub serializer handler to pubsub v2 tcp and v2 zmq --- .../pubsub/pubsub_admin_tcp/v2/src/psa_activator.c | 15 -- .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c | 247 +++++---------------- .../pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h | 4 - .../v2/src/pubsub_tcp_topic_receiver.c | 70 ++---- .../v2/src/pubsub_tcp_topic_receiver.h | 7 +- .../v2/src/pubsub_tcp_topic_sender.c | 72 +++--- .../v2/src/pubsub_tcp_topic_sender.h | 4 +- .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c | 20 -- .../v2/src/pubsub_zmq_topic_receiver.c | 12 +- .../v2/src/pubsub_zmq_topic_receiver.h | 2 +- .../v2/src/pubsub_zmq_topic_sender.c | 22 +- .../v2/src/pubsub_zmq_topic_sender.h | 2 +- .../src/PubSubSerializationHandlerTestSuite.cc | 23 +- .../include/pubsub_serializer_handler.h | 27 ++- .../pubsub_utils/src/pubsub_serializer_handler.c | 108 ++++++--- 15 files changed, 248 insertions(+), 387 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c index ec9badb..ec3f853 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c @@ -20,7 +20,6 @@ #include <stdlib.h> #include "celix_api.h" -#include "pubsub_serializer.h" #include "pubsub_protocol.h" #include "celix_log_helper.h" @@ -34,7 +33,6 @@ typedef struct psa_tcp_activator { pubsub_tcp_admin_t *admin; - long serializersTrackerId; long protocolsTrackerId; pubsub_admin_service_t adminService; @@ -50,7 +48,6 @@ typedef struct psa_tcp_activator { int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) { act->adminSvcId = -1L; act->cmdSvcId = -1L; - act->serializersTrackerId = -1L; act->protocolsTrackerId = -1L; act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_tcp_v2"); @@ -58,17 +55,6 @@ int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) { act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper); celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION; - //track serializers - if (status == CELIX_SUCCESS) { - celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; - opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME; - opts.filter.ignoreServiceLanguage = true; - opts.callbackHandle = act->admin; - opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc; - opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc; - act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - } - //track protocols if (status == CELIX_SUCCESS) { celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; @@ -132,7 +118,6 @@ int psa_tcp_stop(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) { celix_bundleContext_unregisterService(ctx, act->adminSvcId); celix_bundleContext_unregisterService(ctx, act->cmdSvcId); celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId); - celix_bundleContext_stopTracker(ctx, act->serializersTrackerId); celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId); pubsub_tcpAdmin_destroy(act->admin); diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c index 66c92ef..f5f200f 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c @@ -55,11 +55,6 @@ struct pubsub_tcp_admin { bool verbose; struct { - celix_thread_rwlock_t mutex; - hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t* - } serializers; - - struct { celix_thread_mutex_t mutex; hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t* } protocols; @@ -79,6 +74,11 @@ struct pubsub_tcp_admin { hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint) } discoveredEndpoints; + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = pubsub message serialization marker svc id (long), pubsub_serialization_handler_t*. + } serializationHandlers; + pubsub_tcp_endPointStore_t endpointStore; }; @@ -101,11 +101,6 @@ static bool pubsub_tcpAdmin_endpointIsPublisher(const celix_properties_t *endpoi return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0; } -static void pubsub_tcpAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) { - const char** out = handle; - *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL); -} - pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) { pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa)); psa->ctx = ctx; @@ -120,11 +115,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY, PSA_TCP_DEFAULT_QOS_CONTROL_SCORE); - celixThreadRwlock_create(&psa->serializers.mutex, NULL); - psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - celixThreadMutex_create(&psa->protocols.mutex, NULL); psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL); + celixThreadMutex_create(&psa->topicSenders.mutex, NULL); psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); @@ -137,6 +130,9 @@ pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_lo celixThreadMutex_create(&psa->endpointStore.mutex, NULL); psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL); + psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL); + return psa; } @@ -177,13 +173,13 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) { } celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); - celixThreadRwlock_writeLock(&psa->serializers.mutex); - iter = hashMapIterator_construct(psa->serializers.map); + celixThreadMutex_lock(&psa->serializationHandlers.mutex); + iter = hashMapIterator_construct(psa->serializationHandlers.map); while (hashMapIterator_hasNext(&iter)) { - psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter); - free(entry); + pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter); + pubsub_serializerHandler_destroy(entry); } - celixThreadRwlock_unlock(&psa->serializers.mutex); + celixThreadMutex_unlock(&psa->serializationHandlers.mutex); celixThreadMutex_lock(&psa->protocols.mutex); iter = hashMapIterator_construct(psa->protocols.map); @@ -205,8 +201,9 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) { celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex); hashMap_destroy(psa->discoveredEndpoints.map, false, false); - celixThreadRwlock_destroy(&psa->serializers.mutex); - hashMap_destroy(psa->serializers.map, false, false); + celixThreadMutex_destroy(&psa->serializationHandlers.mutex); + hashMap_destroy(psa->serializationHandlers.map, false, false); + celixThreadMutex_destroy(&psa->protocols.mutex); hashMap_destroy(psa->protocols.map, false, false); @@ -215,101 +212,6 @@ void pubsub_tcpAdmin_destroy(pubsub_tcp_admin_t *psa) { free(psa); } -void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) { - pubsub_tcp_admin_t *psa = handle; - - const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL); - long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L); - const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL); - const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0"); - - if (serType == NULL || msgId == -1L || msgFqn == NULL) { - L_INFO("[PSA_TCP_V2] Ignoring serializer service without one of the following properties: %s or %s or %s", - PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY); - - L_INFO("[PSA_TCP_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn); - return; - } - L_INFO("[PSA_TCP_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn); - - celixThreadRwlock_writeLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType); - if(typeEntries == NULL) { - typeEntries = hashMap_create(NULL, NULL, NULL, NULL); - hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries); - L_INFO("[PSA_TCP_V2] typeEntries added %p %s", psa->serializers.map, serType); - } - psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId); - if (entry == NULL) { - entry = calloc(1, sizeof(psa_tcp_serializer_entry_t)); - entry->svc = svc; - entry->fqn = celix_utils_strdup(msgFqn); - entry->version = celix_utils_strdup(msgVersion); - hashMap_put(typeEntries, (void*)msgId, entry); - L_INFO("[PSA_TCP_V2] entry added"); - } - celixThreadRwlock_unlock(&psa->serializers.mutex); -} - -void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) { - pubsub_tcp_admin_t *psa = handle; - const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL); - long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L); - - //remove serializer - // 1) First find entry and - // 2) loop and destroy all topic sender using the serializer and - // 3) loop and destroy all topic receivers using the serializer - // Note that it is the responsibility of the topology manager to create new topic senders/receivers - - celixThreadRwlock_writeLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType); - if(typeEntries != NULL) { - psa_tcp_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId); - free((void*)entry->fqn); - free((void*)entry->version); - free(entry); - - // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type. - if(hashMap_size(typeEntries) == 0) { - hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false); - celixThreadRwlock_unlock(&psa->serializers.mutex); - - celixThreadMutex_lock(&psa->topicSenders.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter); - pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry); - if (sender != NULL && strncmp(serType, pubsub_tcpTopicSender_serializerType(sender), 1024 * 1024) == 0) { - char *key = hashMapEntry_getKey(senderEntry); - hashMapIterator_remove(&iter); - pubsub_tcpTopicSender_destroy(sender); - free(key); - } - } - celixThreadMutex_unlock(&psa->topicSenders.mutex); - - celixThreadMutex_lock(&psa->topicReceivers.mutex); - iter = hashMapIterator_construct(psa->topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter); - pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry); - if (receiver != NULL && strncmp(serType, pubsub_tcpTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) { - char *key = hashMapEntry_getKey(receiverEntry); - hashMapIterator_remove(&iter); - pubsub_tcpTopicReceiver_destroy(receiver); - free(key); - } - } - celixThreadMutex_unlock(&psa->topicReceivers.mutex); - } else { - celixThreadRwlock_unlock(&psa->serializers.mutex); - } - } else { - celixThreadRwlock_unlock(&psa->serializers.mutex); - } -} - void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) { pubsub_tcp_admin_t *psa = handle; @@ -421,30 +323,40 @@ pubsub_tcpAdmin_matchDiscoveredEndpoint(void *handle, const celix_properties_t * return status; } +static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) { + pubsub_serializer_handler_t* handler = NULL; + celixThreadMutex_lock(&psa->serializationHandlers.mutex); + handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId); + if (handler == NULL) { + handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log); + if (handler != NULL) { + hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler); + } + } + celixThreadMutex_unlock(&psa->serializationHandlers.mutex); + return handler; +} + celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) { pubsub_tcp_admin_t *psa = handle; celix_status_t status = CELIX_SUCCESS; - //1) Create TopicSender - //2) Store TopicSender - //3) Connect existing endpoints - //4) set outPublisherEndpoint + //1) Get serialization handler + //2) Create TopicSender + //3) Store TopicSender + //4) Connect existing endpoints + //5) set outPublisherEndpoint + + pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId); + if (handler == NULL) { + L_ERROR("Cannot create topic sender without serialization handler"); + return CELIX_ILLEGAL_STATE; + } celix_properties_t *newEndpoint = NULL; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - - //get serializer type - const char *serType = NULL; - celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS; - opts.callbackHandle = &serType; - opts.useWithProperties = pubsub_tcpAdmin_getSerType; - opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME; - char filter[32]; - snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId); - opts.filter.filter = filter; - celix_bundleContext_useServiceWithOptions(psa->ctx, &opts); celixThreadMutex_lock(&psa->protocols.mutex); celixThreadMutex_lock(&psa->topicSenders.mutex); @@ -453,14 +365,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, if (sender == NULL) { psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId); if (protEntry != NULL) { - sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties, + sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties, &psa->endpointStore, protocolSvcId, protEntry->svc); } if (sender != NULL) { const char *psaType = PUBSUB_TCP_ADMIN_TYPE; const char *protType = protEntry->protType; - newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL); + newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, + pubsub_serializerHandler_getSerializationType(handler), protType, NULL); celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender)); celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender)); @@ -525,21 +438,15 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop long protocolSvcId, celix_properties_t **outSubscriberEndpoint) { pubsub_tcp_admin_t *psa = handle; - celix_properties_t *newEndpoint = NULL; + pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId); + if (handler == NULL) { + L_ERROR("Cannot create topic receiver without serialization handler"); + return CELIX_ILLEGAL_STATE; + } + celix_properties_t *newEndpoint = NULL; char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - //get serializer type - const char *serType = NULL; - celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS; - opts.callbackHandle = &serType; - opts.useWithProperties = pubsub_tcpAdmin_getSerType; - opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME; - char filter[32]; - snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId); - opts.filter.filter = filter; - celix_bundleContext_useServiceWithOptions(psa->ctx, &opts); - celixThreadMutex_lock(&psa->protocols.mutex); celixThreadMutex_lock(&psa->topicReceivers.mutex); pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); @@ -547,7 +454,8 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop if (receiver == NULL) { psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId); if (protEntry != NULL) { - receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties, + receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, + handler, handle, topicProperties, &psa->endpointStore, protocolSvcId, protEntry->svc); } else { L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic); @@ -556,7 +464,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop const char *psaType = PUBSUB_TCP_ADMIN_TYPE; const char *protType = protEntry->protType; newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, - PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL); + PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL); //if available also set container name const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL); if (cn != NULL) { @@ -795,53 +703,6 @@ bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine __attr return status; } - - -psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) { - pubsub_tcp_admin_t *psa = handle; - psa_tcp_serializer_entry_t *serializer = NULL; - - celixThreadRwlock_readLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType); - if(typeEntries != NULL) { - serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId); - } - - return serializer; -} - -void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer __attribute__((unused))) { - pubsub_tcp_admin_t *psa = handle; - celixThreadRwlock_unlock(&psa->serializers.mutex); -} - -int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) { - pubsub_tcp_admin_t *psa = handle; - int64_t id = -1L; - - celixThreadRwlock_readLock(&psa->serializers.mutex); - hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType); - if(typeEntries != NULL) { - hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries); - while(hashMapIterator_hasNext(&iterator)) { - void *key = hashMapIterator_nextKey(&iterator); - psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, key); - L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn); - if(strncmp(fqn, entry->fqn, 1024*1024) == 0) { - id = (uint32_t)(uintptr_t)key; - break; - } - } - } else { - L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn); - } - celixThreadRwlock_unlock(&psa->serializers.mutex); - - L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id); - - return id; -} - pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) { return NULL; } diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h index 2440fbb..513a934 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h @@ -82,10 +82,6 @@ void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_propert void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props); bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream); -psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId); -void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer); -int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn); - pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle); #endif //CELIX_PUBSUB_TCP_ADMIN_H diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c index 853e49d..b02768a 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c @@ -56,7 +56,7 @@ struct pubsub_tcp_topic_receiver { pubsub_protocol_service_t *protocol; char *scope; char *topic; - char *serType; + pubsub_serializer_handler_t* serializerHandler; void *admin; size_t timeout; bool isPassive; @@ -104,13 +104,12 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime); static void psa_tcp_connectHandler(void *handle, const char *url, bool lock); static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock); -static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor); pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper, const char *scope, const char *topic, - const char *serType, + pubsub_serializer_handler_t* serializerHandler, void *admin, const celix_properties_t *topicProperties, pubsub_tcp_endPointStore_t *handlerStore, @@ -119,7 +118,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); receiver->ctx = ctx; receiver->logHelper = logHelper; - receiver->serType = celix_utils_strdup(serType); + receiver->serializerHandler = serializerHandler; receiver->admin = admin; receiver->protocolSvcId = protocolSvcId; receiver->protocol = protocol; @@ -269,7 +268,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { } } hashMap_destroy(receiver->subscribers.map, false, false); - celixThreadMutex_unlock(&receiver->subscribers.mutex); celixThreadMutex_lock(&receiver->requestedConnections.mutex); @@ -299,7 +297,6 @@ void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) { free(receiver->scope); } free(receiver->topic); - free(receiver->serType); } free(receiver); } @@ -313,7 +310,7 @@ const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver) } const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) { - return receiver->serType; + return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler); } long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) { @@ -460,47 +457,39 @@ static inline void processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry, const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) { //NOTE receiver->subscribers.mutex locked - psa_tcp_serializer_entry_t *msgSer = pubsub_tcpAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, message->header.msgId); - if(msgSer == NULL) { - pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer); - L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X. Received payload size is %u.", message->header.msgId, message->payload.length); - return; - } + const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); void *deSerializedMsg = NULL; - celix_version_t *version = celix_version_createVersionFromString(msgSer->version); - bool validVersion = psa_tcp_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion); - celix_version_destroy(version); + bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); if (validVersion) { struct iovec deSerializeBuffer; deSerializeBuffer.iov_base = message->payload.payload; deSerializeBuffer.iov_len = message->payload.length; - celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg); + celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg); // When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver if (message->payload.payload == deSerializedMsg) { *releaseMsg = true; } - const char *msgType = msgSer->fqn; if (status == CELIX_SUCCESS) { uint32_t msgId = message->header.msgId; celix_properties_t *metadata = message->metadata.metadata; - bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata); + bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata); bool release = true; if (cont) { hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices); while (hashMapIterator_hasNext(&iter)) { pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter); - svc->receive(svc->handle,msgType, msgId, deSerializedMsg, message->metadata.metadata, &release); - pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata); + svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release); + pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata); if (!release && hashMapIterator_hasNext(&iter)) { //receive function has taken ownership and still more receive function to come .. //deserialize again for new message - status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg); + status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg); if (status != CELIX_SUCCESS) { L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", - msgType, + msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); break; @@ -509,19 +498,25 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subs } } if (release) { - msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deSerializedMsg); + pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg); } if (message->metadata.metadata) { celix_properties_destroy(message->metadata.metadata); } } } else { - L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgType, + L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); } + } else { + L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x", + msgFqn, + pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), + (int)message->header.msgMajorVersion, + (int)message->header.msgMinorVersion, + pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId), + pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId)); } - - pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer); } static void @@ -664,24 +659,3 @@ static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiv } celixThreadMutex_unlock(&receiver->subscribers.mutex); } - -static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) { - bool check = false; - - if (major == 0 && minor == 0) { - //no check - return true; - } - - int versionMajor; - int versionMinor; - if (msgVersion!=NULL) { - version_getMajor(msgVersion, &versionMajor); - version_getMinor(msgVersion, &versionMinor); - if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */ - check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ - } - } - - return check; -} diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h index a7de405..d06fe4a 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h @@ -20,10 +20,11 @@ #ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H #define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H -#include <pubsub_admin_metrics.h> +#include "pubsub_admin_metrics.h" #include "celix_bundle_context.h" -#include <pubsub_protocol.h> +#include "pubsub_protocol.h" #include "pubsub_tcp_common.h" +#include "pubsub_serializer_handler.h" typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t; @@ -31,7 +32,7 @@ pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context celix_log_helper_t *logHelper, const char *scope, const char *topic, - const char *serType, + pubsub_serializer_handler_t* serializerHandler, void *admin, const celix_properties_t *topicProperties, pubsub_tcp_endPointStore_t *handlerStore, diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c index 847d538..11d73d9 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c @@ -57,12 +57,12 @@ struct pubsub_tcp_topic_sender { pubsub_tcpHandler_t *socketHandler; pubsub_tcpHandler_t *sharedSocketHandler; pubsub_interceptors_handler_t *interceptorsHandler; + pubsub_serializer_handler_t* serializerHandler; void *admin; char *scope; char *topic; char *url; - char *serializerType; bool isStatic; bool isPassive; bool verbose; @@ -85,7 +85,6 @@ typedef struct psa_tcp_send_msg_entry { uint8_t major; uint8_t minor; unsigned char originUUID[16]; -// pubsub_msg_serializer_t *msgSer; pubsub_protocol_service_t *protSer; struct iovec *serializedIoVecOutput; size_t serializedIoVecOutputLen; @@ -118,7 +117,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( celix_log_helper_t *logHelper, const char *scope, const char *topic, - const char *serializerType, + pubsub_serializer_handler_t* serializerHandler, void *admin, const celix_properties_t *topicProperties, pubsub_tcp_endPointStore_t *handlerStore, @@ -127,7 +126,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender)); sender->ctx = ctx; sender->logHelper = logHelper; - sender->serializerType = celix_utils_strdup(serializerType); + sender->serializerHandler = serializerHandler; sender->admin = admin; sender->protocolSvcId = protocolSvcId; sender->protocol = protocol; @@ -189,7 +188,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt); pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout); pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize); - // Hhen passiveKey is specified, enable receive event for full-duplex connection using key. + // When passiveKey is specified, enable receive event for full-duplex connection using key. // Because the topic receiver is already started, enable the receive event. pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false); pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout); @@ -301,7 +300,6 @@ void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) { } free(sender->topic); free(sender->url); - free(sender->serializerType); free(sender); } } @@ -319,7 +317,7 @@ const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender) { } const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) { - return sender->serializerType; + return pubsub_serializerHandler_getSerializationType(sender->serializerHandler); } const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) { @@ -337,15 +335,6 @@ bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) { return sender->isPassive; } -static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) { - psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle; - int64_t rc = pubsub_tcpAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType); - if(rc >= 0) { - *msgTypeId = (unsigned int)rc; - } - return 0; -} - static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) { pubsub_tcp_topic_sender_t *sender = handle; @@ -406,39 +395,35 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i psa_tcp_bounded_service_entry_t *bound = handle; pubsub_tcp_topic_sender_t *sender = bound->parent; - psa_tcp_serializer_entry_t *serializer = pubsub_tcpAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId); - - if(serializer == NULL) { - pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer); - L_WARN("[PSA_TCP_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic); - return CELIX_SERVICE_EXCEPTION; - } psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId)); - if(entry == NULL) { - entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t)); - entry->protSer = sender->protocol; - entry->type = msgTypeId; - entry->fqn = serializer->fqn; - celix_version_t* version = celix_version_createVersionFromString(serializer->version); - entry->major = (uint8_t)celix_version_getMajor(version); - entry->minor = (uint8_t)celix_version_getMinor(version); - celix_version_destroy(version); - uuid_copy(entry->originUUID, sender->fwUUID); - hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry); + if (entry == NULL) { + const char* fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId); + if (fqn != NULL) { + entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t)); + entry->protSer = sender->protocol; + entry->type = msgTypeId; + entry->fqn = fqn; + entry->major = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId); + entry->minor = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId); + uuid_copy(entry->originUUID, sender->fwUUID); + hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry); + } else { + L_WARN("Cannot find message serialization for msg id %i", (int)msgTypeId); + } } delay_first_send_for_late_joiners(sender); size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen; struct iovec *serializedIoVecOutput = NULL; - status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen); + status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen); entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen); bool cont = false; if (status == CELIX_SUCCESS) /*ser ok*/ { - cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata); + cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, &metadata); } if (cont) { pubsub_protocol_message_t message; @@ -468,12 +453,12 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i status = -1; sendOk = false; } - pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata); + pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->fqn, msgTypeId, inMsg, metadata); if (message.metadata.metadata) { celix_properties_destroy(message.metadata.metadata); } if (serializedIoVecOutput) { - serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedIoVecOutput, serializedIoVecOutputLen); + pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen); serializedIoVecOutput = NULL; } } @@ -482,12 +467,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *i L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno)); } } else { - L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn, + L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", entry->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic); } - pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer); - return status; } @@ -502,4 +485,11 @@ static void delay_first_send_for_late_joiners(pubsub_tcp_topic_sender_t *sender) usleep(sender->send_delay * 1000); firstSend = false; } +} + +static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) { + psa_tcp_bounded_service_entry_t* entry = handle; + uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType); + *msgTypeId = (unsigned int)msgId; + return 0; } \ No newline at end of file diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h index dfb5014..29c8f7a 100644 --- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h @@ -24,6 +24,7 @@ #include "pubsub_admin_metrics.h" #include "pubsub_protocol.h" #include "pubsub_tcp_common.h" +#include "pubsub_serializer_handler.h" typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t; @@ -32,7 +33,7 @@ pubsub_tcp_topic_sender_t *pubsub_tcpTopicSender_create( celix_log_helper_t *logHelper, const char *scope, const char *topic, - const char *serializerType, + pubsub_serializer_handler_t* serializerHandler, void *admin, const celix_properties_t *topicProperties, pubsub_tcp_endPointStore_t *handlerStore, @@ -46,7 +47,6 @@ const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender); const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender); bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender); bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender); -long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender); long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender); /** diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c index e30403e..cc6de56 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c @@ -66,11 +66,6 @@ struct pubsub_zmq_admin { bool verbose; struct { - celix_thread_rwlock_t mutex; - hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t* - } serializers; - - struct { celix_thread_mutex_t mutex; hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t* } protocols; @@ -189,9 +184,6 @@ pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_lo psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE); psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE); - celixThreadRwlock_create(&psa->serializers.mutex, NULL); - psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - celixThreadMutex_create(&psa->protocols.mutex, NULL); psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL); @@ -241,15 +233,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) { } celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex); - celixThreadRwlock_writeLock(&psa->serializers.mutex); - iter = hashMapIterator_construct(psa->serializers.map); - while (hashMapIterator_hasNext(&iter)) { - hash_map_t *entry = hashMapIterator_nextValue(&iter); - hashMap_destroy(entry, false, true); - } - hashMap_clear(psa->serializers.map, false, false); - celixThreadRwlock_unlock(&psa->serializers.mutex); - celixThreadMutex_lock(&psa->protocols.mutex); iter = hashMapIterator_construct(psa->protocols.map); while (hashMapIterator_hasNext(&iter)) { @@ -275,9 +258,6 @@ void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa) { celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex); hashMap_destroy(psa->discoveredEndpoints.map, false, false); - celixThreadRwlock_destroy(&psa->serializers.mutex); - hashMap_destroy(psa->serializers.map, false, false); - celixThreadMutex_destroy(&psa->protocols.mutex); hashMap_destroy(psa->protocols.map, false, false); diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c index 1051d77..a00ce34 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c @@ -95,7 +95,6 @@ struct pubsub_zmq_topic_receiver { struct { celix_thread_mutex_t mutex; hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t - hash_map_t *msgFqns; //key = msg id, value = char* msgFqn bool allInitialized; } subscribers; }; @@ -207,7 +206,6 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context celixThreadMutex_create(&receiver->recvThread.mutex, NULL); receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); - receiver->subscribers.msgFqns = hashMap_create(NULL, NULL, NULL, NULL); receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); } @@ -288,7 +286,6 @@ void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) { } } hashMap_destroy(receiver->subscribers.map, false, false); - hashMap_destroy(receiver->subscribers.msgFqns, false, true); celixThreadMutex_unlock(&receiver->subscribers.mutex); celixThreadMutex_lock(&receiver->requestedConnections.mutex); @@ -454,11 +451,7 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec int updateReceiveCount = 0; int updateSerError = 0; - char* msgFqn = hashMap_get(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId); - if (msgFqn == NULL) { - msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); - hashMap_put(receiver->subscribers.msgFqns, (void*)(intptr_t)message->header.msgId, msgFqn); - } + const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); void *deserializedMsg = NULL; bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); @@ -505,8 +498,9 @@ static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); } } else { - L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s', version mismatch. Version received: %i.%i.x, version send: %i.%i.x", + L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x", msgFqn, + pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), (int)message->header.msgMajorVersion, (int)message->header.msgMinorVersion, pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId), diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h index a2f953b..3900f55 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h @@ -32,7 +32,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context const char *scope, const char *topic, const celix_properties_t *topicProperties, - pubsub_serializer_handler_t* serHandler, + pubsub_serializer_handler_t* serializerHandler, void *admin, long protocolSvcId, pubsub_protocol_service_t *protocol); diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c index 9996c53..8ae4489 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c @@ -52,7 +52,7 @@ struct pubsub_zmq_topic_sender { celix_bundle_context_t *ctx; celix_log_helper_t *logHelper; - pubsub_serializer_handler_t* serializationHandler; + pubsub_serializer_handler_t* serializerHandler; void *admin; long protocolSvcId; pubsub_protocol_service_t *protocol; @@ -137,7 +137,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( celix_log_helper_t *logHelper, const char *scope, const char *topic, - pubsub_serializer_handler_t* serializationHandler, + pubsub_serializer_handler_t* serializerHandler, void *admin, long protocolSvcId, pubsub_protocol_service_t *prot, @@ -148,7 +148,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender)); sender->ctx = ctx; sender->logHelper = logHelper; - sender->serializationHandler = serializationHandler; + sender->serializerHandler = serializerHandler; sender->admin = admin; sender->protocolSvcId = protocolSvcId; sender->protocol = prot; @@ -359,7 +359,7 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) { } const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) { - return pubsub_serializerHandler_getSerializationType(sender->serializationHandler); + return pubsub_serializerHandler_getSerializationType(sender->serializerHandler); } long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) { @@ -384,7 +384,7 @@ bool pubsub_zmqTopicSender_isStatic(pubsub_zmq_topic_sender_t *sender) { static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) { psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle; - *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializationHandler, msgType); + *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType); return 0; } @@ -513,9 +513,9 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t)); entry->protSer = sender->protocol; entry->type = msgTypeId; - entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializationHandler, msgTypeId); - entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializationHandler, msgTypeId); - entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializationHandler, msgTypeId); + entry->fqn = pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId); + entry->msgMajorVersion = pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, msgTypeId); + entry->msgMinorVersion = pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, msgTypeId); uuid_copy(entry->originUUID, sender->fwUUID); celixThreadMutex_create(&entry->metrics.mutex, NULL); hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry); @@ -528,7 +528,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co } size_t serializedOutputLen = 0; struct iovec *serializedOutput = NULL; - status = pubsub_serializerHandler_serialize(sender->serializationHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen); + status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen); if (monitor) { clock_gettime(CLOCK_REALTIME, &serializationEnd); } @@ -588,7 +588,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co zmq_msg_t msg4; // Footer void *socket = zsock_resolve(sender->zmq.socket); psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); - freeMsgEntry->serHandler = sender->serializationHandler; + freeMsgEntry->serHandler = sender->serializerHandler; freeMsgEntry->msgId = msgTypeId; freeMsgEntry->serializedOutput = serializedOutput; freeMsgEntry->serializedOutputLen = serializedOutputLen; @@ -665,7 +665,7 @@ static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, co celix_properties_destroy(message.metadata.metadata); } if (!bound->parent->zeroCopyEnabled && serializedOutput) { - pubsub_serializerHandler_freeSerializedMsg(sender->serializationHandler, msgTypeId, serializedOutput, serializedOutputLen); + pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen); } if (sendOk) { diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h index c2c0d57..584b88d 100644 --- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h +++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h @@ -32,7 +32,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( celix_log_helper_t *logHelper, const char *scope, const char *topic, - pubsub_serializer_handler_t* serializationHandler, + pubsub_serializer_handler_t* serializerHandler, void *admin, long protocolSvcId, pubsub_protocol_service_t *prot, diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc index 550c85f..77a2fa0 100644 --- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc +++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc @@ -111,7 +111,6 @@ TEST_F(PubSubSerializationHandlerTestSuite, SerializationServiceFound) { EXPECT_EQ(42, pubsub_serializerHandler_getMsgId(handler, "example::Msg")); auto *fqn = pubsub_serializerHandler_getMsgFqn(handler, 42); EXPECT_STREQ("example::Msg", fqn); - free(fqn); EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 0)); EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0)); @@ -293,3 +292,25 @@ TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) { celix_logHelper_destroy(logHelper); } + +TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) { + auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true); + EXPECT_FALSE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42)); + EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr)); + + + long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0"); + EXPECT_TRUE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42)); + EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr)); + + const char* msgFqn; + int major; + int minor; + EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, &msgFqn, &major, &minor)); + EXPECT_STREQ("example::Msg1", msgFqn); + EXPECT_EQ(1, major); + EXPECT_EQ(0, minor); + + celix_bundleContext_unregisterService(ctx.get(), svcId1); + pubsub_serializerHandler_destroy(handler); +} \ No newline at end of file diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h index 0519891..f2c58ac 100644 --- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h +++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h @@ -138,10 +138,15 @@ celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_ha bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* handler, uint32_t msgId, int majorVersion, int minorVersion); /** + * @brief Whether the serializer handler has found 1 or more pubsub_message_serialization_service for the provided msg id. + */ +bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId); + +/** * @brief Get msg fqn from a msg id. - * @return msg fqn or NULL if msg id is not known. + * @return msg fqn or NULL if msg id is not known. msg fqn is valid as long as the handler exists. */ -char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId); +const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId); /** * @brief Get a msg id from a msgFqn. @@ -176,6 +181,24 @@ int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* han */ int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId); + +/** + * @brief Returns msg info (fqn, major version, minor version) in 1 call. + * + * @param handler The serializer handler + * @param msgId The msg id where to get the info for + * @param msgFqnOut If not NULL will be set to the msgFqn (valid as long as the serializer handler is valid) + * @param msgMajorVersionOut If not NULL will be set to the msg major version + * @param msgMinorVersionOut If not NULL will be set to the msg minor version + * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if the message for the provided msg id cannot be found. + */ +celix_status_t pubsub_serializerHandler_getMsgInfo( + pubsub_serializer_handler_t* handler, + uint32_t msgId, + const char** msgFqnOut, + int* msgMajorVersionOut, + int* msgMinorVersionOut); + #ifdef __cplusplus } #endif diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c index 7bfa5e0..418b4b5 100644 --- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c +++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c @@ -48,6 +48,7 @@ typedef struct pubsub_serialization_service_entry { struct pubsub_serializer_handler { celix_bundle_context_t* ctx; + char* filter; char* serType; bool backwardCompatible; long serializationSvcTrackerId; @@ -55,6 +56,7 @@ struct pubsub_serializer_handler { celix_thread_rwlock_t lock; hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t* + hash_map_t *msgFullyQualifiedNames; //key = msg id, value = msg fqn. Non destructive map with msg fqn }; static void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties); @@ -104,17 +106,6 @@ static bool isCompatible(pubsub_serializer_handler_t* handler, pubsub_serializat return compatible; } -static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) { - //NOTE assumes mutex is locked - const char *result = NULL; - celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId); - if (entries != NULL) { - pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry. - result = entry->msgFqn; - } - return result; -} - pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) { pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler)); handler->ctx = ctx; @@ -125,18 +116,18 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_contex celixThreadRwlock_create(&handler->lock, NULL); handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL); + handler->msgFullyQualifiedNames = hashMap_create(NULL, NULL, NULL, NULL); - char *filter = NULL; - asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType); + asprintf(&handler->filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType); celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME; opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE; - opts.filter.filter = filter; + opts.filter.filter = handler->filter; opts.callbackHandle = handler; opts.addWithProperties = addSerializationService; opts.removeWithProperties = removeSerializationService; - handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); - free(filter); + handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptionsAsync(ctx, &opts); + return handler; } @@ -186,26 +177,31 @@ pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(cel return data.handler; } +static void pubsub_serializerHandler_destroyCallback(void* data) { + pubsub_serializer_handler_t* handler = data; + celixThreadRwlock_destroy(&handler->lock); + hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices); + while (hashMapIterator_hasNext(&iter)) { + celix_array_list_t *entries = hashMapIterator_nextValue(&iter); + for (int i = 0; i < celix_arrayList_size(entries); ++i) { + pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i); + free(entry->msgFqn); + celix_version_destroy(entry->msgVersion); + free(entry); + } + celix_arrayList_destroy(entries); + } + hashMap_destroy(handler->serializationServices, false, false); + hashMap_destroy(handler->msgFullyQualifiedNames, false, true); + celix_logHelper_destroy(handler->logHelper); + free(handler->serType); + free(handler->filter); + free(handler); +} void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) { if (handler != NULL) { - celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId); - celixThreadRwlock_destroy(&handler->lock); - hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices); - while (hashMapIterator_hasNext(&iter)) { - celix_array_list_t *entries = hashMapIterator_nextValue(&iter); - for (int i = 0; i < celix_arrayList_size(entries); ++i) { - pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i); - free(entry->msgFqn); - celix_version_destroy(entry->msgVersion); - free(entry); - } - celix_arrayList_destroy(entries); - } - hashMap_destroy(handler->serializationServices, false, false); - celix_logHelper_destroy(handler->logHelper); - free(handler->serType); - free(handler); + celix_bundleContext_stopTrackerAsync(handler->ctx, handler->serializationSvcTrackerId, handler, pubsub_serializerHandler_destroyCallback); } } @@ -257,10 +253,16 @@ void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_ celix_arrayList_add(entries, entry); celix_arrayList_sort(entries, compareEntries); - hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries); + hashMap_put(handler->serializationServices, (void*)(uintptr_t)msgId, entries); } else { celix_version_destroy(msgVersion); } + + char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId); + if (fqn == NULL) { + hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, celix_utils_strdup(msgFqn)); + } + celixThreadRwlock_unlock(&handler->lock); } @@ -375,9 +377,16 @@ bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* ha return compatible; } -char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) { +bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId) { + celixThreadRwlock_readLock(&handler->lock); + void* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId); + celixThreadRwlock_unlock(&handler->lock); + return entries != NULL; +} + +const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) { celixThreadRwlock_readLock(&handler->lock); - char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId)); + char *msgFqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId); celixThreadRwlock_unlock(&handler->lock); return msgFqn; } @@ -434,4 +443,31 @@ size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializ const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) { return handler->serType; +} + +int pubsub_serializerHandler_getMsgInfo( + pubsub_serializer_handler_t* handler, + uint32_t msgId, + const char** msgFqnOut, + int* msgMajorVersionOut, + int* msgMinorVersionOut) { + int result = CELIX_SUCCESS; + celixThreadRwlock_readLock(&handler->lock); + celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId); + if (entries != NULL) { + pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry. + if (msgFqnOut != NULL) { + *msgFqnOut = entry->msgFqn; + } + if (msgMinorVersionOut != NULL) { + *msgMajorVersionOut = celix_version_getMajor(entry->msgVersion); + } + if (msgMinorVersionOut != NULL) { + *msgMinorVersionOut = celix_version_getMinor(entry->msgVersion); + } + } else { + result = CELIX_ILLEGAL_ARGUMENT; + } + celixThreadRwlock_unlock(&handler->lock); + return result; } \ No newline at end of file
