This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/cxx_rsa_update
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/cxx_rsa_update by this 
push:
     new da3ff98  Adds integration example for cxx rsa
da3ff98 is described below

commit da3ff98d77e585c0887ddd7c5866dcbfb6e26d4a
Author: Pepijn Noltes <[email protected]>
AuthorDate: Sun May 9 20:58:11 2021 +0200

    Adds integration example for cxx rsa
---
 bundles/cxx_remote_services/CMakeLists.txt         |   7 +
 .../cxx_remote_services/integration/CMakeLists.txt |  97 +++++
 .../integration/include/ICalculator.h              |  29 ++
 .../resources/Calculator$add$Invoke.descriptor     |   9 +
 .../resources/Calculator$add$Return.descriptor     |   9 +
 .../integration/resources/endpoint_discovery.json  |  15 +
 .../integration/src/CalculatorConsumer.cc          |  75 ++++
 .../integration/src/CalculatorProvider.cc          |  58 +++
 .../src/TestExportImportRemoteServiceFactory.cc    | 419 +++++++++++++++++++++
 .../pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c     |   1 +
 .../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c     |   1 +
 bundles/pubsub/pubsub_discovery/CMakeLists.txt     |   7 +-
 bundles/pubsub/pubsub_utils/src/pubsub_matching.c  |  18 +-
 .../src/pubsub_serialization_provider.c            |   7 +-
 libs/promises/api/celix/impl/SharedPromiseState.h  |   1 +
 15 files changed, 734 insertions(+), 19 deletions(-)

diff --git a/bundles/cxx_remote_services/CMakeLists.txt 
b/bundles/cxx_remote_services/CMakeLists.txt
index eadbd41..06bbd6b 100644
--- a/bundles/cxx_remote_services/CMakeLists.txt
+++ b/bundles/cxx_remote_services/CMakeLists.txt
@@ -22,6 +22,13 @@ if (REMOTE_SERVICE_ADMIN)
     add_subdirectory(rsa_spi)
     add_subdirectory(admin)
     add_subdirectory(discovery_configured)
