This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/509-remove-pubsub
in repository https://gitbox.apache.org/repos/asf/celix.git

commit d2e10c5ac70aecab62e08620e616920bc7191bb6
Author: Pepijn Noltes <[email protected]>
AuthorDate: Sun Dec 17 16:52:20 2023 +0100

    Refactor cxx rsa integration to use ipc concepts
---
 .../cxx_remote_services/integration/CMakeLists.txt | 57 +++++++--------
 bundles/cxx_remote_services/integration/README.md  |  8 +++
 .../src/RemoteServicesIntegrationTestSuite.cc      |  9 ++-
 .../integration/resources/endpoint_discovery.json  |  6 +-
 .../integration/src/CalculatorConsumer.cc          | 13 ----
 .../integration/src/CalculatorProvider.cc          |  4 +-
 .../src/TestExportImportRemoteServiceFactory.cc    | 83 ++++++++++++----------
 7 files changed, 89 insertions(+), 91 deletions(-)

diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt 
b/bundles/cxx_remote_services/integration/CMakeLists.txt
index bad01c2d..9bb8d7e6 100644
--- a/bundles/cxx_remote_services/integration/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -41,39 +41,36 @@ if (CXX_RSA_INTEGRATION)
     endif()
 
     ################# Integration examples ##################################
+    add_celix_container(RemoteCalculatorProvider
+        CXX
+        GROUP rsa
+        PROPERTIES
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
+        BUNDLES
+            Celix::ShellCxx
+            Celix::shell_tui
 
-    if (BUILD_LAUNCHER)
-        add_celix_container(RemoteCalculatorProvider
-            CXX
-            GROUP rsa
-            PROPERTIES
-                CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
-            BUNDLES
-                Celix::ShellCxx
-                Celix::shell_tui
+            #Remote Services
+            Celix::RemoteServiceAdmin
+            TestExportImportRemoteServiceFactory #needed to be able to create 
a ExportedService for ICalculator
 
-                #Remote Services
-                Celix::RemoteServiceAdmin
-                TestExportImportRemoteServiceFactory #needed to be able to 
create a ExportedService for ICalculator
+            CalculatorProvider
+    )
 
-                CalculatorProvider
-        )
+    add_celix_container(RemoteCalculatorConsumer
+        CXX
+        GROUP rsa
+        PROPERTIES
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
+        BUNDLES
+            Celix::ShellCxx
+            Celix::shell_tui
 
-        add_celix_container(RemoteCalculatorConsumer
-            CXX
-            GROUP rsa
-            PROPERTIES
-                CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
-            BUNDLES
-                Celix::ShellCxx
-                Celix::shell_tui
+            #Remote Services
+            Celix::RsaConfiguredDiscovery
+            Celix::RemoteServiceAdmin
+            TestExportImportRemoteServiceFactory #needed to be able to create 
a ExportedService for ICalculator
 
-                #Remote Services
-                Celix::RsaConfiguredDiscovery
-                Celix::RemoteServiceAdmin
-                TestExportImportRemoteServiceFactory #needed to be able to 
create a ExportedService for ICalculator
-
-                CalculatorConsumer
-        )
-    endif ()
+            CalculatorConsumer
+    )
 endif()
diff --git a/bundles/cxx_remote_services/integration/README.md 
b/bundles/cxx_remote_services/integration/README.md
new file mode 100644
index 00000000..1a34dccb
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/README.md
@@ -0,0 +1,8 @@
+# C++ Remote Service Amdatu Integration
+
+This project part tests the integration of the C++ Remote Service Admin 
implementation with the 
+C++ configuration-based remote service discovery. 
+
+Because the C++ Remote Service Admin is based on export and import service 
factories and does not directly 
+implement a transportation or serializer technology, the integration tests are 
based on a simple implementation of
+inter process communication message queue (IPC mq) for transportation and a 
simple memcpy for serialization.
diff --git 
a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
 
