This is an automated email from the ASF dual-hosted git repository. abroekhuis pushed a commit to branch feature/scope_usage in repository https://gitbox.apache.org/repos/asf/celix.git
commit a2dd43189114a0f97db78c6a973876ad55a8e086 Author: Alexander Broekhuis <[email protected]> AuthorDate: Tue Mar 31 11:20:18 2020 +0200 Updated ZMQ implementation to use NULL if scope is not set. Only when needed, "default" is used. Added exclude filter to example for scope, to make sure that the right publisher is found. If a publisher without scope, and a publisher with a scope, use the same topic, this could result in getting the wrong publisher service. --- .../publisher/private/src/ps_pub_activator.c | 2 +- .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 12 ++++--- .../src/pubsub_zmq_topic_receiver.c | 20 ++++++++---- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 10 ++++-- .../pubsub/pubsub_api/include/pubsub/subscriber.h | 2 +- .../pubsub_discovery/src/pubsub_discovery_impl.c | 2 +- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 17 ++++++---- bundles/pubsub/pubsub_spi/src/pubsub_utils.c | 24 +++----------- .../src/pubsub_topology_manager.c | 38 +++++++++++++++------- 9 files changed, 73 insertions(+), 54 deletions(-) diff --git a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c index d545cbc..c87e8e0 100644 --- a/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c +++ b/bundles/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c @@ -62,7 +62,7 @@ static int pub_start(struct publisherActivator *act, celix_bundle_context_t *ctx snprintf(filter, 128, "(%s=%s)(%s=%s)", PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE, scope); free(scope); #else - snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic); + snprintf(filter, 128, "(&(%s=%s)(!(%s=*)))", (char*) PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE); #endif celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; opts.callbackHandle = act->client; diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c index 1130b03..3864d2e 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c @@ -667,13 +667,17 @@ static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin const char *eSerializer = celix_properties_get(endpoint, PUBSUB_ENDPOINT_SERIALIZER, NULL); const char *eProtocol = celix_properties_get(endpoint, PUBSUB_ENDPOINT_PROTOCOL, NULL); - if (scope != NULL && topic != NULL && serializer != NULL && protocol != NULL - && eScope != NULL && eTopic != NULL && eSerializer != NULL && eProtocol != NULL - && strncmp(eScope, scope, 1024*1024) == 0 + if (topic != NULL && serializer != NULL && protocol != NULL + && eTopic != NULL && eSerializer != NULL && eProtocol != NULL && strncmp(eTopic, topic, 1024*1024) == 0 && strncmp(eSerializer, serializer, 1024*1024) == 0 && strncmp(eProtocol, protocol, 1024*1024) == 0) { - pubsub_zmqTopicReceiver_connectTo(receiver, url); + // Scope is not required + if (scope == NULL && eScope == NULL) { + pubsub_zmqTopicReceiver_connectTo(receiver, url); + } else if (scope != NULL && eScope != NULL && strncmp(scope, eScope, 1024*1024) == 0) { + pubsub_zmqTopicReceiver_connectTo(receiver, url); + } } } diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c index 979d373..e832aff 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c @@ -146,7 +146,7 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context receiver->serializer = serializer; receiver->protocolSvcId = protocolSvcId; receiver->protocol = protocol; - receiver->scope = strndup(scope, 1024 * 1024); + receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); receiver->topic = strndup(topic, 1024 * 1024); receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED); @@ -260,7 +260,9 @@ pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context } if (receiver->zmqSock == NULL) { - free(receiver->scope); + if (receiver->scope != NULL) { + free(receiver->scope); + } free(receiver->topic); free(receiver); receiver = NULL; @@ -401,10 +403,16 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const pubsub_zmq_topic_receiver_t *receiver = handle; long bndId = celix_bundle_getId(bnd); - const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { - //not the same scope. ignore - return; + const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); + if (receiver->scope == NULL){ + if (subScope != NULL){ + return; + } + } else { + if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { + //not the same scope. ignore + return; + } } celixThreadMutex_lock(&receiver->subscribers.mutex); diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c index d1fb841..35f959b 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c @@ -252,7 +252,7 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( } if (sender->url != NULL) { - sender->scope = strndup(scope, 1024 * 1024); + sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); sender->topic = strndup(topic, 1024 * 1024); celixThreadMutex_create(&sender->boundedServices.mutex, NULL); @@ -268,7 +268,9 @@ pubsub_zmq_topic_sender_t* pubsub_zmqTopicSender_create( celix_properties_t *props = celix_properties_create(); celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic); - celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); + if (sender->scope != NULL) { + celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope); + } celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS; opts.factory = &sender->publisher.factory; @@ -318,7 +320,9 @@ void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) { celixThreadMutex_destroy(&sender->boundedServices.mutex); celixThreadMutex_destroy(&sender->zmq.mutex); - free(sender->scope); + if (sender->scope != NULL) { + free(sender->scope); + } free(sender->topic); free(sender->url); free(sender); diff --git a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h index 0ec4c58..f2afa26 100644 --- a/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h +++ b/bundles/pubsub/pubsub_api/include/pubsub/subscriber.h @@ -39,7 +39,7 @@ #define PUBSUB_SUBSCRIBER_SCOPE "scope" #define PUBSUB_SUBSCRIBER_CONFIG "pubsub.config" -#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default" +//#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT "default" struct pubsub_subscriber_struct { void *handle; diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index 283db2f..c6a661b 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -414,7 +414,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope clock_gettime(CLOCK_MONOTONIC, &entry->createTime); entry->isSet = false; entry->properties = celix_properties_copy(endpoint); - asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope, topic, uuid); + asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? "default" : scope, topic, uuid); const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL); celixThreadMutex_lock(&disc->announcedEndpointsMutex); diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index f9e74b2..2a4eb5e 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -122,7 +122,7 @@ celix_properties_t* pubsubEndpoint_createFromSubscriberSvc(bundle_context_t* ctx celix_properties_t *ep = celix_properties_create(); const char* fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL); - const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT); + const char* scope = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_SCOPE, NULL); const char* topic = celix_properties_get(svcProps, PUBSUB_SUBSCRIBER_TOPIC, NULL); struct retrieve_topic_properties_data data; @@ -157,7 +157,7 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context char* topic = NULL; char* scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(filter, &topic, &scopeFromFilter); - const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; + const char *scope = scopeFromFilter; struct retrieve_topic_properties_data data; data.props = NULL; @@ -176,7 +176,9 @@ celix_properties_t* pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context } free(topic); - free(scopeFromFilter); + if (scope != NULL) { + free(scopeFromFilter); + } return ep; } @@ -194,7 +196,11 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const celix_properti char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) { char *result = NULL; - asprintf(&result, "%s:%s", scope, topic); + if (scope != NULL) { + asprintf(&result, "%s:%s", scope, topic); + } else { + asprintf(&result, "default:%s", topic); + } return result; } @@ -220,7 +226,6 @@ bool pubsubEndpoint_isValid(const celix_properties_t *props, bool requireAdminTy checkProp(props, PUBSUB_ENDPOINT_SERIALIZER); } bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME); - bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE); - return p1 && p2 && p3 && p4 && p5 && p6 && p7; + return p1 && p2 && p3 && p4 && p5 && p6; } \ No newline at end of file diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c index 18005b6..078ce57 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_utils.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_utils.c @@ -43,25 +43,9 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi const char *scope = NULL; const char *objectClass = NULL; celix_filter_t *filter = celix_filter_create(filterstr); - if (filter != NULL) { - if (filter->operand == CELIX_FILTER_OPERAND_AND) { //only and pubsub filter valid (e.g. (&(objectClass=pubsub_publisher)(topic=example)) - array_list_t *attributes = filter->children; - unsigned int i; - unsigned int size = arrayList_size(attributes); - for (i = 0; i < size; ++i) { - filter_t *attr = arrayList_get(attributes, i); - if (attr->operand == CELIX_FILTER_OPERAND_EQUAL) { - if (strncmp(OSGI_FRAMEWORK_OBJECTCLASS, attr->attribute, 128) == 0) { - objectClass = attr->value; - } else if (strncmp(PUBSUB_PUBLISHER_TOPIC, attr->attribute, 128) == 0) { - topic = attr->value; - } else if (strncmp(PUBSUB_PUBLISHER_SCOPE, attr->attribute, 128) == 0) { - scope = attr->value; - } - } - } - } - } + scope = (char *) celix_filter_findAttribute(filter, PUBSUB_PUBLISHER_SCOPE); + topic = (char *) celix_filter_findAttribute(filter, PUBSUB_PUBLISHER_TOPIC); + objectClass = (char *) celix_filter_findAttribute(filter, OSGI_FRAMEWORK_OBJECTCLASS); if (topic != NULL && objectClass != NULL && strncmp(objectClass, PUBSUB_PUBLISHER_SERVICE_NAME, 128) == 0) { //NOTE topic must be present, scope can be present in the filter. @@ -92,7 +76,7 @@ celix_status_t pubsub_getPubSubInfoFromFilter(const char* filterstr, char **topi char* pubsub_getKeysBundleDir(celix_bundle_context_t *ctx) { array_list_pt bundles = NULL; bundleContext_getBundles(ctx, &bundles); - int nrOfBundles = arrayList_size(bundles); + uint32_t nrOfBundles = arrayList_size(bundles); long bundle_id = -1; char* result = NULL; diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 5ec5140..4d163d0 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -126,7 +126,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry != NULL) { free(entry->scopeAndTopicKey); - free(entry->scope); + if (entry->scope != NULL) { + free(entry->scope); + } free(entry->topic); if (entry->topicProperties != NULL) { celix_properties_destroy(entry->topicProperties); @@ -148,7 +150,9 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); if (entry != NULL) { free(entry->scopeAndTopicKey); - free(entry->scope); + if (entry->scope != NULL) { + free(entry->scope); + } free(entry->topic); if (entry->topicProperties != NULL) { celix_properties_destroy(entry->topicProperties); @@ -314,7 +318,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_ //3) signal psaHandling thread to setup topic receiver const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); - const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (topic == NULL) { logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "[PSTM] Warning found subscriber service without mandatory '%s' property.", @@ -334,7 +338,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, void *svc __attribute_ } else { entry = calloc(1, sizeof(*entry)); entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship - entry->scope = strndup(scope, 1024 * 1024); + entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024); entry->topic = strndup(topic, 1024 * 1024); entry->usageCount = 1; entry->selectedPsaSvcId = -1L; @@ -362,7 +366,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, void *svc __attribut //1) Find topic receiver and decrease count const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); - const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); if (topic == NULL) { return; @@ -437,7 +441,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv char *topicFromFilter = NULL; char *scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topicFromFilter, &scopeFromFilter); - char *scope = scopeFromFilter == NULL ? strndup("default", 32) : scopeFromFilter; + char *scope = scopeFromFilter; char *topic = topicFromFilter; char *scopeAndTopicKey = NULL; @@ -453,7 +457,9 @@ void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_serv pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); if (entry != NULL) { entry->usageCount += 1; - free(scope); + if (scope != NULL) { + free(scope); + } free(topic); free(scopeAndTopicKey); } else { @@ -491,10 +497,12 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se char *topic = NULL; char *scopeFromFilter = NULL; pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter); - const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; + const char *scope = scopeFromFilter; if (topic == NULL) { - free(scopeFromFilter); + if (scopeFromFilter != NULL) { + free(scopeFromFilter); + } return; } @@ -509,7 +517,9 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se free(scopeAndTopicKey); free(topic); - free(scopeFromFilter); + if (scopeFromFilter != NULL) { + free(scopeFromFilter); + } } celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) { @@ -656,7 +666,9 @@ static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) { //no usage -> remove hashMapIterator_remove(&iter); free(entry->scopeAndTopicKey); - free(entry->scope); + if (entry->scope != NULL) { + free(entry->scope); + } free(entry->topic); if (entry->topicProperties != NULL) { celix_properties_destroy(entry->topicProperties); @@ -721,7 +733,9 @@ static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) { hashMapIterator_remove(&iter); //cleanup entry free(entry->scopeAndTopicKey); - free(entry->scope); + if (entry->scope != NULL) { + free(entry->scope); + } free(entry->topic); if (entry->topicProperties != NULL) { celix_properties_destroy(entry->topicProperties);
