This is an automated email from the ASF dual-hosted git repository. abroekhuis pushed a commit to branch feature/scope_fixes in repository https://gitbox.apache.org/repos/asf/celix.git
commit 86918b96c50163a014dac577cb10f1a9bd477570 Author: Alexander Broekhuis <[email protected]> AuthorDate: Tue Apr 14 13:30:58 2020 +0200 Fixed merge problems wrt scope usage. --- .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 51 ++++++++-------------- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 4 +- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 15 ++++--- .../src/pubsub_topology_manager.c | 36 +++++++-------- 4 files changed, 48 insertions(+), 58 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c index 579d5d4..cc9f5b0 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c @@ -601,34 +601,7 @@ pubsub_tcpAdmin_connectEndpointToReceiver(pubsub_tcp_admin_t *psa, pubsub_tcp_to L_WARN("[PSA TCP] Error got endpoint without a tcp url (admin: %s, type: %s)", admin, type); status = CELIX_BUNDLE_EXCEPTION; } else { - const char *scope = pubsub_tcpTopicReceiver_scope(receiver); - const char *topic = pubsub_tcpTopicReceiver_topic(receiver); - const char *serializer = NULL; - long serializerSvcId = pubsub_tcpTopicReceiver_serializerSvcId(receiver); - psa_tcp_serializer_entry_t *serializerEntry = hashMap_get(psa->serializers.map, (void *) serializerSvcId); - if (serializerEntry != NULL) { - serializer = serializerEntry->serType; - } - const char *protocol = NULL; - long protocolSvcId = pubsub_tcpTopicReceiver_protocolSvcId(receiver); - psa_tcp_protocol_entry_t *protocolEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId); - if (protocolEntry != NULL) { - protocol = protocolEntry->protType; - } - - const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); - const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL); - const char *eProtocol = celix_properties_get(endpoint, PUBSUB_ENDPOINT_PROTOCOL, NULL); - - if (scope != NULL && topic != NULL && serializer != NULL && protocol != NULL - && eScope != NULL && eTopic != NULL && eSerializer != NULL && eProtocol != NULL - && strncmp(eScope, scope, 1024 * 1024) == 0 - && strncmp(eTopic, topic, 1024 * 1024) == 0 - && strncmp(eSerializer, serializer, 1024 * 1024) == 0 - && strncmp(eProtocol, protocol, 1024 * 1024) == 0) { - pubsub_tcpTopicReceiver_connectTo(receiver, url); - } + pubsub_tcpTopicReceiver_connectTo(receiver, url); } return status; @@ -639,9 +612,12 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_p if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) { celixThreadMutex_lock(&psa->topicReceivers.mutex); - hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); - while (hashMapIterator_hasNext(&iter)) { - pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); + + pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); + if (receiver != NULL) { pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } celixThreadMutex_unlock(&psa->topicReceivers.mutex); @@ -663,13 +639,24 @@ pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t //note can be called with discoveredEndpoint.mutex lock celix_status_t status = CELIX_SUCCESS; + const char *scope = pubsub_tcpTopicReceiver_scope(receiver); + const char *topic = pubsub_tcpTopicReceiver_topic(receiver); + + const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL); if (url == NULL) { L_WARN("[PSA TCP] Error got endpoint without tcp url"); status = CELIX_BUNDLE_EXCEPTION; } else { - pubsub_tcpTopicReceiver_disconnectFrom(receiver, url); + if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) { + if (scope == NULL && eScope == NULL) { + pubsub_tcpTopicReceiver_disconnectFrom(receiver, url); + } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) { + pubsub_tcpTopicReceiver_disconnectFrom(receiver, url); + } + } } return status; diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index a36fcfa..b6c7ee0 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -401,7 +401,7 @@ static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *req entry->service.send = psa_tcp_topicPublicationSend; hashMap_put(sender->boundedServices.map, (void *) bndId, entry); } else { - L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", sender->scope, sender->topic); + L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", sender->scope == NULL ? "(null)" : sender->scope, sender->topic); } } celixThreadMutex_unlock(&sender->boundedServices.mutex); @@ -447,7 +447,7 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) { pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); celixThreadMutex_lock(&sender->boundedServices.mutex); size_t count = 0; diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index b7cf5f6..38b83eb 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -65,6 +65,8 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, if (scope != NULL) { celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope); + } else { + celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE); } if (topic != NULL) { @@ -159,12 +161,11 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context char* topic = NULL; char* scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(filter, &scopeFromFilter, &topic); - const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; + const char *scope = scopeFromFilter; struct retrieve_topic_properties_data data; data.props = NULL; data.isPublisher = true; - data.scope = scope; data.topic = topic; celix_bundleContext_useBundle(ctx, bundleId, &data, retrieveTopicProperties); @@ -197,7 +198,11 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) { char *result = NULL; - asprintf(&result, "%s:%s", scope, topic); + if (scope != NULL) { + asprintf(&result, "%s:%s", scope, topic); + } else { + asprintf(&result, "default:%s", topic); + } return result; } @@ -223,7 +228,5 @@ bool pubsubEndpoint_isValid(const celix_properties_t *props, bool requireAdminTy checkProp(props, PUBSUB_ENDPOINT_SERIALIZER); } bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME); - bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE); - - return p1 && p2 && p3 && p4 && p5 && p6 && p7; + return p1 && p2 && p3 && p4 && p5 && p6; } \ No newline at end of file diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 5c6e9f2..ae51fca 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -314,7 +314,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_ //3) signal psaHandling thread to setup topic receiver const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); - const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (topic == NULL) { logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "[PSTM] Warning found subscriber service without mandatory '%s' property.", @@ -334,7 +334,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_ } else { entry = calloc(1, sizeof(*entry)); entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship - entry->scope = strndup(scope, 1024 * 1024); + entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); entry->topic = strndup(topic, 1024 * 1024); entry->usageCount = 1; entry->selectedPsaSvcId = -1L; @@ -362,7 +362,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut //1) Find topic receiver and decrease count const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); - const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (topic == NULL) { return; @@ -437,7 +437,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv char *topicFromFilter = NULL; char *scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, &topicFromFilter); - char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter; + char *scope = scopeFromFilter; char *topic = topicFromFilter; char *scopeAndTopicKey = NULL; @@ -491,7 +491,7 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se char *topic = NULL; char *scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, &topic); - const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; + const char *scope = scopeFromFilter; if (topic == NULL) { free(scopeFromFilter); @@ -528,7 +528,7 @@ celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"), celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), uuid); } @@ -582,9 +582,9 @@ celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con if (manager->verbose) { logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), - celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "(null)"), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "(null)"), uuid); } @@ -634,7 +634,7 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) { if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) { if (manager->verbose && entry->endpoint != NULL) { logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, - "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope, entry->topic); + "[PSTM] Tearing down TopicSender for scope/topic %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic); } if (entry->endpoint != NULL) { @@ -700,7 +700,7 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) { const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n", - entry->scope, entry->topic, adminType, serType); + entry->scope == NULL ? "(null)" : entry->scope, entry->topic, adminType, serType); } if (entry->endpoint != NULL) { @@ -857,7 +857,7 @@ static void pstm_setupTopicSenders(pubsub_topology_manager_t *manager) { } if (entry->needsMatch) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicSender for %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic); } } } @@ -935,7 +935,7 @@ static void pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) { if (entry->needsMatch) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot setup TopicReceiver for %s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic); } } } @@ -996,7 +996,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!"); const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!"); const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); - const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); + const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"); const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); @@ -1034,7 +1034,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope); + fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope); fprintf(os, " |- topic = %s\n", entry->topic); fprintf(os, " |- admin type = %s\n", adminType); fprintf(os, " |- serializer = %s\n", serType); @@ -1064,7 +1064,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope); + fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope); fprintf(os, " |- topic = %s\n", entry->topic); fprintf(os, " |- admin type = %s\n", adminType); fprintf(os, " |- serializer = %s\n", serType); @@ -1086,7 +1086,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t while (hashMapIterator_hasNext(&iter)) { pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry->endpoint == NULL) { - fprintf(os, "|- Pending Topic Sender for %s/%s:\n", entry->scope, entry->topic); + fprintf(os, "|- Pending Topic Sender for %s/%s:\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic); const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)"); const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)"); const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)"); @@ -1111,7 +1111,7 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t while (hashMapIterator_hasNext(&iter)) { pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry->endpoint == NULL) { - fprintf(os, "|- Topic Receiver for %s/%s:\n", entry->scope, entry->topic); + fprintf(os, "|- Topic Receiver for %s/%s:\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic); const char *requestedQos = celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, "(None)"); const char *requestedConfig = celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)"); const char *requestedSer = celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, "(None)");