+
+    if (ENABLE_TESTING)
+        add_subdirectory(integration)
+    endif()
+
+
+    #NOTE the topology manager is not yet used. The discovery and RSA need to be refactored for this
     #add_subdirectory(topology_manager)
     #add_subdirectory(examples)
 endif()
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt 
b/bundles/cxx_remote_services/integration/CMakeLists.txt
new file mode 100644
index 0000000..cad4fec
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_celix_bundle(TestExportImportRemoteServiceFactory
+        SOURCES src/TestExportImportRemoteServiceFactory.cc
+)
+target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE 
Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::log_helper)
+target_include_directories(TestExportImportRemoteServiceFactory PRIVATE 
include)
+#TODO improve with serializer svc, for now using descriptors
+celix_bundle_files(TestExportImportRemoteServiceFactory
+        resources/Calculator$add$Invoke.descriptor
+        resources/Calculator$add$Return.descriptor
+        DESTINATION "META-INF/descriptors"
+)
+
+add_celix_bundle(CalculatorProvider
+        SOURCES src/CalculatorProvider.cc
+)
+target_link_libraries(CalculatorProvider PRIVATE Celix::Promises)
+target_include_directories(CalculatorProvider PRIVATE include)
+target_compile_options(CalculatorProvider PRIVATE -std=c++17) #TODO how can 
this be improved (bring back -std=c++17 on INTERFACE for promises?
+
+
+add_celix_bundle(CalculatorConsumer
+        SOURCES src/CalculatorConsumer.cc
+)
+target_link_libraries(CalculatorConsumer PRIVATE Celix::Promises)
+target_include_directories(CalculatorConsumer PRIVATE include)
+target_compile_options(CalculatorConsumer PRIVATE -std=c++17) #TODO how can 
this be improved (bring back -std=c++17 on INTERFACE for promises?
+
+
+################# Integration examples ##################################
+
+add_celix_container(RemoteCalculatorProvider
+        GROUP rsa
+        PROPERTIES
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+
+            #Configuration to let the pubsub zmq operate without discovery
+            
#PSA_ZMQ_STATIC_BIND_URL_FOR_test_return=ipc:///tmp/pubsub-test-return
+            
#PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_invoke=ipc:///tmp/pubsub-test-invoke
+        BUNDLES
+            Celix::shell
+            Celix::shell_tui
+
+            #Needed for remote services (full pubsub stack + remote services 
stack)
+            Celix::pubsub_serializer_json
+            Celix::pubsub_topology_manager
+            #TODO replace with v1 when marker interfaces for a serializer type 
are introduced  Celix::pubsub_admin_zmq_v2
+            Celix::pubsub_admin_zmq
+            Celix::pubsub_protocol_wire_v2
+            Celix::pubsub_discovery_etcd
+            Celix::RemoteServiceAdmin
+            TestExportImportRemoteServiceFactory #needed to be able to create 
a ExportedService for ICalculator
+
+            CalculatorProvider
+)
+
+add_celix_container(RemoteCalculatorConsumer
+        GROUP rsa
+        PROPERTIES
+            CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+
+            #Configuration to let the pubsub zmq operate without discovery
+            
#PSA_ZMQ_STATIC_BIND_URL_FOR_test_invoke=ipc:///tmp/pubsub-test-invoke
+            
#PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_return=ipc:///tmp/pubsub-test-return
+            
CELIX_RSA_CONFIGURED_DISCOVERY_DISCOVERY_FILES=${CMAKE_CURRENT_SOURCE_DIR}/resources/endpoint_discovery.json
+        BUNDLES
+            Celix::shell
+            Celix::shell_tui
+
+            #Needed for remote services (full pubsub stack + remote services 
stack)
+            Celix::pubsub_serializer_json
+            Celix::pubsub_topology_manager
+            Celix::pubsub_discovery_etcd
+            Celix::pubsub_admin_zmq
+            Celix::pubsub_protocol_wire_v2
+            Celix::RsaConfiguredDiscovery
+            Celix::RemoteServiceAdmin
+            TestExportImportRemoteServiceFactory #needed to be able to create 
a ExportedService for ICalculator
+
+            CalculatorConsumer
+)
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/include/ICalculator.h 
b/bundles/cxx_remote_services/integration/include/ICalculator.h
new file mode 100644
index 0000000..dd95aee
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/include/ICalculator.h
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "celix/Promise.h"
+
+/**
+ * Abstract calculator service interface.
+ *
+ * Results are delivered through a celix::Promise instead of being returned directly.
+ */
+class ICalculator {
+public:
+    virtual ~ICalculator() noexcept = default;
+
+    /**
+     * Requests the addition of a and b.
+     * @return A promise for the resulting double.
+     */
+    virtual celix::Promise<double> add(double a, double b) = 0;
+};
diff --git 
a/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
 
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
new file mode 100644
index 0000000..84125ee
--- /dev/null
+++ 
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=Calculator$add$Invoke
+version=1.0.0
+:annotations
+classname=Calculator$add$Invoke
+:types
+:message
+{DD arg1 arg2}
diff --git 
a/bundles/cxx_remote_services/integration/resources/Calculator$add$Return.descriptor
 
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Return.descriptor
new file mode 100644
index 0000000..50e8228
--- /dev/null
+++ 
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Return.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=Calculator$add$Return
+version=1.0.0
+:annotations
+classname=Calculator$add$Return
+:types
+:message
+{[Dt optionalReturnValue optionalError}
diff --git 
a/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json 
b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
new file mode 100644
index 0000000..e7d04ad
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
@@ -0,0 +1,15 @@
+{
+  "endpoints": [
+    {
+      "endpoint.id": "id-01",
+      "service.imported": true,
+      "service.imported.configs": [
+        "pubsub"
+      ],
+      "service.exported.interfaces": "ICalculator",
+      "endpoint.objectClass": "ICalculator",
+      "endpoint.topic": "test",
+      "endpoint.scope": "default"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc 
b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
new file mode 100644
index 0000000..de4caae
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <atomic>
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <thread>
+
+#include "celix/BundleActivator.h"
+#include "celix/PromiseFactory.h"
+#include "ICalculator.h"
+
+/**
+ * Example component that, while active, calls ICalculator::add from a background thread
+ * and prints the outcome of every call.
+ */
+class CalculatorConsumer final {
+public:
+    /** Stores the calculator service that the calc thread invokes. */
+    void setCalculator(const std::shared_ptr<ICalculator>& cal) {
+        calculator = cal;
+    }
+
+    /** Flags the consumer as active and launches the calc thread. */
+    void start() {
+        std::cout << "starting calc thread" << std::endl;
+        active = true;
+        calcThread = std::thread{[this]() { run(); }};
+    }
+
+    /** Clears the active flag and joins the calc thread when it is joinable. */
+    void stop() {
+        active = false;
+        if (!calcThread.joinable()) {
+            return;
+        }
+        calcThread.join();
+    }
+private:
+    /** Body of the calc thread: one add call per iteration, followed by a 5 second sleep. */
+    void run() {
+        double nextOperand = 1;
+        while (active) {
+            std::cout << "Calling calc" << std::endl;
+            calculator->add(42, nextOperand++)
+                .onSuccess([](double result) {
+                    std::cout << "calc result is " << result << std::endl;
+                })
+                .onFailure([](const auto& error) {
+                    std::cerr << "error calling calc: " << error.what() << std::endl;
+                });
+            std::this_thread::sleep_for(std::chrono::seconds{5});
+        }
+    }
+
+    std::atomic<bool> active{false};
+    std::shared_ptr<ICalculator> calculator{};
+    std::thread calcThread{};
+};
+
+/**
+ * Bundle activator of the calculator consumer bundle.
+ *
+ * Creates a CalculatorConsumer component with a required ICalculator dependency and wires the
+ * component's start/stop callbacks to CalculatorConsumer::start and CalculatorConsumer::stop.
+ */
+class CalculatorProviderActivator {
+public:
+    explicit CalculatorProviderActivator(const std::shared_ptr<celix::BundleContext>& ctx) {
+        auto& component = ctx->getDependencyManager()->createComponent(std::make_shared<CalculatorConsumer>());
+
+        auto& calculatorDep = component.createServiceDependency<ICalculator>();
+        calculatorDep.setRequired(true);
+        calculatorDep.setCallbacks(&CalculatorConsumer::setCalculator);
+
+        //only the start and stop callbacks are used; the first and last slot stay empty
+        component.setCallbacks(nullptr, &CalculatorConsumer::start, &CalculatorConsumer::stop, nullptr);
+        component.build();
+    }
+};
+
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(CalculatorProviderActivator)
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc 
b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
new file mode 100644
index 0000000..dd4a3a2
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+
+#include "celix/BundleActivator.h"
+#include "celix/PromiseFactory.h"
+#include "ICalculator.h"
+
+/**
+ * Local ICalculator implementation that answers add requests with a directly resolved deferred.
+ */
+class CalculatorImpl final : public ICalculator {
+public:
+    ~CalculatorImpl() noexcept override = default;
+
+    /** Stores the promise factory used to create the deferred for every add call. */
+    void setFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
+        factory = fac;
+    }
+
+    /** Creates a deferred, resolves it with a + b and returns its promise. */
+    celix::Promise<double> add(double a, double b) override {
+        auto result = factory->deferred<double>();
+        result.resolve(a + b);
+        return result.getPromise();
+    }
+private:
+    std::shared_ptr<celix::PromiseFactory> factory{};
+};
+
+/**
+ * Bundle activator of the calculator provider bundle.
+ *
+ * Creates a CalculatorImpl component with a required celix::PromiseFactory dependency and
+ * provides it as ICalculator with the export and endpoint (topic/scope) service properties set.
+ */
+class CalculatorProviderActivator {
+public:
+    explicit CalculatorProviderActivator(const std::shared_ptr<celix::BundleContext>& ctx) {
+        auto& component = ctx->getDependencyManager()->createComponent(std::make_shared<CalculatorImpl>());
+
+        auto& factoryDep = component.createServiceDependency<celix::PromiseFactory>();
+        factoryDep.setRequired(true);
+        factoryDep.setCallbacks(&CalculatorImpl::setFactory);
+
+        auto& provided = component.createProvidedService<ICalculator>();
+        provided.addProperty("service.exported.interfaces", celix::typeName<ICalculator>());
+        provided.addProperty("endpoint.topic", "test");
+        provided.addProperty("endpoint.scope", "default");
+
+        component.build();
+    }
+};
+
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(CalculatorProviderActivator)
diff --git 
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
 
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
new file mode 100644
index 0000000..f4cd6ce
--- /dev/null
+++ 
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <unordered_map>
+
+#include "celix/PromiseFactory.h"
+#include "celix/BundleActivator.h"
+#include "celix/rsa/IImportServiceFactory.h"
+#include "celix/rsa/IExportServiceFactory.h"
+#include "celix/LogHelper.h"
+
+#include "ICalculator.h"
+#include "pubsub/publisher.h"
+#include "pubsub/subscriber.h"
+
+constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{5}; //TODO make 
configurable
+
+/**
+ * Payload of the "Calculator$add$Invoke" message: the two operands of an add call.
+ */
+struct Calculator$add$Invoke {
+    double arg1;
+    double arg2;
+};
+
+/**
+ * Payload of the "Calculator$add$Return" message.
+ *
+ * The receiving side treats the call as successful when optionalReturnValue.len == 1 and reads
+ * the result from optionalReturnValue.buf[0]; any other length is handled as a failed call.
+ * The sending side fills optionalError with the failure text when the call failed.
+ */
+struct Calculator$add$Return {
+    struct {
+        uint32_t cap;
+        uint32_t len;
+        double* buf;
+    } optionalReturnValue;
+    char* optionalError;
+};
+
+/**
+ * An ImportedCalculator, which acts as a pubsub proxy to an imported remote service.
+ *
+ * add() publishes a Calculator$add$Invoke message tagged with an "invoke.id" metadata entry and
+ * hands out a promise; receive() resolves or fails the deferred stored under that id when a
+ * Calculator$add$Return message carrying the same "invoke.id" arrives.
+ */
+class ImportedCalculator final : public ICalculator {
+public:
+    explicit ImportedCalculator(celix::LogHelper _logHelper) : logHelper{std::move(_logHelper)} {}
+    ~ImportedCalculator() noexcept override = default;
+
+    /**
+     * Sends an invoke message for the operands a and b through the injected publisher.
+     *
+     * @return A promise with a timeout of INVOKE_TIMEOUT. The underlying deferred is failed
+     *         directly when the invoke message id cannot be resolved or the send reports an error.
+     */
+    celix::Promise<double> add(double a, double b) override {
+        long invokeId = nextInvokeId++;
+
+        //publisher is one of the guarded members, so the lock is taken before its first use
+        std::lock_guard lck{mutex};
+
+        //setup msg id
+        thread_local unsigned int invokeMsgId = 0;
+        if (invokeMsgId == 0) {
+            publisher->localMsgTypeIdForMsgType(publisher->handle, "Calculator$add$Invoke", &invokeMsgId);
+        }
+
+        auto deferred = factory->deferred<double>();
+        if (invokeMsgId > 0) {
+            //registered before sending (under the lock), so receive() can always find it
+            deferreds.emplace(invokeId, deferred);
+
+            Calculator$add$Invoke invoke;
+            invoke.arg1 = a;
+            invoke.arg2 = b;
+            //NOTE(review): meta is handed to send(); presumably the publisher takes ownership - confirm
+            auto* meta = celix_properties_create();
+            celix_properties_setLong(meta, "invoke.id", invokeId);
+            int rc = publisher->send(publisher->handle, invokeMsgId, &invoke, meta);
+            if (rc != 0) {
+                //nothing was sent, so no reply will arrive: drop the entry instead of keeping it forever
+                deferreds.erase(invokeId);
+                constexpr auto msg = "error sending invoke msg";
+                logHelper.error(msg);
+                deferred.fail(celix::rsa::RemoteServicesException{msg});
+            }
+        } else {
+            //the deferred is not stored in this case, because it is failed right away
+            constexpr auto msg = "error getting msg id for invoke msg";
+            logHelper.error(msg);
+            deferred.fail(celix::rsa::RemoteServicesException{msg});
+        }
+        //NOTE entries of invocations that never get a reply are only covered by the promise timeout;
+        //they are not removed from deferreds
+        return deferred.getPromise().timeout(INVOKE_TIMEOUT);
+    }
+
+    /**
+     * Handles a message received on the return topic.
+     *
+     * Messages of type "Calculator$add$Return" are matched to a stored deferred through the
+     * "invoke.id" metadata entry; the deferred is resolved when exactly one return value is
+     * present and failed otherwise. Other message types are logged and ignored.
+     */
+    void receive(const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t* meta) {
+        //setup message ids
+        thread_local unsigned int returnAddMsgId = 0;
+        if (returnAddMsgId == 0 && celix_utils_stringEquals(msgType, "Calculator$add$Return")) {
+            returnAddMsgId = msgTypeId;
+        }
+
+        //handle incoming messages
+        if (returnAddMsgId != 0 && returnAddMsgId == msgTypeId) {
+            long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
+            if (invokeId == -1) {
+                logHelper.error("Cannot find invoke id on metadata");
+                return;
+            }
+            auto* result = static_cast<Calculator$add$Return*>(msg);
+            std::lock_guard lock{mutex};
+            auto it = deferreds.find(invokeId);
+            if (it == end(deferreds)) {
+                logHelper.error("Cannot find deferred for invoke id %li", invokeId);
+                return;
+            }
+            if (result->optionalReturnValue.len == 1) {
+                it->second.resolve(result->optionalReturnValue.buf[0]);
+            } else {
+                it->second.fail(celix::rsa::RemoteServicesException{"Failed resolving remote promise"});
+            }
+            deferreds.erase(it);
+        } else {
+            logHelper.warning("Unexpected message type %s", msgType);
+        }
+    }
+
+    /** Stores the publisher used to send invoke messages. */
+    void setPublisher(const std::shared_ptr<pubsub_publisher>& pub) {
+        std::lock_guard lock{mutex};
+        publisher = pub;
+    }
+
+    /** Stores the promise factory used to create the deferred for every add call. */
+    void setPromiseFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
+        std::lock_guard lock{mutex};
+        factory = fac;
+    }
+private:
+    celix::LogHelper logHelper;
+    std::atomic<long> nextInvokeId{0};
+    std::mutex mutex{}; //protects below
+    std::shared_ptr<celix::PromiseFactory> factory{};
+    std::shared_ptr<pubsub_publisher> publisher{};
+    std::unordered_map<long, celix::Deferred<double>> deferreds{};
+};
+
+/**
+ * An import service guard: on destruction it asks the dependency manager to asynchronously
+ * remove the component with the stored id, provided the bundle context is still alive.
+ */
+class ComponentImportServiceGuard final : public celix::rsa::IImportServiceGuard {
+public:
+    ComponentImportServiceGuard(std::shared_ptr<celix::BundleContext> _ctx, std::string _componentId)
+        : ctx{std::move(_ctx)},
+          componentId{std::move(_componentId)} {}
+
+    ~ComponentImportServiceGuard() noexcept override {
+        //the context is only held weakly; when it cannot be locked it is already gone
+        if (auto context = ctx.lock()) {
+            context->getDependencyManager()->removeComponentAsync(componentId);
+        }
+    }
+private:
+    const std::weak_ptr<celix::BundleContext> ctx;
+    const std::string componentId;
+};
+
+/**
+ * @brief a manually written import service factory for the ICalculator interface
+ *
+ * For a pubsub endpoint it creates an ImportedCalculator component which publishes on the
+ * "<endpoint.topic>_invoke" topic, subscribes to the "<endpoint.topic>_return" topic and is
+ * provided as an ICalculator service.
+ */
+class CalculatorImportServiceFactory final : public celix::rsa::IImportServiceFactory {
+public:
+    explicit CalculatorImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)}, logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
+
+    /**
+     * Imports the given endpoint.
+     *
+     * @return A guard which removes the created component when destroyed, or nullptr when the
+     *         endpoint is not a "pubsub" endpoint or lacks the endpoint.topic / endpoint.scope property.
+     */
+    std::unique_ptr<celix::rsa::IImportServiceGuard> importService(const celix::rsa::EndpointDescription& endpoint) override {
+        if (endpoint.getConfigurationTypes() != "pubsub") {
+            ctx->logTrace("skipping endpoint, not pubsub configuration. Found config '%s'", endpoint.getConfigurationTypes().c_str());
+            return nullptr;
+        }
+
+        auto topic = endpoint.getProperties().get("endpoint.topic");
+        //the scope comes from its own property; reading "endpoint.topic" here made the scope check a no-op
+        auto scope = endpoint.getProperties().get("endpoint.scope");
+        if (topic.empty() || scope.empty()) {
+            ctx->logError("Cannot import pubsub endpoint. Endpoint does not have a scope and/or topic");
+            return nullptr;
+        }
+
+        auto componentId = createImportedCalculatorComponent(endpoint);
+        return std::make_unique<ComponentImportServiceGuard>(ctx, std::move(componentId));
+    }
+private:
+    /**
+     * Creates and asynchronously builds the ImportedCalculator component for the endpoint.
+     * @return The UUID of the created component.
+     */
+    std::string createImportedCalculatorComponent(const celix::rsa::EndpointDescription& endpoint) {
+        auto invokeTopic = endpoint.getProperties().get("endpoint.topic") + "_invoke";
+        auto returnTopic = endpoint.getProperties().get("endpoint.topic") + "_return";
+        auto scope = endpoint.getProperties().get("endpoint.scope");
+
+        auto& cmp = ctx->getDependencyManager()->createComponent(std::make_unique<ImportedCalculator>(logHelper));
+        cmp.createServiceDependency<pubsub_publisher>(PUBSUB_PUBLISHER_SERVICE_NAME)
+                .setRequired(true)
+                .setFilter(std::string{"(&(topic="}.append(invokeTopic).append(")(scope=").append(scope).append("))"))
+                .setCallbacks(&ImportedCalculator::setPublisher);
+        cmp.createServiceDependency<celix::PromiseFactory>()
+                .setRequired(true)
+                .setCallbacks(&ImportedCalculator::setPromiseFactory);
+
+        //the C subscriber struct forwards received messages to the component instance
+        auto subscriber = std::make_shared<pubsub_subscriber_t>();
+        subscriber->handle = &cmp.getInstance();
+        subscriber->receive = [](void *handle, const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t *metadata, bool */*release*/) ->  int {
+            auto* inst = static_cast<ImportedCalculator*>(handle);
+            try {
+                inst->receive(msgType, msgTypeId, msg, metadata);
+            } catch (...) {
+                //exceptions must not cross the C callback boundary
+                return -1;
+            }
+            return 0;
+        };
+        //the component keeps the subscriber struct alive
+        cmp.addContext(subscriber);
+        cmp.createProvidedCService<pubsub_subscriber_t>(subscriber.get(), PUBSUB_SUBSCRIBER_SERVICE_NAME)
+                .addProperty("topic", returnTopic)
+                .addProperty("scope", scope);
+
+        //Adding the imported service as provide
+        celix::Properties svcProps{};
+        for (const auto& entry : endpoint.getProperties()) {
+            if (strncmp(entry.first.c_str(), "service.exported", strlen("service.exported")) == 0) {
+                //skip, properties starting with service.exported are not copied to the imported service
+            } else {
+                svcProps.set(entry.first, entry.second);
+            }
+        }
+        cmp.createProvidedService<ICalculator>()
+                .setProperties(std::move(svcProps));
+
+        cmp.buildAsync();
+
+        return cmp.getUUID();
+    }
+
+    std::shared_ptr<celix::BundleContext> ctx;
+    celix::LogHelper logHelper;
+    std::mutex mutex{}; //protects below
+};
+
+/**
+ * An ExportedCalculator, which acts as a proxy user of an exported remote service.
+ *
+ * Incoming Calculator$add$Invoke messages are forwarded to the injected ICalculator; the outcome
+ * of the returned promise is published as a Calculator$add$Return message which carries the same
+ * "invoke.id" metadata entry as the invoke message.
+ */
+class ExportedCalculator final {
+public:
+    explicit ExportedCalculator(celix::LogHelper _logHelper) : logHelper{std::move(_logHelper)} {}
+
+    /**
+     * Handles a message received on the invoke topic.
+     *
+     * Messages of type "Calculator$add$Invoke" with an "invoke.id" metadata entry trigger a call
+     * on the calculator service; other message types are logged and ignored.
+     */
+    void receive(const char *msgType, unsigned int msgTypeId, void *msg, const celix_properties_t* meta) {
+        //publisher and calculator are guarded members, so the lock is taken before the first read of either
+        std::lock_guard lock{mutex};
+
+        //setup message ids
+        thread_local unsigned int invokeAddMsgId = 0;
+        if (invokeAddMsgId == 0 && celix_utils_stringEquals(msgType, "Calculator$add$Invoke")) {
+            invokeAddMsgId = msgTypeId;
+        }
+        thread_local unsigned int returnMsgId = 0;
+        if (returnMsgId == 0) {
+            publisher->localMsgTypeIdForMsgType(publisher->handle, "Calculator$add$Return", &returnMsgId);
+        }
+
+        //handle incoming messages
+        if (invokeAddMsgId != 0 && invokeAddMsgId == msgTypeId) {
+            auto* invoke = static_cast<Calculator$add$Invoke*>(msg);
+            long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
+            if (invokeId == -1) {
+                logHelper.error("Cannot find invoke id on metadata");
+                return;
+            }
+
+            //both callbacks capture the same metaProps; only one of them runs and hands it to send()
+            //NOTE(review): presumably send() takes ownership of the metadata - confirm
+            auto* metaProps = celix_properties_create();
+            celix_properties_set(metaProps, "invoke.id", std::to_string(invokeId).c_str());
+            auto promise = calculator->add(invoke->arg1, invoke->arg2);
+            promise
+                .onFailure([svc = calculator, pub = publisher, msgId = returnMsgId, metaProps](const auto& exp) {
+                    //note this lambda makes copies of the std::shared_ptr<publisher> and svc, so a remove of publisher or service can only happen after the promise is done
+                    Calculator$add$Return ret;
+                    ret.optionalReturnValue.buf = nullptr;
+                    ret.optionalReturnValue.len = 0;
+                    ret.optionalReturnValue.cap = 0;
+                    ret.optionalError = celix_utils_strdup(exp.what());
+                    pub->send(pub->handle, msgId, &ret, metaProps);
+                    //the message stays owned by the sender, so the copied error text is released here
+                    free(ret.optionalError);
+                })
+                .onSuccess([svc = calculator, pub = publisher, msgId = returnMsgId, metaProps](auto val) {
+                    //note this lambda makes copies of the std::shared_ptr<publisher> and svc, so a remove of publisher or service can only happen after the promise is done
+                    //the single return value lives on the stack for the duration of send(), so nothing has to be freed
+                    double value = val;
+                    Calculator$add$Return ret;
+                    ret.optionalReturnValue.buf = &value;
+                    ret.optionalReturnValue.len = 1;
+                    ret.optionalReturnValue.cap = 1;
+                    ret.optionalError = nullptr;
+                    pub->send(pub->handle, msgId, &ret, metaProps);
+                });
+        } else {
+            logHelper.warning("Unexpected message type %s", msgType);
+        }
+    }
+
+    /** Stores the publisher used to send return messages. */
+    void setPublisher(const std::shared_ptr<pubsub_publisher>& pub) {
+        std::lock_guard lock{mutex};
+        publisher = pub;
+    }
+
+    /** Stores the injected promise factory. */
+    void setPromiseFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
+        std::lock_guard lock{mutex};
+        factory = fac;
+    }
+
+    /** Stores the calculator service that incoming invoke messages are forwarded to. */
+    void setICalculator(const std::shared_ptr<ICalculator>& calc) {
+        std::lock_guard lock{mutex};
+        calculator = calc;
+    }
+private:
+    celix::LogHelper logHelper;
+    std::atomic<long> nextInvokeId{0};
+    std::mutex mutex{}; //protects below
+    std::shared_ptr<ICalculator> calculator{};
+    std::shared_ptr<celix::PromiseFactory> factory{};
+    std::shared_ptr<pubsub_publisher> publisher{};
+    std::unordered_map<long, celix::Deferred<double>> deferreds{};
+};
+
+/**
+ * An export service guard: on destruction it asks the dependency manager to asynchronously
+ * remove the component with the stored id, provided the bundle context is still alive.
+ */
+class ComponentExportServiceGuard final : public celix::rsa::IExportServiceGuard {
+public:
+    ComponentExportServiceGuard(std::shared_ptr<celix::BundleContext> _ctx, std::string _componentId)
+        : ctx{std::move(_ctx)},
+          componentId{std::move(_componentId)} {}
+
+    ~ComponentExportServiceGuard() noexcept override {
+        //the context is only held weakly; when it cannot be locked it is already gone
+        if (auto context = ctx.lock()) {
+            context->getDependencyManager()->removeComponentAsync(componentId);
+        }
+    }
+private:
+    const std::weak_ptr<celix::BundleContext> ctx;
+    const std::string componentId;
+};
+
+/**
+ * @brief a manual written export service factory for a ICalculator interface
+ */
+class CalculatorExportServiceFactory final : public 
celix::rsa::IExportServiceFactory {
+public:
+    explicit 
CalculatorExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : 
ctx{std::move(_ctx)},
+                                                                               
           logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
+
+    std::unique_ptr<celix::rsa::IExportServiceGuard> exportService(const 
celix::Properties& serviceProperties) override {
+        auto topic = serviceProperties.get("endpoint.topic");
+        auto scope = serviceProperties.get("endpoint.topic");
+        if (topic.empty() || scope.empty()) {
+            ctx->logError("Cannot export remote service pubsub without 
endpoint configuration. Endpoint does not have a scope and/or topic");
+            return nullptr;
+        }
+
+        auto componentId = 
createExportedCalculatorComponent(serviceProperties);
+        return std::make_unique<ComponentExportServiceGuard>(ctx, 
std::move(componentId));
+    }
+private:
+    /**
+     * Creates a dependency manager component around a new ExportedCalculator and builds it asynchronously.
+     *
+     * The component gets three required service dependencies:
+     *  - a pubsub publisher filtered on topic "<endpoint.topic>_return" and scope "<endpoint.scope>",
+     *  - a celix::PromiseFactory,
+     *  - the ICalculator service whose service id equals the celix::SERVICE_ID value of serviceProperties.
+     * It also provides a pubsub subscriber service with topic "<endpoint.topic>_invoke" and scope
+     * "<endpoint.scope>", which forwards received messages to ExportedCalculator::receive.
+     *
+     * @param serviceProperties Properties of the service to export; "endpoint.topic", "endpoint.scope" and
+     *                          celix::SERVICE_ID are read from it.
+     * @return The UUID of the created component.
+     */
+    std::string createExportedCalculatorComponent(const celix::Properties& serviceProperties) {
+        const auto endpointTopic = serviceProperties.get("endpoint.topic");
+        const auto requestTopic = endpointTopic + "_invoke";
+        const auto replyTopic = endpointTopic + "_return";
+        const auto endpointScope = serviceProperties.get("endpoint.scope");
+        const auto serviceId = serviceProperties.get(celix::SERVICE_ID);
+
+        //LDAP filters used to select the publisher and the calculator service to export
+        const std::string publisherFilter = std::string{"(&(topic="} + replyTopic + ")(scope=" + endpointScope + "))";
+        const std::string calculatorFilter = std::string{"("} + celix::SERVICE_ID + "=" + serviceId + ")";
+
+        auto& component = ctx->getDependencyManager()->createComponent(std::make_unique<ExportedCalculator>(logHelper));
+
+        auto& publisherDep = component.createServiceDependency<pubsub_publisher>(PUBSUB_PUBLISHER_SERVICE_NAME);
+        publisherDep
+                .setRequired(true)
+                .setFilter(publisherFilter)
+                .setCallbacks(&ExportedCalculator::setPublisher);
+
+        auto& promiseFactoryDep = component.createServiceDependency<celix::PromiseFactory>();
+        promiseFactoryDep
+                .setRequired(true)
+                .setCallbacks(&ExportedCalculator::setPromiseFactory);
+
+        auto& calculatorDep = component.createServiceDependency<ICalculator>();
+        calculatorDep
+                .setRequired(true)
+                .setFilter(calculatorFilter)
+                .setCallbacks(&ExportedCalculator::setICalculator);
+
+        //C subscriber service struct; the handle points at the component instance so the callback can reach it
+        auto subscriberSvc = std::make_shared<pubsub_subscriber_t>();
+        subscriberSvc->handle = &component.getInstance();
+        subscriberSvc->receive = [](void* handle, const char* msgType, unsigned int msgTypeId, void* msg, const celix_properties_t* metadata, bool* /*release*/) -> int {
+            //exceptions must not escape through the C callback; they are reported as -1 instead
+            int status = 0;
+            try {
+                static_cast<ExportedCalculator*>(handle)->receive(msgType, msgTypeId, msg, metadata);
+            } catch (...) {
+                status = -1;
+            }
+            return status;
+        };
+        //the component holds the shared_ptr, so the struct handed out as raw pointer below stays alive with it
+        component.addContext(subscriberSvc);
+        component.createProvidedCService<pubsub_subscriber_t>(subscriberSvc.get(), PUBSUB_SUBSCRIBER_SERVICE_NAME)
+                .addProperty("topic", requestTopic)
+                .addProperty("scope", endpointScope);
+
+        component.buildAsync();
+
+        return component.getUUID();
+    }
+
+    //Bundle context; used in the visible code to log errors and to obtain the dependency manager.
+    std::shared_ptr<celix::BundleContext> ctx;
+    //Log helper passed to each ExportedCalculator created by createExportedCalculatorComponent.
+    celix::LogHelper logHelper;
+    //NOTE(review): no member is declared below this mutex and no lock on it is visible here; confirm it is still needed.
+    std::mutex mutex{};
+};
+
+/**
+ * Activator of the TestExportImportRemoteServiceFactory bundle.
+ *
+ * On construction it registers, in this order:
+ *  1. a CalculatorImportServiceFactory as celix::rsa::IImportServiceFactory targeting ICalculator,
+ *  2. a CalculatorExportServiceFactory as celix::rsa::IExportServiceFactory targeting ICalculator,
+ *  3. a default celix::PromiseFactory with service ranking -100.
+ * The resulting service registrations are stored in a member for as long as the activator object exists.
+ */
+class FactoryActivator {
+public:
+    /**
+     * Logs the start of the bundle and registers the three services.
+     * @param ctx The bundle context used for logging and for registering the services.
+     */
+    explicit FactoryActivator(const std::shared_ptr<celix::BundleContext>& ctx) {
+        ctx->logInfo("Starting TestExportImportRemoteServiceFactory");
+
+        auto importFactoryReg = ctx->registerService<celix::rsa::IImportServiceFactory>(std::make_shared<CalculatorImportServiceFactory>(ctx))
+                .addProperty(celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<ICalculator>())
+                .build();
+        registrations.emplace_back(std::move(importFactoryReg));
+
+        auto exportFactoryReg = ctx->registerService<celix::rsa::IExportServiceFactory>(std::make_shared<CalculatorExportServiceFactory>(ctx))
+                .addProperty(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<ICalculator>())
+                .build();
+        registrations.emplace_back(std::move(exportFactoryReg));
+
+        //a default promise factory, registered with a low service ranking
+        auto promiseFactoryReg = ctx->registerService<celix::PromiseFactory>(std::make_shared<celix::PromiseFactory>())
+                .addProperty(celix::SERVICE_RANKING, -100)
+                .build();
+        registrations.emplace_back(std::move(promiseFactoryReg));
+    }
+private:
+    //registrations of the services registered in the constructor
+    std::vector<std::shared_ptr<celix::ServiceRegistration>> registrations{};
+};
+
+//NOTE(review): presumably generates the bundle activator entry points that create/destroy a FactoryActivator; confirm against the Celix macro definition.
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(FactoryActivator)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c 
b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
index fb6206c..09c7ae5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
@@ -818,6 +818,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const 
char *commandLine, FILE
     celixThreadMutex_unlock(&psa->protocols.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
+    free(line);
 
     return status;
 }
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c 
b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index 9001ae0..5d396e5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -851,6 +851,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const 
char *commandLine, FILE
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
     celixThreadMutex_unlock(&psa->protocols.mutex);
     fprintf(out, "\n");
+    free(line);
 
     return status;
 }
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt 
b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index a84ea8e..73235e6 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -28,9 +28,10 @@ add_celix_bundle(celix_pubsub_discovery_etcd
 )
 target_include_directories(celix_pubsub_discovery_etcd PRIVATE src)
 target_link_libraries(celix_pubsub_discovery_etcd PRIVATE
-         Celix::framework Celix::etcdlib_static
-        Celix::shell_api Celix::log_helper
-        CURL::libcurl Jansson)
+    Celix::framework Celix::etcdlib_static
+    Celix::shell_api Celix::log_helper
+    CURL::libcurl Jansson
+)
 target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi 