b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
index 007dd1f8..73ef653d 100644
--- 
a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
+++ 
b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
@@ -74,9 +74,6 @@ public:
     }
 
     void invokeRemoteCalcService() {
-        installProviderBundles();
-        installConsumerBundles();
-
         //If a calculator provider bundle is installed I expect a exported 
calculator interface
         auto count = serverCtx->useService<ICalculator>()
                 .setFilter("(service.exported.interfaces=*)")
@@ -98,8 +95,8 @@ public:
 
         /*
          * Testing the remote service in a while loop till it is successful or 
10 seconds has passed.
-         * Note that because pubsub does not guarantee a connection when used, 
it is possible - and likely -
-         * that the first remote test iteration fails due to not yet 
completely connected pubsub.
+         * Note that because mq does not guarantee a connection when used, it 
is possible - and likely -
+         * that the first remote test iteration fails due to not yet 
completely connected mq.
          */
         auto start = std::chrono::system_clock::now();
         auto now = std::chrono::system_clock::now();
@@ -156,5 +153,7 @@ TEST_F(RemoteServicesIntegrationTestSuite, 
StartStopFrameworks) {
 }
 
 TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) {
+    installProviderBundles();
+    installConsumerBundles();
     invokeRemoteCalcService();
 }
diff --git 
a/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json 
b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
index e7d04adf..f086cd12 100644
--- a/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
+++ b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
@@ -4,12 +4,12 @@
       "endpoint.id": "id-01",
       "service.imported": true,
       "service.imported.configs": [
-        "pubsub"
+        "ipc-mq"
       ],
       "service.exported.interfaces": "ICalculator",
       "endpoint.objectClass": "ICalculator",
-      "endpoint.topic": "test",
-      "endpoint.scope": "default"
+      "endpoint.client.to.provider.channel.id": "1234",
+      "endpoint.provider.to.client.channel.id": "1235"
     }
   ]
 }
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc 
b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
index 35ceac04..00e22ec6 100644
--- a/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
+++ b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
@@ -46,20 +46,8 @@ public:
                 });
         counter++;
     }
-
-    void start() {
-        stream = calculator->result();
-        stream->forEach([](double val) {
-            fprintf(stdout, "calc result stream: %f\n", val);
-        });
-    }
-
-    void stop() {
-        stream.reset();
-    }
 private:
     std::shared_ptr<ICalculator> calculator{};
-    std::shared_ptr<celix::PushStream<double>> stream{};
 };
 
 class CalculatorConsumerActivator {
@@ -71,7 +59,6 @@ public:
                 .setCallbacks(&CalculatorConsumer::setCalculator);
         cmp.createProvidedService<celix::IShellCommand>()
                 .addProperty(celix::IShellCommand::COMMAND_NAME, "calc");
-        cmp.setCallbacks(nullptr, &CalculatorConsumer::start, 
&CalculatorConsumer::stop, nullptr);
         cmp.build();
 
         //bootstrap own configured import discovery to the configured 
discovery manager
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc 
b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
index 57e86b00..f3a46099 100644
--- a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
+++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
@@ -98,8 +98,8 @@ public:
             .setCallbacks(&CalculatorImpl::setPushStreamProvider);
         cmp.createProvidedService<ICalculator>()
                 .addProperty("service.exported.interfaces", 
celix::typeName<ICalculator>())
-                .addProperty("endpoint.topic", "test")
-                .addProperty("endpoint.scope", "default")
+                .addProperty("endpoint.client.to.provider.channel.id", "1234")
+                .addProperty("endpoint.provider.to.client.channel.id", "1235")
                 .addProperty("service.exported.intents", "osgi.async");
 
         cmp.setCallbacks(&CalculatorImpl::init, &CalculatorImpl::start, 
&CalculatorImpl::stop, &CalculatorImpl::deinit);
diff --git 
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
 
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
index f86ebdeb..a711a81e 100644
--- 
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
+++ 
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -102,38 +102,19 @@ static void sendMsgWithIpc(const celix::LogHelper& 
logHelper, int qidSender, con
     }
 }
 
