This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch bugfix/zmq_wrong_sender_connections in repository https://gitbox.apache.org/repos/asf/celix.git
commit 44d6a5515c004de4c221a40c426099dfb780dfcf Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Jun 29 15:39:07 2020 +0200 Fixes an issue in endpoint discovery. The topic and scope of already discovered endpoints were not checked before connecting to them. --- .../pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c | 2 +- .../pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c | 2 +- .../src/pubsub_websocket_admin.c | 2 +- .../pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c | 2 +- bundles/pubsub/pubsub_spi/CMakeLists.txt | 7 +++- bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt | 23 +++++++++++ .../gtest/src/PubSubEndpointUtilsTestSuite.cc | 46 ++++++++++++++++++++++ .../pubsub/pubsub_spi/include/pubsub_endpoint.h | 9 ++++- bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c | 2 +- .../pubsub/pubsub_spi/src/pubsub_endpoint_match.c | 10 +++++ libs/utils/gtest/src/LogUtilsTestSuite.cc | 4 +- 11 files changed, 100 insertions(+), 9 deletions(-) diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c index 67d4333..c13442f 100644 --- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c +++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c @@ -549,7 +549,7 @@ celix_status_t pubsub_tcpAdmin_setupTopicReceiver(void *handle, const char *scop hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); - if (pubsub_tcpAdmin_endpointIsPublisher(endpoint)) { + if (pubsub_tcpAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_tcpAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c index d8fdac1..e152b94 100644 --- 
a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c +++ b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_udpmc_admin.c @@ -410,7 +410,7 @@ celix_status_t pubsub_udpmcAdmin_setupTopicReceiver(void *handle, const char *sc hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); - if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint)) { + if (pubsub_udpmcAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_udpmcAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c index a017c01..924ddef 100644 --- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c +++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c @@ -393,7 +393,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL); - if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { + if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0 && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_websocketAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c index bd2de54..2c1c7b5 100644 --- a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c +++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_zmq_admin.c @@ -599,7 +599,7 @@ celix_status_t 
pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scop hash_map_iterator_t iter = hashMapIterator_construct(psa->discoveredEndpoints.map); while (hashMapIterator_hasNext(&iter)) { celix_properties_t *endpoint = hashMapIterator_nextValue(&iter); - if (pubsub_zmqAdmin_endpointIsPublisher(endpoint)) { + if (pubsub_zmqAdmin_endpointIsPublisher(endpoint) && pubsubEndpoint_matchWithTopicAndScope(endpoint, topic, scope)) { pubsub_zmqAdmin_connectEndpointToReceiver(psa, receiver, endpoint); } } diff --git a/bundles/pubsub/pubsub_spi/CMakeLists.txt b/bundles/pubsub/pubsub_spi/CMakeLists.txt index 2f4f3e9..8d1c340 100644 --- a/bundles/pubsub/pubsub_spi/CMakeLists.txt +++ b/bundles/pubsub/pubsub_spi/CMakeLists.txt @@ -33,4 +33,9 @@ target_link_libraries(pubsub_spi PUBLIC Celix::pubsub_utils ) add_library(Celix::pubsub_spi ALIAS pubsub_spi) install(TARGETS pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub) -install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub) \ No newline at end of file +install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub) + + +if (ENABLE_TESTING) + add_subdirectory(gtest) +endif(ENABLE_TESTING) \ No newline at end of file diff --git a/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt new file mode 100644 index 0000000..faac0ff --- /dev/null +++ b/bundles/pubsub/pubsub_spi/gtest/CMakeLists.txt @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_executable(test_pubsub_spi + src/PubSubEndpointUtilsTestSuite.cc +) +target_link_libraries(test_pubsub_spi PRIVATE Celix::pubsub_spi GTest::gtest GTest::gtest_main) +target_compile_options(test_pubsub_spi PRIVATE -std=c++14) #Note test code is allowed to be C++14 +setup_target_for_coverage(test_pubsub_spi SCAN_DIR ..) diff --git a/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc new file mode 100644 index 0000000..eecdc74 --- /dev/null +++ b/bundles/pubsub/pubsub_spi/gtest/src/PubSubEndpointUtilsTestSuite.cc @@ -0,0 +1,46 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ + +#include "gtest/gtest.h" + +#include "pubsub_endpoint.h" + +class PubSubEndpointUtilsTestSuite : public ::testing::Test { +public: +}; + + +TEST_F(PubSubEndpointUtilsTestSuite, pubsubEndpoint_matchWithTopicAndScope) { + celix_properties_t* endpoint = celix_properties_create(); + celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "topic"); + + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default")); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope")); + + celix_properties_set(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "scope"); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topicaa", nullptr)); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "default")); + EXPECT_TRUE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scope")); + EXPECT_FALSE(pubsubEndpoint_matchWithTopicAndScope(endpoint, "topic", "scopeaa")); + + celix_properties_destroy(endpoint); +} diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h index 1cd966c..e1f8f33 100644 --- a/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h +++ b/bundles/pubsub/pubsub_spi/include/pubsub_endpoint.h @@ -174,7 +174,14 @@ bool pubsubEndpoint_match( long *outSerializerSvcId, long *outProtocolSvcId); - +/** + * Match an endpoint with a topic & scope. 
+ * @param endpoint The endpoints (mandatory) + * @param topic The topic (mandatory) + * @param scope The scope (can be NULL) + * @return true if the endpoint is for the provide topic and scope); + */ +bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope); #ifdef __cplusplus diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c index a98044f..2aa052d 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -204,7 +204,7 @@ char* pubsubEndpoint_createScopeTopicKey(const char* scope, const char* topic) { if (scope != NULL) { asprintf(&result, "%s:%s", scope, topic); } else { - asprintf(&result, "default:%s", topic); + asprintf(&result, "scopeless %s", topic); } return result; } diff --git a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c index 659ad5d..cc18beb 100644 --- a/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c +++ b/bundles/pubsub/pubsub_spi/src/pubsub_endpoint_match.c @@ -22,6 +22,7 @@ #include <pubsub_endpoint.h> #include <pubsub_serializer.h> #include <pubsub_protocol.h> +#include <celix_api.h> #include "service_reference.h" @@ -322,3 +323,12 @@ bool pubsubEndpoint_match( return match; } + +bool pubsubEndpoint_matchWithTopicAndScope(const celix_properties_t* endpoint, const char *topic, const char *scope) { + const char *endpointScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + const char *endpointTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + if (celix_utils_stringEquals(topic, endpointTopic) && celix_utils_stringEquals(scope, endpointScope)) { + return true; + } + return false; +} \ No newline at end of file diff --git a/libs/utils/gtest/src/LogUtilsTestSuite.cc b/libs/utils/gtest/src/LogUtilsTestSuite.cc index 0123ee6..a63bd43 100644 --- 
a/libs/utils/gtest/src/LogUtilsTestSuite.cc +++ b/libs/utils/gtest/src/LogUtilsTestSuite.cc @@ -20,11 +20,11 @@ #include <gtest/gtest.h> #include "celix_log_utils.h" +#include "hash_map.h" +#include "utils.h" class LogUtilsTestSuite : public ::testing::Test {}; - - TEST_F(LogUtilsTestSuite, LogLevelToString) { EXPECT_STREQ("trace", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_TRACE)); EXPECT_STREQ("debug", celix_logUtils_logLevelToString(CELIX_LOG_LEVEL_DEBUG));
