This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/509-remove-pubsub in repository https://gitbox.apache.org/repos/asf/celix.git
commit d2e10c5ac70aecab62e08620e616920bc7191bb6 Author: Pepijn Noltes <[email protected]> AuthorDate: Sun Dec 17 16:52:20 2023 +0100 Refactor cxx rsa integration to use ipc concepts --- .../cxx_remote_services/integration/CMakeLists.txt | 57 +++++++-------- bundles/cxx_remote_services/integration/README.md | 8 +++ .../src/RemoteServicesIntegrationTestSuite.cc | 9 ++- .../integration/resources/endpoint_discovery.json | 6 +- .../integration/src/CalculatorConsumer.cc | 13 ---- .../integration/src/CalculatorProvider.cc | 4 +- .../src/TestExportImportRemoteServiceFactory.cc | 83 ++++++++++++---------- 7 files changed, 89 insertions(+), 91 deletions(-) diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt b/bundles/cxx_remote_services/integration/CMakeLists.txt index bad01c2d..9bb8d7e6 100644 --- a/bundles/cxx_remote_services/integration/CMakeLists.txt +++ b/bundles/cxx_remote_services/integration/CMakeLists.txt @@ -41,39 +41,36 @@ if (CXX_RSA_INTEGRATION) endif() ################# Integration examples ################################## + add_celix_container(RemoteCalculatorProvider + CXX + GROUP rsa + PROPERTIES + CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug + BUNDLES + Celix::ShellCxx + Celix::shell_tui - if (BUILD_LAUNCHER) - add_celix_container(RemoteCalculatorProvider - CXX - GROUP rsa - PROPERTIES - CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug - BUNDLES - Celix::ShellCxx - Celix::shell_tui + #Remote Services + Celix::RemoteServiceAdmin + TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator - #Remote Services - Celix::RemoteServiceAdmin - TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator + CalculatorProvider + ) - CalculatorProvider - ) + add_celix_container(RemoteCalculatorConsumer + CXX + GROUP rsa + PROPERTIES + CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug + BUNDLES + Celix::ShellCxx + Celix::shell_tui - add_celix_container(RemoteCalculatorConsumer - CXX - GROUP rsa - PROPERTIES - CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug - BUNDLES - Celix::ShellCxx - Celix::shell_tui + #Remote Services + Celix::RsaConfiguredDiscovery + Celix::RemoteServiceAdmin + TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator - #Remote Services - Celix::RsaConfiguredDiscovery - Celix::RemoteServiceAdmin - TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator - - CalculatorConsumer - ) - endif () + CalculatorConsumer + ) endif() diff --git a/bundles/cxx_remote_services/integration/README.md b/bundles/cxx_remote_services/integration/README.md new file mode 100644 index 00000000..1a34dccb --- /dev/null +++ b/bundles/cxx_remote_services/integration/README.md @@ -0,0 +1,8 @@ +# C++ Remote Service Amdatu Integration + +This project part tests the integration of the C++ Remote Service Admin implementation with the +C++ configuration-based remote service discovery. + +Because the C++ Remote Service Admin is based on export and import service factories and does not directly +implement a transportation or serializer technology, the integration tests are based on a simple implementation of +inter process communication message queue (IPC mq) for transportation and a simple memcpy for serialization. diff --git a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc index 007dd1f8..73ef653d 100644 --- a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc +++ b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc @@ -74,9 +74,6 @@ public: } void invokeRemoteCalcService() { - installProviderBundles(); - installConsumerBundles(); - //If a calculator provider bundle is installed I expect a exported calculator interface auto count = serverCtx->useService<ICalculator>() .setFilter("(service.exported.interfaces=*)") @@ -98,8 +95,8 @@ public: /* * Testing the remote service in a while loop till it is successful or 10 seconds has passed. - * Note that because pubsub does not guarantee a connection when used, it is possible - and likely - - * that the first remote test iteration fails due to not yet completely connected pubsub. + * Note that because mq does not guarantee a connection when used, it is possible - and likely - + * that the first remote test iteration fails due to not yet completely connected mq. */ auto start = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now(); @@ -156,5 +153,7 @@ TEST_F(RemoteServicesIntegrationTestSuite, StartStopFrameworks) { } TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) { + installProviderBundles(); + installConsumerBundles(); invokeRemoteCalcService(); } diff --git a/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json index e7d04adf..f086cd12 100644 --- a/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json +++ b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json @@ -4,12 +4,12 @@ "endpoint.id": "id-01", "service.imported": true, "service.imported.configs": [ - "pubsub" + "ipc-mq" ], "service.exported.interfaces": "ICalculator", "endpoint.objectClass": "ICalculator", - "endpoint.topic": "test", - "endpoint.scope": "default" + "endpoint.client.to.provider.channel.id": "1234", + "endpoint.provider.to.client.channel.id": "1235" } ] } \ No newline at end of file diff --git a/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc index 35ceac04..00e22ec6 100644 --- a/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc +++ b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc @@ -46,20 +46,8 @@ public: }); counter++; } - - void start() { - stream = calculator->result(); - stream->forEach([](double val) { - fprintf(stdout, "calc result stream: %f\n", val); - }); - } - - void stop() { - stream.reset(); - } private: std::shared_ptr<ICalculator> calculator{}; - std::shared_ptr<celix::PushStream<double>> stream{}; }; class CalculatorConsumerActivator { @@ -71,7 +59,6 @@ public: .setCallbacks(&CalculatorConsumer::setCalculator); cmp.createProvidedService<celix::IShellCommand>() .addProperty(celix::IShellCommand::COMMAND_NAME, "calc"); - cmp.setCallbacks(nullptr, &CalculatorConsumer::start, &CalculatorConsumer::stop, nullptr); cmp.build(); //bootstrap own configured import discovery to the configured discovery manager diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc index 57e86b00..f3a46099 100644 --- a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc +++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc @@ -98,8 +98,8 @@ public: .setCallbacks(&CalculatorImpl::setPushStreamProvider); cmp.createProvidedService<ICalculator>() .addProperty("service.exported.interfaces", celix::typeName<ICalculator>()) - .addProperty("endpoint.topic", "test") - .addProperty("endpoint.scope", "default") + .addProperty("endpoint.client.to.provider.channel.id", "1234") + .addProperty("endpoint.provider.to.client.channel.id", "1235") .addProperty("service.exported.intents", "osgi.async"); cmp.setCallbacks(&CalculatorImpl::init, &CalculatorImpl::start, &CalculatorImpl::stop, &CalculatorImpl::deinit); diff --git a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc index f86ebdeb..a711a81e 100644 --- a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc +++ b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc @@ -102,38 +102,19 @@ static void sendMsgWithIpc(const celix::LogHelper& logHelper, int qidSender, con } } -static int getConsumer2ProviderChannelId(std::string& scope, std::string& topic) { - auto c2pChannel = std::string{"c2p"} + "/" + scope + "/" + topic; //client 2 provider channel - auto c2pId = (int)celix_utils_stringHash(c2pChannel.c_str()); - return c2pId; -} - -static long getProvider2ConsumerChannelId(std::string& scope, std::string& topic) { - auto p2cChannel = std::string{"p2c"} + "/" + scope + "/" + topic; //provider 2 client channel - int p2cId = (int)celix_utils_stringHash(p2cChannel.c_str()); - return p2cId; -} - /** * A importedCalculater which acts as a pubsub proxy to a imported remote service. */ class ImportedCalculator final : public ICalculator { public: - explicit ImportedCalculator(celix::LogHelper _logHelper, int c2pChannelId, int p2cChannelId) : logHelper{std::move(_logHelper)} { + explicit ImportedCalculator(celix::LogHelper _logHelper, long c2pChannelId, long p2cChannelId) : logHelper{std::move(_logHelper)} { setupMsgIpc(c2pChannelId, p2cChannelId); } - ~ImportedCalculator() noexcept override { - //failing al leftover deferreds - { - std::lock_guard lock{mutex}; - for (auto& pair : deferreds) { - pair.second.fail(celix::rsa::RemoteServicesException{"Shutting down proxy"}); - } - } - }; + ~ImportedCalculator() noexcept override = default; std::shared_ptr<celix::PushStream<double>> result() override { + std::lock_guard lock{mutex}; return stream; } @@ -158,6 +139,7 @@ public: } int start() { + std::lock_guard lock{mutex}; ses = psp->createSynchronousEventSource<double>(factory); stream = psp->createStream<double>(ses, factory); running.store(true, std::memory_order::memory_order_release); @@ -176,6 +158,8 @@ public: running.store(false, std::memory_order::memory_order_release); receiveThread.join(); receiveThread = {}; + cleanupDeferreds(); + std::lock_guard lock{mutex}; ses->close(); ses.reset(); stream.reset(); @@ -197,7 +181,10 @@ public: } private: - void setupMsgIpc(int c2pChannelId, int p2cChannelId) { + void setupMsgIpc(long c2pChannelId, long p2cChannelId) { + logHelper.debug("Creating msg queue for ImportedCalculator with c2pChannelId=%li and p2cChannelId=%li", + c2pChannelId, + p2cChannelId); int keySender = (int)c2pChannelId; int keyReceiver = (int)p2cChannelId; qidSender = msgget(keySender, 0666 | IPC_CREAT); @@ -205,7 +192,19 @@ private: if (qidSender == -1 || qidReceiver == -1) { throw std::logic_error{"RsaShmClient: Error creating msg queue."}; + } else { + logHelper.info("Created msg queue for ImportedCalculator with qidSender=%i and qidReceiver=%i", + qidSender, + qidReceiver); + } + } + + void cleanupDeferreds() { + std::lock_guard lock{mutex}; + for (auto& pair : deferreds) { + pair.second.tryFail(celix::rsa::RemoteServicesException{"Shutting down proxy"}); } + deferreds.clear(); } void receiveMessages() { @@ -233,9 +232,9 @@ private: lock.unlock(); if (ret.hasError) { - deferred.fail(celix::rsa::RemoteServicesException{ret.errorMsg}); + deferred.tryFail(celix::rsa::RemoteServicesException{ret.errorMsg}); } else { - deferred.resolve(ret.result); + deferred.tryResolve(ret.result); } } catch (const IpcException& e) { logHelper.error("IpcException: %s", e.what()); @@ -249,6 +248,7 @@ private: return; // no message available (yet) } auto event = msg.value().mtext; + logHelper.trace("Received event %f", event.eventData); if (event.hasError) { logHelper.error("Received error event %s", event.errorMsg); @@ -312,7 +312,7 @@ private: */ class CalculatorImportServiceFactory final : public celix::rsa::IImportServiceFactory { public: - static constexpr const char * const CONFIGS = "pubsub"; + static constexpr const char * const CONFIGS = "ipc-mq"; explicit CalculatorImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)}, logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {} ~CalculatorImportServiceFactory() noexcept override = default; @@ -332,10 +332,11 @@ public: private: std::string createImportedCalculatorComponent(const celix::rsa::EndpointDescription& endpoint) { - auto topic = endpoint.getProperties().get("endpoint.topic"); - auto scope = endpoint.getProperties().get("endpoint.scope"); - auto c2pChannelId = getConsumer2ProviderChannelId(scope, topic); - auto p2cChannelId = getProvider2ConsumerChannelId(scope, topic); + for (auto it : endpoint.getProperties()) { + logHelper.info("Endpoint property %s=%s", it.first.c_str(), it.second.c_str()); + } + auto c2pChannelId = endpoint.getProperties().getAsLong("endpoint.client.to.provider.channel.id", -1); + auto p2cChannelId = endpoint.getProperties().getAsLong("endpoint.provider.to.client.channel.id", -1); auto& cmp = ctx->getDependencyManager()->createComponent(std::make_unique<ImportedCalculator>(logHelper, c2pChannelId, p2cChannelId)); cmp.createServiceDependency<celix::PromiseFactory>() @@ -378,7 +379,7 @@ private: */ class ExportedCalculator final { public: - explicit ExportedCalculator(celix::LogHelper _logHelper, int c2pChannelId, int p2cChannelId) : logHelper{std::move(_logHelper)} { + explicit ExportedCalculator(celix::LogHelper _logHelper, long c2pChannelId, long p2cChannelId) : logHelper{std::move(_logHelper)} { setupMsgIpc(c2pChannelId, p2cChannelId); } @@ -434,7 +435,10 @@ public: calculator = calc; } private: - void setupMsgIpc(int c2pChannelId, int p2cChannelId) { + void setupMsgIpc(long c2pChannelId, long p2cChannelId) { + logHelper.debug("Creating msg queue for ExportCalculator with c2pChannelId=%li and p2cChannelId=%li", + c2pChannelId, + p2cChannelId); //note reverse order of sender and receiver compared to ImportedCalculator int keySender = (int)p2cChannelId; int keyReceiver = (int)c2pChannelId; @@ -443,6 +447,10 @@ private: if (qidSender == -1 || qidReceiver == -1) { throw std::logic_error{"RsaShmClient: Error creating msg queue."}; + } else { + logHelper.info("Created msg queue for ExportCalculator with qidSender=%i and qidReceiver=%i", + qidSender, + qidReceiver); } } @@ -518,7 +526,7 @@ private: */ class CalculatorExportServiceFactory final : public celix::rsa::IExportServiceFactory { public: - static constexpr const char * const CONFIGS = "pubsub"; + static constexpr const char * const CONFIGS = "ipc-mq"; static constexpr const char * const INTENTS = "osgi.async"; explicit CalculatorExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)}, @@ -544,13 +552,12 @@ public: private: std::string createExportedCalculatorComponent(const celix::Properties& serviceProperties) { - auto topic = serviceProperties.get("endpoint.topic"); - auto scope = serviceProperties.get("endpoint.scope"); - auto c2pChannelId = getConsumer2ProviderChannelId(scope, topic); - auto p2cChannelId = getProvider2ConsumerChannelId(scope, topic); + auto c2pChannelId = serviceProperties.getAsLong("endpoint.client.to.provider.channel.id", -1); + auto p2cChannelId = serviceProperties.getAsLong("endpoint.provider.to.client.channel.id", -1); auto svcId = serviceProperties.get(celix::SERVICE_ID); - auto& cmp = ctx->getDependencyManager()->createComponent(std::make_unique<ExportedCalculator>(logHelper, c2pChannelId, p2cChannelId)); + auto& cmp = ctx->getDependencyManager()->createComponent( + std::make_unique<ExportedCalculator>(logHelper, c2pChannelId, p2cChannelId)); cmp.createServiceDependency<celix::PromiseFactory>() .setRequired(true)
