This is an automated email from the ASF dual-hosted git repository.

abroekhuis pushed a commit to branch feature/scope_fixes
in repository https://gitbox.apache.org/repos/asf/celix.git

commit 86918b96c50163a014dac577cb10f1a9bd477570
Author: Alexander Broekhuis <[email protected]>
AuthorDate: Tue Apr 14 13:30:58 2020 +0200

    Fixed merge problems wrt scope usage.
---
 .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 51 ++++++++--------------
 .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c |  4 +-
 bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c    | 15 ++++---
 .../src/pubsub_topology_manager.c                  | 36 +++++++--------
 4 files changed, 48 insertions(+), 58 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 579d5d4..cc9f5b0 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -601,34 +601,7 @@ 
pubsub_tcpAdmin_connectEndpointToReceiver(pubsub_tcp_admin_t *psa, pubsub_tcp_to
         L_WARN("[PSA TCP] Error got endpoint without a tcp url (admin: %s, 
type: %s)", admin, type);
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
-        const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-        const char *serializer = NULL;
-        long serializerSvcId = 
pubsub_tcpTopicReceiver_serializerSvcId(receiver);
-        psa_tcp_serializer_entry_t *serializerEntry = 
hashMap_get(psa->serializers.map, (void *) serializerSvcId);
-        if (serializerEntry != NULL) {
-            serializer = serializerEntry->serType;
-        }
-        const char *protocol = NULL;
-        long protocolSvcId = pubsub_tcpTopicReceiver_protocolSvcId(receiver);
-        psa_tcp_protocol_entry_t *protocolEntry = 
hashMap_get(psa->protocols.map, (void *) protocolSvcId);
-        if (protocolEntry != NULL) {
-            protocol = protocolEntry->protType;
-        }
-
-        const char *eScope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-        const char *eTopic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-        const char *eSerializer = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_SERIALIZER, NULL);
-        const char *eProtocol = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_PROTOCOL, NULL);
-
-        if (scope != NULL && topic != NULL && serializer != NULL && protocol 
!= NULL
-            && eScope != NULL && eTopic != NULL && eSerializer != NULL && 
eProtocol != NULL
-            && strncmp(eScope, scope, 1024 * 1024) == 0
-            && strncmp(eTopic, topic, 1024 * 1024) == 0
-            && strncmp(eSerializer, serializer, 1024 * 1024) == 0
-            && strncmp(eProtocol, protocol, 1024 * 1024) == 0) {
-            pubsub_tcpTopicReceiver_connectTo(receiver, url);
-        }
+        pubsub_tcpTopicReceiver_connectTo(receiver, url);
     }
 
     return status;
@@ -639,9 +612,12 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void 
*handle, const celix_p
 
     if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) {
         celixThreadMutex_lock(&psa->topicReceivers.mutex);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicReceivers.map);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_tcp_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter);
+        const char *scope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+        const char *topic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+        char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+        pubsub_tcp_topic_receiver_t *receiver = 
hashMap_get(psa->topicReceivers.map, key);
+        if (receiver != NULL) {
             pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint);
         }
         celixThreadMutex_unlock(&psa->topicReceivers.mutex);
@@ -663,13 +639,24 @@ 
pubsub_tcpAdmin_disconnectEndpointFromReceiver(pubsub_tcp_admin_t *psa, pubsub_t
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
+    const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
+    const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
+
+    const char *eScope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+    const char *eTopic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
 
     if (url == NULL) {
         L_WARN("[PSA TCP] Error got endpoint without tcp url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
+        if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 
1024) == 0) {
+            if (scope == NULL && eScope == NULL) {
+                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
+            } else if (scope != NULL && eScope != NULL && strncmp(eScope, 
scope, 1024 * 1024) == 0) {
+                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
+            }
+        }
     }
 
     return status;
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index a36fcfa..b6c7ee0 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -401,7 +401,7 @@ static void *psa_tcp_getPublisherService(void *handle, 
const celix_bundle_t *req
             entry->service.send = psa_tcp_topicPublicationSend;
             hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
         } else {
-            L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", 
sender->scope, sender->topic);
+            L_ERROR("Error creating serializer map for TCP TopicSender %s/%s", 
sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
         }
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -447,7 +447,7 @@ static void psa_tcp_ungetPublisherService(void *handle, 
const celix_bundle_t *re
 
 pubsub_admin_sender_metrics_t 
*pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) {
     pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->scope);
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope 
== NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->topic);
     celixThreadMutex_lock(&sender->boundedServices.mutex);
     size_t count = 0;
diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c 
b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
index b7cf5f6..38b83eb 100644
--- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -65,6 +65,8 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, 
const char* fwUUID,
 
     if (scope != NULL) {
         celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope);
+    } else {
+        celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, 
PUBSUB_DEFAULT_ENDPOINT_SCOPE);
     }
 
     if (topic != NULL) {
@@ -159,12 +161,11 @@ celix_properties_t* 
pubsubEndpoint_createFromPublisherTrackerInfo(bundle_context
     char* topic = NULL;
     char* scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(filter, &scopeFromFilter, &topic);
-    const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+    const char *scope = scopeFromFilter;
 
     struct retrieve_topic_properties_data data;
     data.props = NULL;
     data.isPublisher = true;
-    data.scope = scope;
     data.topic = topic;
     celix_bundleContext_useBundle(ctx, bundleId, &data, 
retrieveTopicProperties);
 
@@ -197,7 +198,11 @@ bool pubsubEndpoint_equals(const celix_properties_t 
*psEp1, const celix_properti
 
 char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) 
{
     char *result = NULL;
-    asprintf(&result, "%s:%s", scope, topic);
+    if (scope != NULL) {
+        asprintf(&result, "%s:%s", scope, topic);
+    } else {
+        asprintf(&result, "default:%s", topic);
+    }
     return result;
 }
 
@@ -223,7 +228,5 @@ bool pubsubEndpoint_isValid(const celix_properties_t 
*props, bool requireAdminTy
         checkProp(props, PUBSUB_ENDPOINT_SERIALIZER);
     }
     bool p6 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_NAME);
-    bool p7 = checkProp(props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
-
-    return p1 && p2 && p3 && p4 && p5 && p6 && p7;
+    return p1 && p2 && p3 && p4 && p5 && p6;
 }
\ No newline at end of file
diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c 
b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 5c6e9f2..ae51fca 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -314,7 +314,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, 
void *svc __attribute_
     //3) signal psaHandling thread to setup topic receiver
 
     const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, 
NULL);
-    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, 
"default");
+    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, 
NULL);
     if (topic == NULL) {
         logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
                       "[PSTM] Warning found subscriber service without 
mandatory '%s' property.",
@@ -334,7 +334,7 @@ void pubsub_topologyManager_subscriberAdded(void *handle, 
void *svc __attribute_
     } else {
         entry = calloc(1, sizeof(*entry));
         entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship
-        entry->scope = strndup(scope, 1024 * 1024);
+        entry->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
         entry->topic = strndup(topic, 1024 * 1024);
         entry->usageCount = 1;
         entry->selectedPsaSvcId = -1L;
@@ -362,7 +362,7 @@ void pubsub_topologyManager_subscriberRemoved(void *handle, 
void *svc __attribut
     //1) Find topic receiver and decrease count
 
     const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, 
NULL);
-    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, 
"default");
+    const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, 
NULL);
 
     if (topic == NULL) {
         return;
@@ -437,7 +437,7 @@ void pubsub_topologyManager_publisherTrackerAdded(void 
*handle, const celix_serv
     char *topicFromFilter = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, 
&topicFromFilter);
-    char *scope = scopeFromFilter == NULL ? strndup("default", 32) : 
scopeFromFilter;
+    char *scope = scopeFromFilter;
     char *topic = topicFromFilter;
 
     char *scopeAndTopicKey = NULL;
@@ -491,7 +491,7 @@ void pubsub_topologyManager_publisherTrackerRemoved(void 
*handle, const celix_se
     char *topic = NULL;
     char *scopeFromFilter = NULL;
     pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &scopeFromFilter, 
&topic);
-    const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter;
+    const char *scope = scopeFromFilter;
 
     if (topic == NULL) {
         free(scopeFromFilter);
@@ -528,7 +528,7 @@ celix_status_t 
pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const
         logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
                       "PSTM: Discovered endpoint added for topic %s with scope 
%s [fwUUID=%s, epUUID=%s]\n",
                       celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
-                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"),
                       celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
                       uuid);
     }
@@ -582,9 +582,9 @@ celix_status_t 
pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, con
     if (manager->verbose) {
         logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
                       "PSTM: Discovered endpoint removed for topic %s with 
scope %s [fwUUID=%s, epUUID=%s]\n",
-                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
-                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
-                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, "(null)"),
+                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"),
+                      celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, "(null)"),
                       uuid);
     }
 
