This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections in repository https://gitbox.apache.org/repos/asf/celix.git
commit d3d908a4f0b737d8f6f818af4f38fa5a7e960c6e Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jun 29 16:43:11 2020 +0200 Revert "Removes adding of default scope in pubsub discovery." This reverts commit e10d9835c8fba6c4086406d36841431e9810fdd6. --- .../examples/pubsub/publisher/CMakeLists.txt | 1 - .../examples/pubsub/subscriber/CMakeLists.txt | 2 +- .../src/pubsub_tcp_topic_receiver.c | 9 +++---- .../pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c | 6 +---- .../src/pubsub_zmq_topic_receiver.c | 6 +---- .../pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c | 6 +---- .../pubsub_discovery/src/pubsub_discovery_impl.c | 29 ++++++++++------------ .../pubsub/pubsub_spi/include/pubsub_constants.h | 5 ++++ bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 2 ++ .../src/pubsub_topology_manager.c | 18 +++++++------- 10 files changed, 37 insertions(+), 47 deletions(-) diff --git a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt index 3b9ea50..c68d533 100644 --- a/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt +++ b/bundles/pubsub/examples/pubsub/publisher/CMakeLists.txt @@ -25,7 +25,6 @@ add_celix_bundle(celix_pubsub_poi_publisher target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api) target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include) -target_compile_definitions(celix_pubsub_poi_publisher PRIVATE USE_SCOPE) celix_bundle_files(celix_pubsub_poi_publisher ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor diff --git a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt index 8bee482..94acdb2 100644 --- a/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt +++ b/bundles/pubsub/examples/pubsub/subscriber/CMakeLists.txt @@ -25,7 +25,7 @@ add_celix_bundle(celix_pubsub_poi_subscriber target_link_libraries(celix_pubsub_poi_subscriber 
PRIVATE Celix::framework Celix::pubsub_api) target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include) -target_compile_definitions(celix_pubsub_poi_subscriber PRIVATE USE_SCOPE) + celix_bundle_files(celix_pubsub_poi_subscriber ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index 8ecb7df..eb6afbd 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -641,11 +641,10 @@ static void *psa_tcp_recvThread(void *data) { pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) { pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - if (receiver->scope != NULL) { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); - } else { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); - } + snprintf(result->scope, + PUBSUB_AMDIN_METRICS_NAME_MAX, + "%s", + receiver->scope == NULL ? 
PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); int msgTypesCount = 0; diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c index 809ab7f..47dc888 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c @@ -449,11 +449,7 @@ static void psa_tcp_ungetPublisherService(void *handle, const celix_bundle_t *re pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender) { pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); - if (sender->scope != NULL) { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); - } else { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); - } + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); celixThreadMutex_lock(&sender->boundedServices.mutex); size_t count = 0; diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c index 2ae58af..088474a 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c @@ -711,11 +711,7 @@ static void* psa_zmq_recvThread(void * data) { pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) { pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result)); - if (receiver->scope != NULL) { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope); - } else { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); - } + snprintf(result->scope, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic); int msgTypesCount = 0; diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c index 85c488b..413f1b3 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_sender.c @@ -464,11 +464,7 @@ static void psa_zmq_ungetPublisherService(void *handle, const celix_bundle_t *re pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) { pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result)); - if (sender->scope != NULL) { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope); - } else { - snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, ""); - } + snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? 
PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope); snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic); celixThreadMutex_lock(&sender->boundedServices.mutex); size_t count = 0; diff --git a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index 5e1b702..c6d1aa9 100644 --- a/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/bundles/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@ -398,11 +398,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope clock_gettime(CLOCK_MONOTONIC, &entry->createTime); entry->isSet = false; entry->properties = celix_properties_copy(endpoint); - if (scope == NULL) { - asprintf(&entry->key, "/pubsub/%s/%s/%s", config, topic, uuid); - } else { - asprintf(&entry->key, "/pubsub/%s/%s__%s/%s", config, scope, topic, uuid); - } + asprintf(&entry->key, "/pubsub/%s/%s/%s/%s", config, scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : scope, topic, uuid); const char *hashKey = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, NULL); celixThreadMutex_lock(&disc->announcedEndpointsMutex); @@ -413,7 +409,7 @@ celix_status_t pubsub_discovery_announceEndpoint(void *handle, const celix_prope celixThreadCondition_broadcast(&disc->waitCond); celixThreadMutex_unlock(&disc->runningMutex); } else if (valid) { - L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? "(empty)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility); + L_DEBUG("[PSD] Ignoring endpoint %s/%s because the visibility is not %s. Configured visibility is %s\n", scope == NULL ? 
"(null)" : scope, topic, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY, visibility); } if (!valid) { @@ -468,8 +464,8 @@ static void pubsub_discovery_addDiscoveredEndpoint(pubsub_discovery_t *disc, cel if (disc->verbose) { const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)"); - const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)"); + const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!"); + const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!"); L_INFO("[PSD] Adding discovered endpoint %s. type is %s, admin is %s, serializer is %s, protocol is %s.\n", uuid, type, admin, ser, prot); } @@ -499,8 +495,8 @@ static void pubsub_discovery_removeDiscoveredEndpoint(pubsub_discovery_t *disc, if (disc->verbose) { const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); const char *admin = celix_properties_get(endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "(no serialization)"); - const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "(no protocol)"); + const char *ser = celix_properties_get(endpoint, PUBSUB_SERIALIZER_TYPE_KEY, "!Error!"); + const char *prot = celix_properties_get(endpoint, PUBSUB_PROTOCOL_TYPE_KEY, "!Error!"); L_INFO("[PSD] Removing discovered endpoint %s. 
type is %s, admin is %s, serializer is %s, protocol = %s.\n", uuid, type, admin, ser, prot); } @@ -576,6 +572,7 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); + //TODO add support for query (scope / topic) fprintf(os, "\n"); fprintf(os, "Discovery configuration:\n"); @@ -592,11 +589,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at while (hashMapIterator_hasNext(&iter)) { celix_properties_t *ep = hashMapIterator_nextValue(&iter); const char *uuid = celix_properties_get(ep, PUBSUB_ENDPOINT_UUID, "!Error!"); - const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)"); + const char *scope = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); const char *topic = celix_properties_get(ep, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); - const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); + const char *serType = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + const char *protType = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); const char *type = celix_properties_get(ep, PUBSUB_ENDPOINT_TYPE, "!Error!"); fprintf(os, "Endpoint %s:\n", uuid); fprintf(os, " |- type = %s\n", type); @@ -615,11 +612,11 @@ bool pubsub_discovery_executeCommand(void *handle, const char * commandLine __at while (hashMapIterator_hasNext(&iter)) { pubsub_announce_entry_t *entry = hashMapIterator_nextValue(&iter); const char *uuid = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_UUID, "!Error!"); - const char *scope = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)"); + const char *scope = celix_properties_get(entry->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); const char *topic = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); - const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); + const char *serType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + const char *protType = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); const char *type = celix_properties_get(entry->properties, PUBSUB_ENDPOINT_TYPE, "!Error!"); int age = (int)(now.tv_sec - entry->createTime.tv_sec); fprintf(os, "Endpoint %s:\n", uuid); diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h index 671b874..00ee6b4 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_constants.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_constants.h @@ -39,4 +39,9 @@ */ #define PUBSUB_ENDPOINT_LOCAL_VISIBILITY "local" +/** + * Default scope: if no scope is specified, endpoints are published using this scope + */ +#define PUBSUB_DEFAULT_ENDPOINT_SCOPE "default" + #endif /* PUBSUB_CONSTANTS_H_ */ diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index 540ecda..2aa052d 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -65,6 +65,8 @@ static void pubsubEndpoint_setFields(celix_properties_t *ep, const char* fwUUID, if (scope != NULL) { celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, scope); + } else { + celix_properties_set(ep, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE); } if (topic != NULL) { diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 434de1e..2b063ef 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -1009,11 +1009,11 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t const char *cn = celix_properties_get(discovered->endpoint, "container_name", "!Error!"); const char *fwuuid = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, "!Error!"); const char *type = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TYPE, "!Error!"); - const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(no scope)"); + const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "(null)"); const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); - const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); + const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + const char *protType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid); fprintf(os, " |- container name = %s\n", cn); fprintf(os, " |- fw uuid = %s\n", fwuuid); @@ -1044,10 +1044,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t } const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); const char *adminType = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); - const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); fprintf(os, "|- Topic Sender for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope); + fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope); fprintf(os, " |- topic = %s\n", entry->topic); fprintf(os, " |- admin type = %s\n", adminType); fprintf(os, " |- serializer = %s\n", serType); @@ -1074,10 +1074,10 @@ static celix_status_t pubsub_topologyManager_topology(pubsub_topology_manager_t } const char *uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, "!Error!"); const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); - const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "(no serialization)"); - const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "(no protocol)"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + const char *protType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_PROTOCOL, "!Error!"); fprintf(os, "|- Topic Receiver for endpoint %s:\n", uuid); - fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(no scope)" : entry->scope); + fprintf(os, " |- scope = %s\n", entry->scope == NULL ? "(null)" : entry->scope); fprintf(os, " |- topic = %s\n", entry->topic); fprintf(os, " |- admin type = %s\n", adminType); fprintf(os, " |- serializer = %s\n", serType);