Celix::pubsub_utils )
 
 install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c 
b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
index 6f83cac..b2a78df 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
@@ -141,27 +141,25 @@ double pubsub_utils_matchPublisher(
     const char *requested_serializer = celix_properties_get(ep, 
PUBSUB_ENDPOINT_SERIALIZER, NULL);
     long serializerSvcId = getPSSerializer(ctx, requested_serializer);
 
-    if (serializerSvcId < 0) {
-        score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
-    }
-
     if (outSerializerSvcId != NULL) {
         *outSerializerSvcId = serializerSvcId;
     }
 
+    long protocolSvcId = -1;
     if (matchProtocol) {
         const char *requested_protocol = celix_properties_get(ep, 
PUBSUB_ENDPOINT_PROTOCOL, NULL);
-        long protocolSvcId = getPSProtocol(ctx, requested_protocol);
-
-        if (protocolSvcId < 0) {
-            score = PUBSUB_ADMIN_NO_MATCH_SCORE;
-        }
-
+        protocolSvcId = getPSProtocol(ctx, requested_protocol);
         if (outProtocolSvcId != NULL) {
             *outProtocolSvcId = protocolSvcId;
         }
     }
 
+    if (serializerSvcId < 0) {
+        score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+    } else if (matchProtocol && protocolSvcId < 0) {
+        score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+    }
+
     if (outTopicProperties != NULL) {
         *outTopicProperties = ep;
     } else {
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c 
b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
index 857a96e..d008191 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
@@ -601,12 +601,7 @@ pubsub_serialization_provider_t 
*pubsub_serializationProvider_create(
     provider->freeSerializeMsg = freeSerializeMsg;
     provider->deserialize = deserialize;
     provider->freeDeserializeMsg = freeDeserializeMsg;
-
-
-    {
-        provider->logHelper = celix_logHelper_create(ctx, 
"celix_pubsub_serialization_provider");
-
-    }
+    provider->logHelper = celix_logHelper_create(ctx, 
"celix_pubsub_serialization_provider");
 
     dynFunction_logSetup(dfi_log, provider, 1);
     dynType_logSetup(dfi_log, provider, 1);
diff --git a/libs/promises/api/celix/impl/SharedPromiseState.h 
b/libs/promises/api/celix/impl/SharedPromiseState.h
index c3bbda9..e108392 100644
--- a/libs/promises/api/celix/impl/SharedPromiseState.h
+++ b/libs/promises/api/celix/impl/SharedPromiseState.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <type_traits>
 #include <functional>
 #include <chrono>
 #include <mutex>

Reply via email to