-static int getConsumer2ProviderChannelId(std::string& scope, std::string& 
topic) {
-    auto c2pChannel = std::string{"c2p"} + "/" + scope + "/" + topic; //client 
2 provider channel
-    auto c2pId = (int)celix_utils_stringHash(c2pChannel.c_str());
-    return c2pId;
-}
-
-static long getProvider2ConsumerChannelId(std::string& scope, std::string& 
topic) {
-    auto p2cChannel = std::string{"p2c"} + "/" + scope + "/" + topic; 
//provider 2 client channel
-    int p2cId = (int)celix_utils_stringHash(p2cChannel.c_str());
-    return p2cId;
-}
-
 /**
  * A importedCalculater which acts as a pubsub proxy to a imported remote 
service.
  */
 class ImportedCalculator final : public ICalculator {
 public:
-    explicit ImportedCalculator(celix::LogHelper _logHelper, int c2pChannelId, 
int p2cChannelId) : logHelper{std::move(_logHelper)} {
+    explicit ImportedCalculator(celix::LogHelper _logHelper, long 
c2pChannelId, long p2cChannelId) : logHelper{std::move(_logHelper)} {
         setupMsgIpc(c2pChannelId, p2cChannelId);
     }
 
-    ~ImportedCalculator() noexcept override {
-        //failing al leftover deferreds
-        {
-            std::lock_guard lock{mutex};
-            for (auto& pair : deferreds) {
-                pair.second.fail(celix::rsa::RemoteServicesException{"Shutting 
down proxy"});
-            }
-        }
-    };
+    ~ImportedCalculator() noexcept override = default;
 
     std::shared_ptr<celix::PushStream<double>> result() override {
+        std::lock_guard lock{mutex};
         return stream;
     }
 
@@ -158,6 +139,7 @@ public:
     }
 
     int start() {
+        std::lock_guard lock{mutex};
         ses = psp->createSynchronousEventSource<double>(factory);
         stream = psp->createStream<double>(ses, factory);
         running.store(true, std::memory_order::memory_order_release);
@@ -176,6 +158,8 @@ public:
         running.store(false, std::memory_order::memory_order_release);
         receiveThread.join();
         receiveThread = {};
+        cleanupDeferreds();
+        std::lock_guard lock{mutex};
         ses->close();
         ses.reset();
         stream.reset();
@@ -197,7 +181,10 @@ public:
     }
 
 private:
-    void setupMsgIpc(int c2pChannelId, int p2cChannelId) {
+    void setupMsgIpc(long c2pChannelId, long p2cChannelId) {
+        logHelper.debug("Creating msg queue for ImportedCalculator with 
c2pChannelId=%li and p2cChannelId=%li",
+                    c2pChannelId,
+                    p2cChannelId);
         int keySender = (int)c2pChannelId;
         int keyReceiver = (int)p2cChannelId;
         qidSender = msgget(keySender, 0666 | IPC_CREAT);
@@ -205,7 +192,19 @@ private:
 
         if (qidSender == -1 || qidReceiver == -1) {
             throw std::logic_error{"RsaShmClient: Error creating msg queue."};
+        } else {
+            logHelper.info("Created msg queue for ImportedCalculator with 
qidSender=%i and qidReceiver=%i",
+                           qidSender,
+                           qidReceiver);
+        }
+    }
+
+    void cleanupDeferreds() {
+        std::lock_guard lock{mutex};
+        for (auto& pair : deferreds) {
+            pair.second.tryFail(celix::rsa::RemoteServicesException{"Shutting 
down proxy"});
         }
+        deferreds.clear();
     }
 
     void receiveMessages() {
@@ -233,9 +232,9 @@ private:
             lock.unlock();
 
             if (ret.hasError) {
-                
deferred.fail(celix::rsa::RemoteServicesException{ret.errorMsg});
+                
deferred.tryFail(celix::rsa::RemoteServicesException{ret.errorMsg});
             } else {
-                deferred.resolve(ret.result);
+                deferred.tryResolve(ret.result);
             }
         } catch (const IpcException& e) {
             logHelper.error("IpcException: %s", e.what());
@@ -249,6 +248,7 @@ private:
                 return; // no message available (yet)
             }
             auto event = msg.value().mtext;
+            logHelper.trace("Received event %f", event.eventData);
 
             if (event.hasError) {
                 logHelper.error("Received error event %s", event.errorMsg);
@@ -312,7 +312,7 @@ private:
  */
 class CalculatorImportServiceFactory final : public 
celix::rsa::IImportServiceFactory {
 public:
-    static constexpr const char * const CONFIGS = "pubsub";
+    static constexpr const char * const CONFIGS = "ipc-mq";
 
     explicit 
CalculatorImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : 
ctx{std::move(_ctx)}, logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
     ~CalculatorImportServiceFactory() noexcept override = default;
@@ -332,10 +332,11 @@ public:
 
 private:
     std::string createImportedCalculatorComponent(const 
celix::rsa::EndpointDescription& endpoint) {
-        auto topic = endpoint.getProperties().get("endpoint.topic");
-        auto scope = endpoint.getProperties().get("endpoint.scope");
-        auto c2pChannelId = getConsumer2ProviderChannelId(scope, topic);
-        auto p2cChannelId = getProvider2ConsumerChannelId(scope, topic);
+        for (auto it : endpoint.getProperties()) {
+            logHelper.info("Endpoint property %s=%s", it.first.c_str(), 
it.second.c_str());
+        }
+        auto c2pChannelId = 
endpoint.getProperties().getAsLong("endpoint.client.to.provider.channel.id",  
-1);
+        auto p2cChannelId = 
endpoint.getProperties().getAsLong("endpoint.provider.to.client.channel.id",  
-1);
 
         auto& cmp = 
ctx->getDependencyManager()->createComponent(std::make_unique<ImportedCalculator>(logHelper,
 c2pChannelId, p2cChannelId));
         cmp.createServiceDependency<celix::PromiseFactory>()
@@ -378,7 +379,7 @@ private:
  */
 class ExportedCalculator final {
 public:
-    explicit ExportedCalculator(celix::LogHelper _logHelper, int c2pChannelId, 
int p2cChannelId) : logHelper{std::move(_logHelper)} {
+    explicit ExportedCalculator(celix::LogHelper _logHelper, long 
c2pChannelId, long p2cChannelId) : logHelper{std::move(_logHelper)} {
         setupMsgIpc(c2pChannelId, p2cChannelId);
     }
 
@@ -434,7 +435,10 @@ public:
         calculator = calc;
     }
 private:
-    void setupMsgIpc(int c2pChannelId, int p2cChannelId) {
+    void setupMsgIpc(long c2pChannelId, long p2cChannelId) {
+        logHelper.debug("Creating msg queue for ExportCalculator with 
c2pChannelId=%li and p2cChannelId=%li",
+                        c2pChannelId,
+                        p2cChannelId);
         //note reverse order of sender and receiver compared to 
ImportedCalculator
         int keySender = (int)p2cChannelId;
         int keyReceiver = (int)c2pChannelId;
@@ -443,6 +447,10 @@ private:
 
         if (qidSender == -1 || qidReceiver == -1) {
             throw std::logic_error{"RsaShmClient: Error creating msg queue."};
+        } else {
+            logHelper.info("Created msg queue for ExportCalculator with 
qidSender=%i and qidReceiver=%i",
+                           qidSender,
+                           qidReceiver);
         }
     }
 
@@ -518,7 +526,7 @@ private:
  */
 class CalculatorExportServiceFactory final : public 
celix::rsa::IExportServiceFactory {
 public:
-    static constexpr const char * const CONFIGS = "pubsub";
+    static constexpr const char * const CONFIGS = "ipc-mq";
     static constexpr const char * const INTENTS = "osgi.async";
 
     explicit 
CalculatorExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : 
ctx{std::move(_ctx)},
@@ -544,13 +552,12 @@ public:
 
 private:
     std::string createExportedCalculatorComponent(const celix::Properties& 
serviceProperties) {
-        auto topic = serviceProperties.get("endpoint.topic");
-        auto scope = serviceProperties.get("endpoint.scope");
-        auto c2pChannelId = getConsumer2ProviderChannelId(scope, topic);
-        auto p2cChannelId = getProvider2ConsumerChannelId(scope, topic);
+        auto c2pChannelId = 
serviceProperties.getAsLong("endpoint.client.to.provider.channel.id",  -1);
+        auto p2cChannelId = 
serviceProperties.getAsLong("endpoint.provider.to.client.channel.id",  -1);
         auto svcId = serviceProperties.get(celix::SERVICE_ID);
 
-        auto& cmp = 
ctx->getDependencyManager()->createComponent(std::make_unique<ExportedCalculator>(logHelper,
 c2pChannelId, p2cChannelId));
+        auto& cmp = ctx->getDependencyManager()->createComponent(
+            std::make_unique<ExportedCalculator>(logHelper, c2pChannelId, 
p2cChannelId));
 
         cmp.createServiceDependency<celix::PromiseFactory>()
                 .setRequired(true)

Reply via email to