@@ -634,7 +634,7 @@ static void 
pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
         if (entry != NULL && (entry->usageCount <= 0 || entry->needsMatch)) {
             if (manager->verbose && entry->endpoint != NULL) {
                 logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
-                              "[PSTM] Tearing down TopicSender for scope/topic 
%s/%s\n", entry->scope, entry->topic);
+                              "[PSTM] Tearing down TopicSender for scope/topic 
%s/%s\n", entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
             }
 
             if (entry->endpoint != NULL) {
@@ -700,7 +700,7 @@ static void 
pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
                 const char *serType = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
                 logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
                               "[PSTM] Tearing down TopicReceiver for 
scope/topic %s/%s with psa admin type %s and serializer %s\n",
-                              entry->scope, entry->topic, adminType, serType);
+                              entry->scope == NULL ? "(null)" : entry->scope, 
entry->topic, adminType, serType);
             }
 
             if (entry->endpoint != NULL) {
@@ -857,7 +857,7 @@ static void 
pstm_setupTopicSenders(pubsub_topology_manager_t *manager) {
             }
 
             if (entry->needsMatch) {
-                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, 
"Cannot setup TopicSender for %s/%s\n", entry->scope, entry->topic);
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, 
"Cannot setup TopicSender for %s/%s\n", entry->scope == NULL ? "(null)" : 
entry->scope, entry->topic);
             }
         }
     }
@@ -935,7 +935,7 @@ static void 
pstm_setupTopicReceivers(pubsub_topology_manager_t *manager) {
 
 
             if (entry->needsMatch) {
-                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, 
"Cannot setup TopicReceiver for %s/%s\n", entry->scope, entry->topic);
+                logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, 
"Cannot setup TopicReceiver for %s/%s\n", entry->scope == NULL ? "(null)" : 
entry->scope, entry->topic);
             }
         }
     }
@@ -996,7 +996,7 @@ static celix_status_t 
pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *cn = celix_properties_get(discovered->endpoint, 
"container_name", "!Error!");
         const char *fwuuid = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!");
         const char *type = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_TYPE, "!Error!");
-        const char *scope = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+        const char *scope = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)");
         const char *topic = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
         const char *adminType = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
         const char *serType = celix_properties_get(discovered->endpoint, 
PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
@@ -1034,7 +1034,7 @@ static celix_status_t 
pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *serType = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
         const char *protType = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? 
"(null)" : entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
@@ -1064,7 +1064,7 @@ static celix_status_t 
pubsub_topologyManager_topology(pubsub_topology_manager_t
         const char *serType = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
         const char *protType = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_PROTOCOL, "!Error!");
         fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid);
-        fprintf(os, "   |- scope       = %s\n", entry->scope);
+        fprintf(os, "   |- scope       = %s\n", entry->scope == NULL ? 
"(null)" : entry->scope);
         fprintf(os, "   |- topic       = %s\n", entry->topic);
         fprintf(os, "   |- admin type  = %s\n", adminType);
         fprintf(os, "   |- serializer  = %s\n", serType);
@@ -1086,7 +1086,7 @@ static celix_status_t 
pubsub_topologyManager_topology(pubsub_topology_manager_t
         while (hashMapIterator_hasNext(&iter)) {
             pstm_topic_receiver_or_sender_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             if (entry->endpoint == NULL) {
-                fprintf(os, "|- Pending Topic Sender for %s/%s:\n", 
entry->scope, entry->topic);
+                fprintf(os, "|- Pending Topic Sender for %s/%s:\n", 
entry->scope == NULL ? "(null)" : entry->scope, entry->topic);
                 const char *requestedQos = 
celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, 
"(None)");
                 const char *requestedConfig = 
celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
                 const char *requestedSer = 
celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, 
"(None)");
@@ -1111,7 +1111,7 @@ static celix_status_t 
pubsub_topologyManager_topology(pubsub_topology_manager_t
         while (hashMapIterator_hasNext(&iter)) {
             pstm_topic_receiver_or_sender_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             if (entry->endpoint == NULL) {
-                fprintf(os, "|- Topic Receiver for %s/%s:\n", entry->scope, 
entry->topic);
+                fprintf(os, "|- Topic Receiver for %s/%s:\n", entry->scope == 
NULL ? "(null)" : entry->scope, entry->topic);
                 const char *requestedQos = 
celix_properties_get(entry->topicProperties, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, 
"(None)");
                 const char *requestedConfig = 
celix_properties_get(entry->topicProperties, PUBSUB_ADMIN_TYPE_KEY, "(None)");
                 const char *requestedSer = 
celix_properties_get(entry->topicProperties, PUBSUB_SERIALIZER_TYPE_KEY, 
"(None)");

Reply via email to