This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/cxx_rsa_update
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/cxx_rsa_update by this
push:
new da3ff98 Adds integration example for cxx rsa
da3ff98 is described below
commit da3ff98d77e585c0887ddd7c5866dcbfb6e26d4a
Author: Pepijn Noltes <[email protected]>
AuthorDate: Sun May 9 20:58:11 2021 +0200
Adds integration example for cxx rsa
---
bundles/cxx_remote_services/CMakeLists.txt | 7 +
.../cxx_remote_services/integration/CMakeLists.txt | 97 +++++
.../integration/include/ICalculator.h | 29 ++
.../resources/Calculator$add$Invoke.descriptor | 9 +
.../resources/Calculator$add$Return.descriptor | 9 +
.../integration/resources/endpoint_discovery.json | 15 +
.../integration/src/CalculatorConsumer.cc | 75 ++++
.../integration/src/CalculatorProvider.cc | 58 +++
.../src/TestExportImportRemoteServiceFactory.cc | 419 +++++++++++++++++++++
.../pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c | 1 +
.../pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c | 1 +
bundles/pubsub/pubsub_discovery/CMakeLists.txt | 7 +-
bundles/pubsub/pubsub_utils/src/pubsub_matching.c | 18 +-
.../src/pubsub_serialization_provider.c | 7 +-
libs/promises/api/celix/impl/SharedPromiseState.h | 1 +
15 files changed, 734 insertions(+), 19 deletions(-)
diff --git a/bundles/cxx_remote_services/CMakeLists.txt
b/bundles/cxx_remote_services/CMakeLists.txt
index eadbd41..06bbd6b 100644
--- a/bundles/cxx_remote_services/CMakeLists.txt
+++ b/bundles/cxx_remote_services/CMakeLists.txt
@@ -22,6 +22,13 @@ if (REMOTE_SERVICE_ADMIN)
add_subdirectory(rsa_spi)
add_subdirectory(admin)
add_subdirectory(discovery_configured)
+
+ if (ENABLE_TESTING)
+ add_subdirectory(integration)
+ endif()
+
+
+ #NOTE the toplogy manager is not yet used. The discovery and RSA need to
be refactor for this
#add_subdirectory(topology_manager)
#add_subdirectory(examples)
endif()
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt
b/bundles/cxx_remote_services/integration/CMakeLists.txt
new file mode 100644
index 0000000..cad4fec
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_celix_bundle(TestExportImportRemoteServiceFactory
+ SOURCES src/TestExportImportRemoteServiceFactory.cc
+)
+target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE
Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::log_helper)
+target_include_directories(TestExportImportRemoteServiceFactory PRIVATE
include)
+#TODO improve with serializer svc, for now using descriptors
+celix_bundle_files(TestExportImportRemoteServiceFactory
+ resources/Calculator$add$Invoke.descriptor
+ resources/Calculator$add$Return.descriptor
+ DESTINATION "META-INF/descriptors"
+)
+
+add_celix_bundle(CalculatorProvider
+ SOURCES src/CalculatorProvider.cc
+)
+target_link_libraries(CalculatorProvider PRIVATE Celix::Promises)
+target_include_directories(CalculatorProvider PRIVATE include)
+target_compile_options(CalculatorProvider PRIVATE -std=c++17) #TODO how can
this be improved (bring back -std=c++17 on INTERFACE for promises?
+
+
+add_celix_bundle(CalculatorConsumer
+ SOURCES src/CalculatorConsumer.cc
+)
+target_link_libraries(CalculatorConsumer PRIVATE Celix::Promises)
+target_include_directories(CalculatorConsumer PRIVATE include)
+target_compile_options(CalculatorConsumer PRIVATE -std=c++17) #TODO how can
this be improved (bring back -std=c++17 on INTERFACE for promises?
+
+
+################# Integration examples ##################################
+
+add_celix_container(RemoteCalculatorProvider
+ GROUP rsa
+ PROPERTIES
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+
+ #Configuration to let the pubsub zmq operate without dicscovery
+
#PSA_ZMQ_STATIC_BIND_URL_FOR_test_return=ipc:///tmp/pubsub-test-return
+
#PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_invoke=ipc:///tmp/pubsub-test-invoke
+ BUNDLES
+ Celix::shell
+ Celix::shell_tui
+
+ #Needed for remote services (full pubsub stack + remote services
stack)
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ #TODO replace with v1 when marker interfaces for a serializer type
are introduced Celix::pubsub_admin_zmq_v2
+ Celix::pubsub_admin_zmq
+ Celix::pubsub_protocol_wire_v2
+ Celix::pubsub_discovery_etcd
+ Celix::RemoteServiceAdmin
+ TestExportImportRemoteServiceFactory #needed to be able to create
a ExportedService for ICalculator
+
+ CalculatorProvider
+)
+
+add_celix_container(RemoteCalculatorConsumer
+ GROUP rsa
+ PROPERTIES
+ CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=trace
+
+ #Configuration to let the pubsub zmq operate without dicscovery
+
#PSA_ZMQ_STATIC_BIND_URL_FOR_test_invoke=ipc:///tmp/pubsub-test-invoke
+
#PSA_ZMQ_STATIC_CONNECT_URL_FOR_test_return=ipc:///tmp/pubsub-test-return
+
CELIX_RSA_CONFIGURED_DISCOVERY_DISCOVERY_FILES=${CMAKE_CURRENT_SOURCE_DIR}/resources/endpoint_discovery.json
+ BUNDLES
+ Celix::shell
+ Celix::shell_tui
+
+ #Needed for remote services (full pubsub stack + remote services
stack)
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_discovery_etcd
+ Celix::pubsub_admin_zmq
+ Celix::pubsub_protocol_wire_v2
+ Celix::RsaConfiguredDiscovery
+ Celix::RemoteServiceAdmin
+ TestExportImportRemoteServiceFactory #needed to be able to create
a ExportedService for ICalculator
+
+ CalculatorConsumer
+)
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/include/ICalculator.h
b/bundles/cxx_remote_services/integration/include/ICalculator.h
new file mode 100644
index 0000000..dd95aee
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/include/ICalculator.h
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "celix/Promise.h"
+
+class ICalculator {
+public:
+ virtual ~ICalculator() noexcept = default;
+
+ virtual celix::Promise<double> add(double a, double b) = 0;
+};
diff --git
a/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
new file mode 100644
index 0000000..84125ee
--- /dev/null
+++
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Invoke.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=Calculator$add$Invoke
+version=1.0.0
+:annotations
+classname=Calculator$add$Invoke
+:types
+:message
+{DD arg1 arg1}
diff --git
a/bundles/cxx_remote_services/integration/resources/Calculator$add$Return.descriptor
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Return.descriptor
new file mode 100644
index 0000000..50e8228
--- /dev/null
+++
b/bundles/cxx_remote_services/integration/resources/Calculator$add$Return.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=Calculator$add$Return
+version=1.0.0
+:annotations
+classname=Calculator$add$Return
+:types
+:message
+{[Dt optionalReturnValue optionalError}
diff --git
a/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
new file mode 100644
index 0000000..e7d04ad
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/resources/endpoint_discovery.json
@@ -0,0 +1,15 @@
+{
+ "endpoints": [
+ {
+ "endpoint.id": "id-01",
+ "service.imported": true,
+ "service.imported.configs": [
+ "pubsub"
+ ],
+ "service.exported.interfaces": "ICalculator",
+ "endpoint.objectClass": "ICalculator",
+ "endpoint.topic": "test",
+ "endpoint.scope": "default"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
new file mode 100644
index 0000000..de4caae
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+
+#include "celix/BundleActivator.h"
+#include "celix/PromiseFactory.h"
+#include "ICalculator.h"
+
+class CalculatorConsumer final {
+public:
+ void start() {
+ std::cout << "starting calc thread" << std::endl;
+ active = true;
+ calcThread = std::thread{[this]() {
+ double secondArg = 1;
+ while(active) {
+ std::cout << "Calling calc" << std::endl;
+ calculator->add(42, secondArg++)
+ .onSuccess([](double val) {
+ std::cout << "calc result is " << val << std::endl;
+ })
+ .onFailure([](const auto& exp) {
+ std::cerr << "error calling calc: " << exp.what() <<
std::endl;
+ });
+ std::this_thread::sleep_for(std::chrono::seconds{5});
+ }
+ }};
+ }
+
+ void stop() {
+ active = false;
+ if (calcThread.joinable()) {
+ calcThread.join();
+ }
+ }
+
+ void setCalculator(const std::shared_ptr<ICalculator>& cal) {
+ calculator = cal;
+ }
+private:
+ std::atomic<bool> active{false};
+ std::shared_ptr<ICalculator> calculator{};
+ std::thread calcThread{};
+};
+
+class CalculatorProviderActivator {
+public:
+ explicit CalculatorProviderActivator(const
std::shared_ptr<celix::BundleContext>& ctx) {
+ auto& cmp =
ctx->getDependencyManager()->createComponent(std::make_shared<CalculatorConsumer>());
+ cmp.createServiceDependency<ICalculator>()
+ .setRequired(true)
+ .setCallbacks(&CalculatorConsumer::setCalculator);
+ cmp.setCallbacks(nullptr, &CalculatorConsumer::start,
&CalculatorConsumer::stop, nullptr);
+ cmp.build();
+ }
+};
+
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(CalculatorProviderActivator)
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
new file mode 100644
index 0000000..dd4a3a2
--- /dev/null
+++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+
+#include "celix/BundleActivator.h"
+#include "celix/PromiseFactory.h"
+#include "ICalculator.h"
+
+class CalculatorImpl final : public ICalculator {
+public:
+ ~CalculatorImpl() noexcept override = default;
+
+ celix::Promise<double> add(double a, double b) override {
+ auto deferred = factory->deferred<double>();
+ deferred.resolve(a+b);
+ return deferred.getPromise();
+ }
+
+ void setFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
+ factory = fac;
+ }
+private:
+ std::shared_ptr<celix::PromiseFactory> factory{};
+};
+
+class CalculatorProviderActivator {
+public:
+ explicit CalculatorProviderActivator(const
std::shared_ptr<celix::BundleContext>& ctx) {
+ auto& cmp =
ctx->getDependencyManager()->createComponent(std::make_shared<CalculatorImpl>());
+ cmp.createServiceDependency<celix::PromiseFactory>()
+ .setRequired(true)
+ .setCallbacks(&CalculatorImpl::setFactory);
+ cmp.createProvidedService<ICalculator>()
+ .addProperty("service.exported.interfaces",
celix::typeName<ICalculator>())
+ .addProperty("endpoint.topic", "test")
+ .addProperty("endpoint.scope", "default");
+ cmp.build();
+ }
+};
+
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(CalculatorProviderActivator)
diff --git
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
new file mode 100644
index 0000000..f4cd6ce
--- /dev/null
+++
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <unordered_map>
+
+#include "celix/PromiseFactory.h"
+#include "celix/BundleActivator.h"
+#include "celix/rsa/IImportServiceFactory.h"
+#include "celix/rsa/IExportServiceFactory.h"
+#include "celix/LogHelper.h"
+
+#include "ICalculator.h"
+#include "pubsub/publisher.h"
+#include "pubsub/subscriber.h"
+
+constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{5}; //TODO make
configurable
+
+struct Calculator$add$Invoke {
+ double arg1;
+ double arg2;
+};
+
+struct Calculator$add$Return {
+ struct {
+ uint32_t cap;
+ uint32_t len;
+ double* buf;
+ } optionalReturnValue;
+ char* optionalError;
+};
+
+/**
+ * A importedCalculater which acts as a pubsub proxy to a imported remote
service.
+ */
+class ImportedCalculator final : public ICalculator {
+public:
+ explicit ImportedCalculator(celix::LogHelper _logHelper) :
logHelper{std::move(_logHelper)} {}
+ ~ImportedCalculator() noexcept override = default;
+
+ celix::Promise<double> add(double a, double b) override {
+ //setup msg id
+ thread_local unsigned int invokeMsgId = 0;
+ if (invokeMsgId == 0) {
+ publisher->localMsgTypeIdForMsgType(publisher->handle,
"Calculator$add$Invoke", &invokeMsgId);
+ }
+
+ long invokeId = nextInvokeId++;
+ std::lock_guard lck{mutex};
+ auto deferred = factory->deferred<double>();
+ deferreds.emplace(invokeId, deferred);
+ if (invokeMsgId > 0) {
+ Calculator$add$Invoke invoke;
+ invoke.arg1 = a;
+ invoke.arg2 = b;
+ auto* meta = celix_properties_create();
+ celix_properties_setLong(meta, "invoke.id", invokeId);
+ int rc = publisher->send(publisher->handle, invokeMsgId, &invoke,
meta);
+ if (rc != 0) {
+ constexpr auto msg = "error sending invoke msg";
+ logHelper.error(msg);
+ deferred.fail(celix::rsa::RemoteServicesException{msg});
+ }
+ } else {
+ constexpr auto msg = "error getting msg id for invoke msg";
+ logHelper.error(msg);
+ deferred.fail(celix::rsa::RemoteServicesException{msg});
+ }
+ return deferred.getPromise().timeout(INVOKE_TIMEOUT);
+ }
+
+ void receive(const char *msgType, unsigned int msgTypeId, void *msg, const
celix_properties_t* meta) {
+ //setup message ids
+ thread_local unsigned int returnAddMsgId = 0;
+ if (returnAddMsgId == 0 && celix_utils_stringEquals(msgType,
"Calculator$add$Return")) {
+ returnAddMsgId = msgTypeId;
+ }
+
+ //handle incoming messages
+ if (returnAddMsgId != 0 && returnAddMsgId == msgTypeId) {
+ long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
+ if (invokeId == -1) {
+ logHelper.error("Cannot find invoke id on metadata");
+ return;
+ }
+ auto* result = static_cast<Calculator$add$Return*>(msg);
+ std::lock_guard lock{mutex};
+ auto it = deferreds.find(invokeId);
+ if (it == end(deferreds)) {
+ logHelper.error("Cannot find deferred for invoke id %li",
invokeId);
+ return;
+ }
+ if (result->optionalReturnValue.len == 1) {
+ it->second.resolve(result->optionalReturnValue.buf[0]);
+ } else {
+ it->second.fail(celix::rsa::RemoteServicesException{"Failed
resolving remote promise"});
+ }
+ deferreds.erase(it);
+ } else {
+ logHelper.warning("Unexpected message type %s", msgType);
+ }
+ }
+
+ void setPublisher(const std::shared_ptr<pubsub_publisher>& pub) {
+ std::lock_guard lock{mutex};
+ publisher = pub;
+ }
+
+ void setPromiseFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
+ std::lock_guard lock{mutex};
+ factory = fac;
+ }
+private:
+ celix::LogHelper logHelper;
+ std::atomic<long> nextInvokeId{0};
+ std::mutex mutex{}; //protects below
+ std::shared_ptr<celix::PromiseFactory> factory{};
+ std::shared_ptr<pubsub_publisher> publisher{};
+ std::unordered_map<long, celix::Deferred<double>> deferreds{};
+};
+
+/**
+ * A import service guard, which will remove the component if it goes out of
scope.
+ */
+class ComponentImportServiceGuard final : public
celix::rsa::IImportServiceGuard {
+public:
+ ComponentImportServiceGuard(std::shared_ptr<celix::BundleContext> _ctx,
std::string _componentId) : ctx{std::move(_ctx)},
componentId{std::move(_componentId)} {}
+ ~ComponentImportServiceGuard() noexcept override {
+ auto context = ctx.lock();
+ if (context) {
+ context->getDependencyManager()->removeComponentAsync(componentId);
+ } //else already gone
+ }
+private:
+ const std::weak_ptr<celix::BundleContext> ctx;
+ const std::string componentId;
+};
+
+/**
+ * @brief a manual written import service factory for a ICalculator interface
+ */
+class CalculatorImportServiceFactory final : public
celix::rsa::IImportServiceFactory {
+public:
+ explicit
CalculatorImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) :
ctx{std::move(_ctx)}, logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
+
+ std::unique_ptr<celix::rsa::IImportServiceGuard> importService(const
celix::rsa::EndpointDescription& endpoint) override {
+ if (endpoint.getConfigurationTypes() != "pubsub") {
+ ctx->logTrace("skipping endpoint, not pubsub configuration. Found
config '%s'", endpoint.getConfigurationTypes().c_str());
+ return nullptr;
+ }
+
+ auto topic = endpoint.getProperties().get("endpoint.topic");
+ auto scope = endpoint.getProperties().get("endpoint.topic");
+ if (topic.empty() || scope.empty()) {
+ ctx->logError("Cannot import pubsub endpoint. Endpoint does not
have a scope and/or topic");
+ return nullptr;
+ }
+
+ auto componentId = createImportedCalculatorComponent(endpoint);
+ return std::make_unique<ComponentImportServiceGuard>(ctx,
std::move(componentId));
+ }
+private:
+ std::string createImportedCalculatorComponent(const
celix::rsa::EndpointDescription& endpoint) {
+ auto invokeTopic = endpoint.getProperties().get("endpoint.topic") +
"_invoke";
+ auto returnTopic = endpoint.getProperties().get("endpoint.topic") +
"_return";
+ auto scope = endpoint.getProperties().get("endpoint.scope");
+
+ auto& cmp =
ctx->getDependencyManager()->createComponent(std::make_unique<ImportedCalculator>(logHelper));
+
cmp.createServiceDependency<pubsub_publisher>(PUBSUB_PUBLISHER_SERVICE_NAME)
+ .setRequired(true)
+
.setFilter(std::string{"(&(topic="}.append(invokeTopic).append(")(scope=").append(scope).append("))"))
+ .setCallbacks(&ImportedCalculator::setPublisher);
+ cmp.createServiceDependency<celix::PromiseFactory>()
+ .setRequired(true)
+ .setCallbacks(&ImportedCalculator::setPromiseFactory);
+
+ auto subscriber = std::make_shared<pubsub_subscriber_t>();
+ subscriber->handle = &cmp.getInstance();
+ subscriber->receive = [](void *handle, const char *msgType, unsigned
int msgTypeId, void *msg, const celix_properties_t *metadata, bool
*/*release*/) -> int {
+ auto* inst = static_cast<ImportedCalculator*>(handle);
+ try {
+ inst->receive(msgType, msgTypeId, msg, metadata);
+ } catch (...) {
+ return -1;
+ }
+ return 0;
+ };
+ cmp.addContext(subscriber);
+ cmp.createProvidedCService<pubsub_subscriber_t>(subscriber.get(),
PUBSUB_SUBSCRIBER_SERVICE_NAME)
+ .addProperty("topic", returnTopic)
+ .addProperty("scope", scope);
+
+ //Adding the imported service as provide
+ celix::Properties svcProps{};
+ for (const auto& entry : endpoint.getProperties()) {
+ if (strncmp(entry.first.c_str(), "service.exported",
strlen("service.exported")) == 0) {
+ //skip
+ } else {
+ svcProps.set(entry.first, entry.second);
+ }
+ };
+ cmp.createProvidedService<ICalculator>()
+ .setProperties(std::move(svcProps));
+
+ cmp.buildAsync();
+
+ return cmp.getUUID();
+ }
+
+ std::shared_ptr<celix::BundleContext> ctx;
+ celix::LogHelper logHelper;
+ std::mutex mutex{}; //protects below
+};
+
+/**
+ * A ExportedCalculator which acts as a proxy user to an exported remote
service.
+ */
+class ExportedCalculator final {
+public:
+ explicit ExportedCalculator(celix::LogHelper _logHelper) :
logHelper{std::move(_logHelper)} {}
+
+ void receive(const char *msgType, unsigned int msgTypeId, void *msg, const
celix_properties_t* meta) {
+ //setup message ids
+ thread_local unsigned int invokeAddMsgId = 0;
+ if (invokeAddMsgId == 0 && celix_utils_stringEquals(msgType,
"Calculator$add$Invoke")) {
+ invokeAddMsgId = msgTypeId;
+ }
+ thread_local unsigned int returnMsgId = 0;
+ if (returnMsgId == 0) {
+ publisher->localMsgTypeIdForMsgType(publisher->handle,
"Calculator$add$Return", &returnMsgId);
+ }
+
+ //handle incoming messages
+ if (invokeAddMsgId != 0 && invokeAddMsgId == msgTypeId) {
+ auto* invoke = static_cast<Calculator$add$Invoke*>(msg);
+ long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
+ if (invokeId == -1) {
+ logHelper.error("Cannot find invoke id on metadata");
+ return;
+ }
+
+ auto* metaProps = celix_properties_create();
+ celix_properties_set(metaProps, "invoke.id",
std::to_string(invokeId).c_str());
+ std::lock_guard lock{mutex};
+ auto promise = calculator->add(invoke->arg1, invoke->arg2);
+ promise
+ .onFailure([svc = calculator, pub = publisher, msgId =
returnMsgId, metaProps](const auto& exp) {
+ //note this lambda makes copies of the
std::shared_ptr<publisher> and svc, so a remove of publisher or service can
only happen after the promise is done
+ Calculator$add$Return ret;
+ ret.optionalReturnValue.buf = nullptr;
+ ret.optionalReturnValue.len = 0;
+ ret.optionalReturnValue.cap = 0;
+ ret.optionalError = celix_utils_strdup(exp.what());
+ pub->send(pub->handle, msgId, &ret, metaProps);
+ })
+ .onSuccess([svc = calculator, pub = publisher, msgId =
returnMsgId, metaProps](auto val) {
+ //note this lambda makes copies of the
std::shared_ptr<publisher> and svc, so a remove of publisher or service can
only happen after the promise is done
+ Calculator$add$Return ret;
+ ret.optionalReturnValue.buf = (double*)
malloc(sizeof(*ret.optionalReturnValue.buf));
+ ret.optionalReturnValue.len = 1;
+ ret.optionalReturnValue.cap = 1;
+ ret.optionalReturnValue.buf[0] = val;
+ ret.optionalError = nullptr;
+ pub->send(pub->handle, msgId, &ret, metaProps);
+ });
+ } else {
+ logHelper.warning("Unexpected message type %s", msgType);
+ }
+ }
+
+ void setPublisher(const std::shared_ptr<pubsub_publisher>& pub) {
+ std::lock_guard lock{mutex};
+ publisher = pub;
+ }
+
+ void setPromiseFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
+ std::lock_guard lock{mutex};
+ factory = fac;
+ }
+
+ void setICalculator(const std::shared_ptr<ICalculator>& calc) {
+ std::lock_guard lock{mutex};
+ calculator = calc;
+ }
+private:
+ celix::LogHelper logHelper;
+ std::atomic<long> nextInvokeId{0};
+ std::mutex mutex{}; //protects below
+ std::shared_ptr<ICalculator> calculator{};
+ std::shared_ptr<celix::PromiseFactory> factory{};
+ std::shared_ptr<pubsub_publisher> publisher{};
+ std::unordered_map<long, celix::Deferred<double>> deferreds{};
+};
+
+/**
+ * A import service guard, which will remove the component if it goes out of
scope.
+ */
+class ComponentExportServiceGuard final : public
celix::rsa::IExportServiceGuard {
+public:
+ ComponentExportServiceGuard(std::shared_ptr<celix::BundleContext> _ctx,
std::string _componentId) : ctx{std::move(_ctx)},
componentId{std::move(_componentId)} {}
+ ~ComponentExportServiceGuard() noexcept override {
+ auto context = ctx.lock();
+ if (context) {
+ context->getDependencyManager()->removeComponentAsync(componentId);
+ } //else already gone
+ }
+private:
+ const std::weak_ptr<celix::BundleContext> ctx;
+ const std::string componentId;
+};
+
+/**
+ * @brief a manual written export service factory for a ICalculator interface
+ */
+class CalculatorExportServiceFactory final : public
celix::rsa::IExportServiceFactory {
+public:
+ explicit
CalculatorExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) :
ctx{std::move(_ctx)},
+
logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
+
+ std::unique_ptr<celix::rsa::IExportServiceGuard> exportService(const
celix::Properties& serviceProperties) override {
+ auto topic = serviceProperties.get("endpoint.topic");
+ auto scope = serviceProperties.get("endpoint.topic");
+ if (topic.empty() || scope.empty()) {
+ ctx->logError("Cannot export remote service pubsub without
endpoint configuration. Endpoint does not have a scope and/or topic");
+ return nullptr;
+ }
+
+ auto componentId =
createExportedCalculatorComponent(serviceProperties);
+ return std::make_unique<ComponentExportServiceGuard>(ctx,
std::move(componentId));
+ }
+private:
+ std::string createExportedCalculatorComponent(const celix::Properties&
serviceProperties) {
+ auto invokeTopic = serviceProperties.get("endpoint.topic") + "_invoke";
+ auto returnTopic = serviceProperties.get("endpoint.topic") + "_return";
+ auto scope = serviceProperties.get("endpoint.scope");
+ auto svcId = serviceProperties.get(celix::SERVICE_ID);
+
+ auto& cmp =
ctx->getDependencyManager()->createComponent(std::make_unique<ExportedCalculator>(logHelper));
+
cmp.createServiceDependency<pubsub_publisher>(PUBSUB_PUBLISHER_SERVICE_NAME)
+ .setRequired(true)
+
.setFilter(std::string{"(&(topic="}.append(returnTopic).append(")(scope=").append(scope).append("))"))
+ .setCallbacks(&ExportedCalculator::setPublisher);
+
+ cmp.createServiceDependency<celix::PromiseFactory>()
+ .setRequired(true)
+ .setCallbacks(&ExportedCalculator::setPromiseFactory);
+
+ cmp.createServiceDependency<ICalculator>()
+ .setRequired(true)
+
.setFilter(std::string{"("}.append(celix::SERVICE_ID).append("=").append(svcId).append(")"))
+ .setCallbacks(&ExportedCalculator::setICalculator);
+
+ auto subscriber = std::make_shared<pubsub_subscriber_t>();
+ subscriber->handle = &cmp.getInstance();
+ subscriber->receive = [](void *handle, const char *msgType, unsigned
int msgTypeId, void *msg, const celix_properties_t *metadata, bool
*/*release*/) -> int {
+ auto* inst = static_cast<ExportedCalculator*>(handle);
+ try {
+ inst->receive(msgType, msgTypeId, msg, metadata);
+ } catch (...) {
+ return -1;
+ }
+ return 0;
+ };
+ cmp.addContext(subscriber);
+ cmp.createProvidedCService<pubsub_subscriber_t>(subscriber.get(),
PUBSUB_SUBSCRIBER_SERVICE_NAME)
+ .addProperty("topic", invokeTopic)
+ .addProperty("scope", scope);
+
+ cmp.buildAsync();
+
+ return cmp.getUUID();
+ }
+
+ std::shared_ptr<celix::BundleContext> ctx;
+ celix::LogHelper logHelper;
+ std::mutex mutex{}; //protects below
+};
+
+class FactoryActivator {
+public:
+ explicit FactoryActivator(const std::shared_ptr<celix::BundleContext>&
ctx) {
+ ctx->logInfo("Starting TestExportImportRemoteServiceFactory");
+ registrations.emplace_back(
+
ctx->registerService<celix::rsa::IImportServiceFactory>(std::make_shared<CalculatorImportServiceFactory>(ctx))
+
.addProperty(celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME,
celix::typeName<ICalculator>())
+ .build()
+ );
+ registrations.emplace_back(
+
ctx->registerService<celix::rsa::IExportServiceFactory>(std::make_shared<CalculatorExportServiceFactory>(ctx))
+
.addProperty(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME,
celix::typeName<ICalculator>())
+ .build()
+ );
+ registrations.emplace_back(
+ //adding default promise factory with a low service ranking
+
ctx->registerService<celix::PromiseFactory>(std::make_shared<celix::PromiseFactory>())
+ .addProperty(celix::SERVICE_RANKING, -100)
+ .build()
+ );
+ }
+private:
+ std::vector<std::shared_ptr<celix::ServiceRegistration>> registrations{};
+};
+
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(FactoryActivator)
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
index fb6206c..09c7ae5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v1/src/pubsub_zmq_admin.c
@@ -818,6 +818,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const
char *commandLine, FILE
celixThreadMutex_unlock(&psa->protocols.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
+ free(line);
return status;
}
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index 9001ae0..5d396e5 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -851,6 +851,7 @@ bool pubsub_zmqAdmin_executeCommand(void *handle, const
char *commandLine, FILE
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
celixThreadMutex_unlock(&psa->protocols.mutex);
fprintf(out, "\n");
+ free(line);
return status;
}
diff --git a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
index a84ea8e..73235e6 100644
--- a/bundles/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_discovery/CMakeLists.txt
@@ -28,9 +28,10 @@ add_celix_bundle(celix_pubsub_discovery_etcd
)
target_include_directories(celix_pubsub_discovery_etcd PRIVATE src)
target_link_libraries(celix_pubsub_discovery_etcd PRIVATE
- Celix::framework Celix::etcdlib_static
- Celix::shell_api Celix::log_helper
- CURL::libcurl Jansson)
+ Celix::framework Celix::etcdlib_static
+ Celix::shell_api Celix::log_helper
+ CURL::libcurl Jansson
+)
target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi
Celix::pubsub_utils )
install_celix_bundle(celix_pubsub_discovery_etcd EXPORT celix COMPONENT pubsub)
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
index 6f83cac..b2a78df 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
@@ -141,27 +141,25 @@ double pubsub_utils_matchPublisher(
const char *requested_serializer = celix_properties_get(ep,
PUBSUB_ENDPOINT_SERIALIZER, NULL);
long serializerSvcId = getPSSerializer(ctx, requested_serializer);
- if (serializerSvcId < 0) {
- score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
- }
-
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
}
+ long protocolSvcId = -1;
if (matchProtocol) {
const char *requested_protocol = celix_properties_get(ep,
PUBSUB_ENDPOINT_PROTOCOL, NULL);
- long protocolSvcId = getPSProtocol(ctx, requested_protocol);
-
- if (protocolSvcId < 0) {
- score = PUBSUB_ADMIN_NO_MATCH_SCORE;
- }
-
+ protocolSvcId = getPSProtocol(ctx, requested_protocol);
if (outProtocolSvcId != NULL) {
*outProtocolSvcId = protocolSvcId;
}
}
+ if (serializerSvcId < 0) {
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ } else if (matchProtocol && protocolSvcId < 0) {
+ score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+ }
+
if (outTopicProperties != NULL) {
*outTopicProperties = ep;
} else {
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
index 857a96e..d008191 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
@@ -601,12 +601,7 @@ pubsub_serialization_provider_t
*pubsub_serializationProvider_create(
provider->freeSerializeMsg = freeSerializeMsg;
provider->deserialize = deserialize;
provider->freeDeserializeMsg = freeDeserializeMsg;
-
-
- {
- provider->logHelper = celix_logHelper_create(ctx,
"celix_pubsub_serialization_provider");
-
- }
+ provider->logHelper = celix_logHelper_create(ctx,
"celix_pubsub_serialization_provider");
dynFunction_logSetup(dfi_log, provider, 1);
dynType_logSetup(dfi_log, provider, 1);
diff --git a/libs/promises/api/celix/impl/SharedPromiseState.h
b/libs/promises/api/celix/impl/SharedPromiseState.h
index c3bbda9..e108392 100644
--- a/libs/promises/api/celix/impl/SharedPromiseState.h
+++ b/libs/promises/api/celix/impl/SharedPromiseState.h
@@ -19,6 +19,7 @@
#pragma once
+#include <type_traits>
#include <functional>
#include <chrono>
#include <mutex>