This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections in repository https://gitbox.apache.org/repos/asf/celix.git
commit e10d9835c8fba6c4086406d36841431e9810fdd6 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jun 29 16:26:30 2020 +0200 Removes adding of default scope in pubsub discovery. If no scope is set for a publisher/subscriber this needs to be reflected in the endpoints so that correct matching can occur. --- .../examples/pubsub/publisher/CMakeLists.txt | 1 + .../examples/pubsub/subscriber/CMakeLists.txt | 2 +- .../src/pubsub_tcp_topic_receiver.c | 9 ++++--- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 6 ++++- .../src/pubsub_zmq_topic_receiver.c | 6 ++++- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 6 ++++- .../pubsub_discovery/src/pubsub_discovery_impl.c | 29 ++++++++++++---------- .../pubsub/pubsub_spi/include/pubsub_constants.h | 5 ---- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 2 -- .../src/pubsub_topology_manager.c | 18 +++++++------- 10 files changed, 47 insertions(+), 37 deletions(-) diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt index c68d533..3b9ea50 100644 --- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt +++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt @@ -25,6 +25,7 @@ add_celix_bundle(celix_pubsub_poi_publisher target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api) target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include) +target_compile_definitions(celix_pubsub_poi_publisher PRIVATE USE_SCOPE) celix_bundle_files(celix_pubsub_poi_publisher ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt index 94acdb2..8bee482 100644 --- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt +++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt @@ -25,7 +25,7 @@ add_celix_bundle(celix_pubsub_poi_subscriber target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api) target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include) - +target_compile_definitions(celix_pubsub_poi_subscriber PRIVATE USE_SCOPE) celix_bundle_files(celix_pubsub_poi_subscriber ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index eb6afbd..8ecb7df 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -641,10 +641,11 @@ static void *psa_tcp_recvThread(void *data) { pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) { pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, - PUBSUB_AMDIN_METRICS_NAME_MAX, - "%s", - receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); + if (receiver->scope != NULL) { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); + } else { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); + } snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); int msgTypesCount = 0; diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 47dc888..809ab7f 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -449,7 +449,11 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) { pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope); + if (sender->scope != NULL) { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); + } else { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); + } snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); celixThreadMutex_lock(&sender->boundedServices.mutex); size_t count = 0; diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c index 088474a..2ae58af 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c @@ -711,7 +711,11 @@ static void* psa_zmq_recvThread(void * data) { pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) { pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); + if (receiver->scope != NULL) { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); + } else { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); + } snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); int msgTypesCount = 0; diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c index 413f1b3..85c488b 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c @@ -464,7 +464,11 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) { pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope); + if (sender->scope != NULL) { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); + } else { + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); + } snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); celixThreadMutex_lock(&sender->boundedServices.mutex); size_t count = 0; diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index c6d1aa9..5e1b702 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -398,7 +398,11 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope clock_gettime(CLOCK_MONOTONIC, &entry->createTime); entry->isSet = false; entry->properties = celix_properties_copy(endpoint); - asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : scope, topic, uuid); + if (scope == NULL) { + asprintf(&entry->key, "/pubsub/%s/%s/%s", config, topic, uuid); + } else { + asprintf(&entry->key, "/pubsub/%s/%s__%s/%s", config, scope, topic, uuid); + } const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL); celixThreadMutex_lock(&disc->announcedEndpointsMutex); @@ -409,7 +413,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope celixThreadCondition_broadcast(&disc->waitCond); celixThreadMutex_unlock(&disc->runningMutex); } else if (valid) { - L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(null)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility); + L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(empty)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility); } if (!valid) { @@ -464,8 +468,8 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel if (disc->verbose) { const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!"); - const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!"); + const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)"); + const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)"); L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n", uuid, type, admin, ser, prot); } @@ -495,8 +499,8 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, if (disc->verbose) { const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!"); - const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!"); + const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)"); + const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)"); L_INFO("[PSD] Removing discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol = %s.\n", uuid, type, admin, ser, prot); } @@ -572,7 +576,6 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); - //TODO add support for query (scope / topic) fprintf(os, "\n"); fprintf(os, "Discovery configuration:\n"); @@ -589,11 +592,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at while (hashMapIterator_hasNext(&iter)) { celix_properties_t *ep = hashMapIterator_nextValue(&iter); const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!"); - const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); + const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)"); const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); + const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); + const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!"); fprintf(os, "Endpoint %s:\n", uuid); fprintf(os, " |- type = %s\n", type); @@ -612,11 +615,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at while (hashMapIterator_hasNext(&iter)) { pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter); const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!"); - const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); + const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)"); const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); + const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); + const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!"); int age = (int)(now.tv_sec - entry->createTime.tv_sec); fprintf(os, "Endpoint %s:\n", uuid); diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h index 00ee6b4..671b874 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h @@ -39,9 +39,4 @@ */ #define PUBSUB_ENDPOINT_LOCAL_VISIBILITY "local" -/** - * Default scope, if not scope is specified endpoints are published using this scope - */ -#define PUBSUB_DEFAULT_ENDPOINT_SCOPE "default" - #endif /* PUBSUB_CONSTANTS_H_ */ diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index 2aa052d..540ecda 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -65,8 +65,6 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, if (scope != NULL) { celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope); - } else { - celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE); } if (topic != NULL) { diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 2b063ef..434de1e 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -1009,11 +1009,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!"); const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!"); const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); - const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"); + const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)"); const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); + const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); + const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid); fprintf(os, " |- container name = %s\n", cn); fprintf(os, " |- fw uuid = %s\n", fwuuid); @@ -1044,10 +1044,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t } const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); + const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope); + fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope); fprintf(os, " |- topic = %s\n", entry->topic); fprintf(os, " |- admin type = %s\n", adminType); fprintf(os, " |- serializer = %s\n", serType); @@ -1074,10 +1074,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t } const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); - const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); + const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope); + fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope); fprintf(os, " |- topic = %s\n", entry->topic); fprintf(os, " |- admin type = %s\n", adminType); fprintf(os, " |- serializer = %s\n", serType);
