This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/master by this push: new a11b4c9 Bugfix/zmq wrong sender connections (#264) a11b4c9 is described below commit a11b4c9433a3a69245f374a8a4fb3a2935491bd4 Author: Pepijn Noltes <pepijnnol...@gmail.com> AuthorDate: Tue Jun 30 16:05:34 2020 +0200 Bugfix/zmq wrong sender connections (#264) * Fixes an issue in endpoint discovery. * Changes handling of scopeless and default scoped topic senders/receivers. In the updated setup a scopeless (scope==NULL) topic and a default scoped (scope="default") topic will each have their own topic sender and receiver, but will connect to each other through discovery. This is done so that requesting a scopeless publisher will result in a service without a scope property and requesting a default scoped publisher will result in a publisher with a "default" scope property. --- .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 16 ++-- .../src/pubsub_tcp_topic_receiver.c | 3 + .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 15 ++-- .../src/pubsub_udpmc_topic_receiver.c | 5 +- .../src/pubsub_websocket_admin.c | 15 ++-- .../src/pubsub_websocket_topic_receiver.c | 3 + .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 35 ++++++--- .../src/pubsub_zmq_topic_receiver.c | 7 +- bundles/pubsub/pubsub_spi/CMakeLists.txt | 7 +- bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt | 25 ++++++ .../gtest/src/PubSubEndpointUtilsTestSuite.cc | 47 +++++++++++ .../pubsub/pubsub_spi/include/pubsub_endpoint.h | 16 +++- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 3 +- .../pubsub/pubsub_spi/src/pubsub_endpoint_match.c | 11 +++ bundles/pubsub/test/test/test_runner.cc | 90 +++++++++++----------- libs/utils/gtest/src/LogUtilsTestSuite.cc | 2 - 16 files changed, 211 insertions(+), 89 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c index 67d4333..e7ad284 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c +++ 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c @@ -549,7 +549,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); - if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) { + if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } @@ -610,15 +610,13 @@ celix_status_t pubsub_tcpAdmin_addDiscoveredEndpoint(void *handle, const celix_p if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) { celixThreadMutex_lock(&psa->topicReceivers.mutex); - const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - - pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); - if (receiver != NULL) { - pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_tcp_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_tcpTopicReceiver_topic(receiver), pubsub_tcpTopicReceiver_scope(receiver))) { + pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } } - free(key); celixThreadMutex_unlock(&psa->topicReceivers.mutex); } diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c index abc44f4..4fa4586 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c +++ 
b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c @@ -448,6 +448,9 @@ static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const //not the same scope. ignore return; } + } else { + //receiver scope is not NULL, but subScope is NULL -> ignore + return; } celixThreadMutex_lock(&receiver->subscribers.mutex); diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c index d8fdac1..18657a1 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c @@ -410,7 +410,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); - if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) { + if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } @@ -472,13 +472,12 @@ celix_status_t pubsub_udpmcAdmin_addEndpoint(void *handle, const celix_propertie if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) { celixThreadMutex_lock(&psa->topicReceivers.mutex); - const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - - pubsub_udpmc_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); - if (receiver != NULL) { - pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_udpmc_topic_receiver_t *receiver = 
hashMapIterator_nextValue(&iter); + if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_udpmcTopicReceiver_topic(receiver), pubsub_udpmcTopicReceiver_scope(receiver))) { + pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } } celixThreadMutex_unlock(&psa->topicReceivers.mutex); } diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c index c8ad961..5f82019 100644 --- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_topic_receiver.c @@ -338,11 +338,14 @@ static void pubsub_udpmcTopicReceiver_addSubscriber(void *handle, void *svc, con if (subScope != NULL) { return; } - } else { + } else if (subScope != NULL) { if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { //not the same scope. ignore return; } + } else { + //receiver scope is not NULL, but subScope is NULL -> ignore + return; } celixThreadMutex_lock(&receiver->subscribers.mutex); diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c index a017c01..ba33252 100644 --- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c @@ -393,7 +393,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); - if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0 && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { 
pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } @@ -460,13 +460,12 @@ celix_status_t pubsub_websocketAdmin_addDiscoveredEndpoint(void *handle, const c if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { celixThreadMutex_lock(&psa->topicReceivers.mutex); - const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - - pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); - if (receiver != NULL) { - pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_websocket_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_websocketTopicReceiver_topic(receiver), pubsub_websocketTopicReceiver_scope(receiver))) { + pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + } } celixThreadMutex_unlock(&psa->topicReceivers.mutex); } diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c index ec35af3..7d8cd00 100644 --- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c @@ -410,6 +410,9 @@ static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc, //not the same scope. 
ignore return; } + } else { + //receiver scope is not NULL, but subScope is NULL -> ignore + return; } celixThreadMutex_lock(&receiver->subscribers.mutex); diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c index 83eaed1..b16321a 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c @@ -601,7 +601,7 @@ celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); - if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) { + if (pubsub_zmqAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } @@ -660,15 +660,13 @@ celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_p if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) { celixThreadMutex_lock(&psa->topicReceivers.mutex); - const char *scope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *topic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); - char *key = pubsubEndpoint_createScopeTopicKey(scope, topic); - - pubsub_zmq_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key); - if (receiver != NULL) { - pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint); + hash_map_iterator_t iter = hashMapIterator_construct(psa->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter); + if (pubsubEndpoint_matchWithTopicAndScope(endpoint, pubsub_zmqTopicReceiver_topic(receiver), pubsub_zmqTopicReceiver_scope(receiver))) { + pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, 
endpoint); + } } - free(key); celixThreadMutex_unlock(&psa->topicReceivers.mutex); } @@ -725,10 +723,27 @@ celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celi return status; } -bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) { +bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *out, FILE *errStream __attribute__((unused))) { pubsub_zmq_admin_t *psa = handle; celix_status_t status = CELIX_SUCCESS; + + char *line = celix_utils_strdup(commandLine); + char *token = line; + strtok_r(line, " ", &token); //first token is command name + strtok_r(NULL, " ", &token); //second token is sub command + + if (celix_utils_stringEquals(token, "nr_of_receivers")) { + celixThreadMutex_lock(&psa->topicReceivers.mutex); + fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map)); + celixThreadMutex_unlock(&psa->topicReceivers.mutex); + } + if (celix_utils_stringEquals(token, "nr_of_senders")) { + celixThreadMutex_lock(&psa->topicSenders.mutex); + fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map)); + celixThreadMutex_unlock(&psa->topicSenders.mutex); + } + fprintf(out, "\n"); fprintf(out, "Topic Senders:\n"); celixThreadMutex_lock(&psa->serializers.mutex); diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c index b19adb0..11f2b8f 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_topic_receiver.c @@ -419,8 +419,8 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const long bndId = celix_bundle_getId(bnd); long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); - if (receiver->scope == NULL){ - if (subScope != NULL){ + if 
(receiver->scope == NULL) { + if (subScope != NULL) { return; } } else if (subScope != NULL) { @@ -428,6 +428,9 @@ static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const //not the same scope. ignore return; } + } else { + //receiver scope is not NULL, but subScope is NULL -> ignore + return; } celixThreadMutex_lock(&receiver->subscribers.mutex); diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt index 2f4f3e9..8d1c340 100644 --- a/bundles/pubsub/pubsub_spi/CMakeLists.txt +++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt @@ -33,4 +33,9 @@ target_link_libraries(pubsub_spi PUBLIC Celix::pubsub_utils ) add_library(Celix::pubsub_spi ALIAS pubsub_spi) install(TARGETS pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub) -install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub) \ No newline at end of file +install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub) + + +if (ENABLE_TESTING) + add_subdirectory(gtest) +endif(ENABLE_TESTING) \ No newline at end of file diff --git a/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt new file mode 100644 index 0000000..5cc80be --- /dev/null +++ b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_executable(test_pubsub_spi + src/PubSubEndpointUtilsTestSuite.cc +) +target_link_libraries(test_pubsub_spi PRIVATE Celix::pubsub_spi GTest::gtest GTest::gtest_main) +target_compile_options(test_pubsub_spi PRIVATE -std=c++14) #Note test code is allowed to be C++14 + +#Seems to be an issue with coverage setup, for now disabled +#setup_target_for_coverage(test_pubsub_spi SCAN_DIR ..) diff --git a/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc new file mode 100644 index 0000000..802e8f0 --- /dev/null +++ b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc @@ -0,0 +1,47 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ + +#include "gtest/gtest.h" + +#include "pubsub_endpoint.h" + +TEST(PubSubEndpointUtilsTestSuite, pubsubEndpoint_matchWithTopicAndScope) { + celix_properties_t* endpoint = celix_properties_create(); + celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "topic"); + + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr)); + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default")); //Note "default" is the same as NULL scope + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope")); + + celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "scope"); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default")); + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope")); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scopeaa")); + + celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "default"); + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr)); //Note NULL is the same as "default" scope + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr)); + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default")); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope")); + + celix_properties_destroy(endpoint); +} diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h index 1cd966c..2e1cfe9 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h @@ -65,7 +65,12 @@ bool pubsubEndpoint_equals(const celix_properties_t *psEp1, const 
celix_properti bool pubsubEndpoint_isValid(const celix_properties_t *endpointProperties, bool requireAdminType, bool requireSerializerType); - +/** + * Create a key based on scope and topic. + * Scope can be NULL. + * Note that (NULL, "topic") and ("default", "topic") will result in different keys + * @return a newly created key. caller is responsible for freeing the string. + */ char *pubsubEndpoint_createScopeTopicKey(const char *scope, const char *topic); /** @@ -174,7 +179,14 @@ bool pubsubEndpoint_match( long *outSerializerSvcId, long *outProtocolSvcId); - +/** + * Match an endpoint with a topic & scope. + * @param endpoint The endpoint (mandatory) + * @param topic The topic (mandatory) + * @param scope The scope (can be NULL; NULL is treated as the default scope) + * @return true if the endpoint is for the provided topic and scope + */ +bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope); #ifdef __cplusplus diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index a98044f..c1a715e 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -204,7 +204,8 @@ char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) { if (scope != NULL) { asprintf(&result, "%s:%s", scope, topic); } else { - asprintf(&result, "default:%s", topic); + //NOTE scope == NULL: key is the topic only, which differs from the key for scope="default" + asprintf(&result, "%s", topic); } return result; } diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c index 659ad5d..041f0f5 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c @@ -22,6 +22,7 @@ #include <pubsub_endpoint.h> #include <pubsub_serializer.h> #include <pubsub_protocol.h> +#include <celix_api.h> #include "service_reference.h" @@ -322,3 +323,13 @@ bool 
pubsubEndpoint_match( return match; } + +bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope) { + const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, PUBSUB_DEFAULT_ENDPOINT_SCOPE); + const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + if (scope == NULL) { + scope = PUBSUB_DEFAULT_ENDPOINT_SCOPE; + } + + return celix_utils_stringEquals(topic, endpointTopic) && celix_utils_stringEquals(scope, endpointScope); +} diff --git a/bundles/pubsub/test/test/test_runner.cc b/bundles/pubsub/test/test/test_runner.cc index 9ed12da..3ef3faa 100644 --- a/bundles/pubsub/test/test/test_runner.cc +++ b/bundles/pubsub/test/test/test_runner.cc @@ -31,58 +31,58 @@ int main(int argc, char **argv) { } TEST_GROUP(PUBSUB_INT_GROUP) { - celix_framework_t *fw = NULL; - celix_bundle_context_t *ctx = NULL; - void setup() override { - celixLauncher_launch("config.properties", &fw); - ctx = celix_framework_getFrameworkContext(fw); - } + celix_framework_t * fw = NULL; + celix_bundle_context_t *ctx = NULL; + void setup() override { + celixLauncher_launch("config.properties", &fw); + ctx = celix_framework_getFrameworkContext(fw); + } - void teardown() override { - celixLauncher_stop(fw); - celixLauncher_waitForShutdown(fw); - celixLauncher_destroy(fw); - ctx = NULL; - fw = NULL; - } + void teardown() override { + celixLauncher_stop(fw); + celixLauncher_waitForShutdown(fw); + celixLauncher_destroy(fw); + ctx = NULL; + fw = NULL; + } }; TEST_GROUP(PUBSUB_INT_ENV_GROUP) { - celix_framework_t *fw = NULL; - celix_bundle_context_t *ctx = NULL; - void setup() override { - setenv("PSA_TCP_STATIC_BIND_URL_FOR_ping", "tcp://localhost:9001", 1); - setenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping", "tcp://localhost:9001", 1); - setenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping", "9001", 1); - setenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping", "224.100.0.1:9001", 1); - 
setenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping", "127.0.0.1:9001", 1); - setenv("CELIX_HTTP_ADMIN_LISTENING_PORTS", "9001", 1); - setenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1); - setenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1); + celix_framework_t * fw = NULL; + celix_bundle_context_t *ctx = NULL; + void setup() override { + setenv("PSA_TCP_STATIC_BIND_URL_FOR_ping", "tcp://localhost:9001", 1); + setenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping", "tcp://localhost:9001", 1); + setenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping", "9001", 1); + setenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping", "224.100.0.1:9001", 1); + setenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping", "127.0.0.1:9001", 1); + setenv("CELIX_HTTP_ADMIN_LISTENING_PORTS", "9001", 1); + setenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1); + setenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping", "ipc:///tmp/pubsub-envtest", 1); - celixLauncher_launch("config.properties", &fw); - ctx = celix_framework_getFrameworkContext(fw); - } + celixLauncher_launch("config.properties", &fw); + ctx = celix_framework_getFrameworkContext(fw); + } - void teardown() override { - celixLauncher_stop(fw); - celixLauncher_waitForShutdown(fw); - celixLauncher_destroy(fw); - ctx = NULL; - fw = NULL; - unsetenv("PSA_TCP_STATIC_BIND_URL_FOR_ping"); - unsetenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping"); - unsetenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping"); - unsetenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping"); - unsetenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping"); - unsetenv("CELIX_HTTP_ADMIN_LISTENING_PORTS"); - unsetenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping"); - unsetenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping"); - } + void teardown() override { + celixLauncher_stop(fw); + celixLauncher_waitForShutdown(fw); + celixLauncher_destroy(fw); + ctx = NULL; + fw = NULL; + unsetenv("PSA_TCP_STATIC_BIND_URL_FOR_ping"); + 
unsetenv("PSA_TCP_STATIC_CONNECT_URL_FOR_ping"); + unsetenv("PSA_UDPMC_STATIC_BIND_PORT_FOR_ping"); + unsetenv("PSA_UDPMC_STATIC_CONNECT_URLS_FOR_ping"); + unsetenv("PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES_FOR_ping"); + unsetenv("CELIX_HTTP_ADMIN_LISTENING_PORTS"); + unsetenv("PSA_ZMQ_STATIC_BIND_URL_FOR_ping"); + unsetenv("PSA_ZMQ_STATIC_CONNECT_URL_FOR_ping"); + } }; void receiveTest(celix_bundle_context_t *ctx) { - constexpr int TRIES = 25; + constexpr int TRIES = 40; constexpr int TIMEOUT = 250000; constexpr int MSG_COUNT = 100; @@ -91,8 +91,8 @@ void receiveTest(celix_bundle_context_t *ctx) { for (int i = 0; i < TRIES; ++i) { count = 0; celix_bundleContext_useService(ctx, CELIX_RECEIVE_COUNT_SERVICE_NAME, &count, [](void *handle, void *svc) { - auto* count_ptr = static_cast<int*>(handle); - auto* count = static_cast<celix_receive_count_service_t*>(svc); + auto *count_ptr = static_cast<int *>(handle); + auto *count = static_cast<celix_receive_count_service_t *>(svc); *count_ptr = count->receiveCount(count->handle); }); printf("Current msg count is %i, waiting for at least %i\n", count, MSG_COUNT); diff --git a/libs/utils/gtest/src/LogUtilsTestSuite.cc b/libs/utils/gtest/src/LogUtilsTestSuite.cc index 0123ee6..f0062b0 100644 --- a/libs/utils/gtest/src/LogUtilsTestSuite.cc +++ b/libs/utils/gtest/src/LogUtilsTestSuite.cc @@ -23,8 +23,6 @@ class LogUtilsTestSuite : public ::testing::Test {}; - - TEST_F(LogUtilsTestSuite, LogLevelToString) { EXPECT_STREQ("trace", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_TRACE)); EXPECT_STREQ("debug", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_DEBUG));