This is an automated email from the ASF dual-hosted git repository.
stegemr pushed a commit to branch feature/remote_pushstream
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/feature/remote_pushstream by
this push:
new 6f88d8f Initial version of test showing remote pushstreams
6f88d8f is described below
commit 6f88d8f6e1f10659af23de7b6a418c19d87da390
Author: stegemanr <[email protected]>
AuthorDate: Fri Oct 8 10:58:32 2021 +0200
Initial version of test showing remote pushstreams
---
.../cxx_remote_services/integration/CMakeLists.txt | 7 +-
.../integration/gtest/CMakeLists.txt | 2 +-
.../src/RemoteServicesIntegrationTestSuite.cc | 15 ++-
.../integration/include/ICalculator.h | 3 +
.../resources/Calculator$result$Event.descriptor | 9 ++
.../integration/src/CalculatorProvider.cc | 49 ++++++++
.../src/TestExportImportRemoteServiceFactory.cc | 124 ++++++++++++++++++++-
.../api/celix/impl/UnbufferedPushStream.h | 4 +-
8 files changed, 203 insertions(+), 10 deletions(-)
diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt
b/bundles/cxx_remote_services/integration/CMakeLists.txt
index 8321f00..454c741 100644
--- a/bundles/cxx_remote_services/integration/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -18,10 +18,11 @@
add_celix_bundle(TestExportImportRemoteServiceFactory
SOURCES src/TestExportImportRemoteServiceFactory.cc
)
-target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE
Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::log_helper)
+target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE
Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::PushStreams
Celix::log_helper)
target_include_directories(TestExportImportRemoteServiceFactory PRIVATE
include)
celix_bundle_files(TestExportImportRemoteServiceFactory
resources/Calculator$add$Invoke.descriptor
+ resources/Calculator$result$Event.descriptor
resources/Calculator$add$Return.descriptor
DESTINATION "META-INF/descriptors"
)
@@ -29,14 +30,14 @@ celix_bundle_files(TestExportImportRemoteServiceFactory
add_celix_bundle(CalculatorProvider
SOURCES src/CalculatorProvider.cc
)
-target_link_libraries(CalculatorProvider PRIVATE Celix::Promises)
+target_link_libraries(CalculatorProvider PRIVATE Celix::Promises
Celix::PushStreams)
target_include_directories(CalculatorProvider PRIVATE include)
add_celix_bundle(CalculatorConsumer
SOURCES src/CalculatorConsumer.cc
)
celix_bundle_files(CalculatorConsumer
${CMAKE_CURRENT_SOURCE_DIR}/resources/endpoint_discovery.json DESTINATION
"META-INF/discovery") # add configured discovery as resource in the bundle.
-target_link_libraries(CalculatorConsumer PRIVATE Celix::Promises
Celix::shell_api Celix::RsaConfiguredDiscovery_api)
+target_link_libraries(CalculatorConsumer PRIVATE Celix::Promises
Celix::PushStreams Celix::shell_api Celix::RsaConfiguredDiscovery_api)
target_include_directories(CalculatorConsumer PRIVATE include)
if (ENABLE_TESTING)
diff --git a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
index d6a66e9..10438d0 100644
--- a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
@@ -18,7 +18,7 @@
add_executable(test_cxx_remote_services_integration
src/RemoteServicesIntegrationTestSuite.cc
)
-target_link_libraries(test_cxx_remote_services_integration PRIVATE
Celix::framework Celix::Promises Celix::shell_api ZMQ::lib CZMQ::lib
GTest::gtest GTest::gtest_main)
+target_link_libraries(test_cxx_remote_services_integration PRIVATE
Celix::framework Celix::Promises Celix::PushStreams Celix::shell_api ZMQ::lib
CZMQ::lib GTest::gtest GTest::gtest_main)
target_include_directories(test_cxx_remote_services_integration PRIVATE
../include) #Add ICalculator
add_celix_bundle_dependencies(test_cxx_remote_services_integration
diff --git
a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
index 3012f7e..25d113d 100644
---
a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
+++
b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
@@ -115,16 +115,27 @@ TEST_F(RemoteServicesIntegrationTestSuite,
InvokeRemoteCalcService) {
cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
})
.build();
-
+ std::shared_ptr<celix::PushStream<double>> stream;
//When I call the calculator service from the client, I expect a answer
+ int streamCount = 0;
+ double lastValue = 0.0;
count = clientCtx->useService<ICalculator>()
- .addUseCallback([](auto& calc) {
+ .addUseCallback([&](auto& calc) {
+ stream = calc.result();
+ stream->forEach([&](double event){
+ lastValue = event;
+ streamCount++;
+ });
+
auto promise = calc.add(2, 4);
promise.wait();
EXPECT_TRUE(promise.isSuccessfullyResolved());
if (promise.isSuccessfullyResolved()) {
EXPECT_EQ(6, promise.getValue());
}
+ sleep(1);
+ EXPECT_GE(streamCount,0 );
+ EXPECT_GE(lastValue, 0.0);
})
.build();
EXPECT_EQ(count, 1);
diff --git a/bundles/cxx_remote_services/integration/include/ICalculator.h
b/bundles/cxx_remote_services/integration/include/ICalculator.h
index dd95aee..1d063c5 100644
--- a/bundles/cxx_remote_services/integration/include/ICalculator.h
+++ b/bundles/cxx_remote_services/integration/include/ICalculator.h
@@ -20,10 +20,13 @@
#pragma once
#include "celix/Promise.h"
+#include "celix/PushStream.h"
class ICalculator {
public:
virtual ~ICalculator() noexcept = default;
virtual celix::Promise<double> add(double a, double b) = 0;
+
+ virtual std::shared_ptr<celix::PushStream<double>> result() = 0;
};
diff --git
a/bundles/cxx_remote_services/integration/resources/Calculator$result$Event.descriptor
b/bundles/cxx_remote_services/integration/resources/Calculator$result$Event.descriptor
new file mode 100644
index 0000000..fb1e018
--- /dev/null
+++
b/bundles/cxx_remote_services/integration/resources/Calculator$result$Event.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=Calculator$result$Event
+version=1.0.0
+:annotations
+classname=Calculator$result$Event
+:types
+:message
+{[Dt optionalReturnValue optionalError}
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
index ec4ea86..c6337d3 100644
--- a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
+++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
@@ -21,6 +21,7 @@
#include "celix/BundleActivator.h"
#include "celix/PromiseFactory.h"
+#include "celix/PushStreamProvider.h"
#include "ICalculator.h"
class CalculatorImpl final : public ICalculator {
@@ -30,14 +31,57 @@ public:
celix::Promise<double> add(double a, double b) override {
auto deferred = factory->deferred<double>();
deferred.resolve(a+b);
+
return deferred.getPromise();
}
+ void setPushStreamProvider(const
std::shared_ptr<celix::PushStreamProvider>& provider) {
+ psp = provider;
+ }
+
void setFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
factory = fac;
}
+
+ std::shared_ptr<celix::PushStream<double>> result() override {
+ return psp->createUnbufferedStream<double>(ses);
+ }
+
+ int init() {
+ return CELIX_SUCCESS;
+ }
+
+ int start() {
+ ses = psp->template createSynchronousEventSource<double>();
+
+ t = std::make_unique<std::thread>([&]() {
+ int counter = 0;
+ stopThread = false;
+ while(!stopThread) {
+ ses->publish((double)counter);
+ counter++;
+ std::this_thread::sleep_for(std::chrono::milliseconds{100});
+ }
+ });
+ return CELIX_SUCCESS;
+ }
+
+ int stop() {
+ stopThread = true;
+ t->join();
+ return CELIX_SUCCESS;
+ }
+
+ int deinit() {
+ return CELIX_SUCCESS;
+ }
+
private:
+ std::unique_ptr<std::thread> t{};
std::shared_ptr<celix::PromiseFactory> factory{};
+ std::shared_ptr<celix::PushStreamProvider> psp {};
+ std::shared_ptr<celix::SynchronousPushEventSource<double>> ses {};
+ volatile bool stopThread{false};
};
class CalculatorProviderActivator {
@@ -47,11 +91,16 @@ public:
cmp.createServiceDependency<celix::PromiseFactory>()
.setRequired(true)
.setCallbacks(&CalculatorImpl::setFactory);
+ cmp.createServiceDependency<celix::PushStreamProvider>()
+ .setRequired(true)
+ .setCallbacks(&CalculatorImpl::setPushStreamProvider);
cmp.createProvidedService<ICalculator>()
.addProperty("service.exported.interfaces",
celix::typeName<ICalculator>())
.addProperty("endpoint.topic", "test")
.addProperty("endpoint.scope", "default")
.addProperty("service.exported.intents", "osgi.async");
+
+ cmp.setCallbacks(&CalculatorImpl::init, &CalculatorImpl::start,
&CalculatorImpl::stop, &CalculatorImpl::deinit);
cmp.build();
}
};
diff --git
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
index de04155..5e8d725 100644
---
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
+++
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -20,6 +20,8 @@
#include <unordered_map>
#include "celix/PromiseFactory.h"
+#include "celix/PushStream.h"
+#include "celix/PushStreamProvider.h"
#include "celix/BundleActivator.h"
#include "celix/rsa/IImportServiceFactory.h"
#include "celix/rsa/IExportServiceFactory.h"
@@ -29,7 +31,7 @@
#include "pubsub/publisher.h"
#include "pubsub/subscriber.h"
-constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{10};
+constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{5}; //TODO make
configurable
struct Calculator$add$Invoke {
double arg1{};
@@ -45,6 +47,15 @@ struct Calculator$add$Return {
char* optionalError{};
};
+struct Calculator$result$Event {
+ struct {
+ uint32_t cap{};
+ uint32_t len{};
+ double* buf{};
+ } optionalReturnValue{};
+ char* optionalError{};
+};
+
/**
* A importedCalculater which acts as a pubsub proxy to a imported remote
service.
*/
@@ -61,6 +72,11 @@ public:
}
}
};
+public:
+
+ std::shared_ptr<celix::PushStream<double>> result() override {
+ return stream;
+ }
celix::Promise<double> add(double a, double b) override {
//setup msg id
@@ -96,12 +112,29 @@ public:
void receive(const char *msgType, unsigned int msgTypeId, void *msg, const
celix_properties_t* meta) {
//setup message ids
thread_local unsigned int returnAddMsgId = 0;
+ thread_local unsigned int streamResultMsgId = 0;
+
if (returnAddMsgId == 0 && celix_utils_stringEquals(msgType,
"Calculator$add$Return")) {
returnAddMsgId = msgTypeId;
}
+ if (streamResultMsgId == 0 && celix_utils_stringEquals(msgType,
"Calculator$result$Event")) {
+ streamResultMsgId = msgTypeId;
+ }
//handle incoming messages
- if (returnAddMsgId != 0 && returnAddMsgId == msgTypeId) {
+ if (streamResultMsgId != 0 && streamResultMsgId == msgTypeId) {
+ long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
+ if (invokeId == -1) {
+ logHelper.error("Cannot find invoke id on metadata");
+ return;
+ }
+ auto* result = static_cast<Calculator$result$Event*>(msg);
+ if (result->optionalReturnValue.len == 1) {
+ ses->publish(result->optionalReturnValue.buf[0]);
+ } else {
+ ses->close();
+ }
+ } else if (returnAddMsgId != 0 && returnAddMsgId == msgTypeId) {
long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
if (invokeId == -1) {
logHelper.error("Cannot find invoke id on metadata");
@@ -126,6 +159,23 @@ public:
logHelper.warning("Unexpected message type %s", msgType);
}
}
+ int init() {
+ return CELIX_SUCCESS;
+ }
+
+ int start() {
+ ses = psp->createSynchronousEventSource<double>();
+ stream = psp->createStream<double>(ses);
+ return CELIX_SUCCESS;
+ }
+
+ int stop() {
+ return CELIX_SUCCESS;
+ }
+
+ int deinit() {
+ return CELIX_SUCCESS;
+ }
void setPublisher(const std::shared_ptr<pubsub_publisher>& pub) {
std::lock_guard lock{mutex};
@@ -136,6 +186,12 @@ public:
std::lock_guard lock{mutex};
factory = fac;
}
+
+ void setPushStreamProvider(const
std::shared_ptr<celix::PushStreamProvider>& provider) {
+ std::lock_guard lock{mutex};
+ psp = provider;
+ }
+
private:
celix::LogHelper logHelper;
std::atomic<long> nextInvokeId{0};
@@ -143,6 +199,9 @@ private:
std::shared_ptr<celix::PromiseFactory> factory{};
std::shared_ptr<pubsub_publisher> publisher{};
std::unordered_map<long, celix::Deferred<double>> deferreds{};
+ std::shared_ptr<celix::PushStreamProvider> psp {};
+ std::shared_ptr<celix::SynchronousPushEventSource<double>> ses{};
+ std::shared_ptr<celix::PushStream<double>> stream{};
};
/**
@@ -206,6 +265,11 @@ private:
cmp.createServiceDependency<celix::PromiseFactory>()
.setRequired(true)
.setCallbacks(&ImportedCalculator::setPromiseFactory);
+ cmp.createServiceDependency<celix::PushStreamProvider>()
+ .setRequired(true)
+ .setCallbacks(&ImportedCalculator::setPushStreamProvider);
+
+ cmp.setCallbacks(&ImportedCalculator::init,
&ImportedCalculator::start, &ImportedCalculator::stop,
&ImportedCalculator::deinit);
auto subscriber = std::make_shared<pubsub_subscriber_t>();
subscriber->handle = &cmp.getInstance();
@@ -252,7 +316,9 @@ private:
*/
class ExportedCalculator final {
public:
- explicit ExportedCalculator(celix::LogHelper _logHelper) :
logHelper{std::move(_logHelper)} {}
+ explicit ExportedCalculator(celix::LogHelper _logHelper) :
logHelper{std::move(_logHelper)} {
+
+ }
void receive(const char *msgType, unsigned int msgTypeId, void *msg, const
celix_properties_t* meta) {
//setup message ids
@@ -323,6 +389,47 @@ public:
factory = fac;
}
+ int init() {
+ return CELIX_SUCCESS;
+ }
+
+ int start() {
+ resultStream = calculator->result();
+ resultStream->forEach([weakSvc =
std::weak_ptr<ICalculator>{calculator}, weakPub =
std::weak_ptr<pubsub_publisher>{publisher}](const double& event){
+ auto pub = weakPub.lock();
+ auto svc = weakSvc.lock();
+ if (pub && svc) {
+ thread_local unsigned int eventMsgId = 0;
+ if (eventMsgId == 0) {
+ pub->localMsgTypeIdForMsgType(pub->handle,
"Calculator$result$Event", &eventMsgId);
+ }
+
+ auto* metaProps = celix_properties_create();
+ celix_properties_set(metaProps, "invoke.id",
std::to_string(0).c_str());
+ Calculator$result$Event wireEvent;
+ wireEvent.optionalReturnValue.buf = (double *)
malloc(sizeof(*wireEvent.optionalReturnValue.buf));
+ wireEvent.optionalReturnValue.len = 1;
+ wireEvent.optionalReturnValue.cap = 1;
+ wireEvent.optionalReturnValue.buf[0] = event;
+ wireEvent.optionalError = nullptr;
+ pub->send(pub->handle, eventMsgId, &wireEvent, metaProps);
+ free(wireEvent.optionalReturnValue.buf);
+ } else {
+ //TODO error handling
+ }
+ });
+ return CELIX_SUCCESS;
+ }
+
+ int stop() {
+ resultStream->close();
+ return CELIX_SUCCESS;
+ }
+
+ int deinit() {
+ return CELIX_SUCCESS;
+ }
+
void setICalculator(const std::shared_ptr<ICalculator>& calc) {
std::lock_guard lock{mutex};
calculator = calc;
@@ -335,6 +442,7 @@ private:
std::shared_ptr<celix::PromiseFactory> factory{};
std::shared_ptr<pubsub_publisher> publisher{};
std::unordered_map<long, celix::Deferred<double>> deferreds{};
+ std::shared_ptr<celix::PushStream<double>> resultStream{};
};
/**
@@ -412,6 +520,9 @@ private:
.setFilter(std::string{"("}.append(celix::SERVICE_ID).append("=").append(svcId).append(")"))
.setCallbacks(&ExportedCalculator::setICalculator);
+ cmp.setCallbacks(&ExportedCalculator::init,
&ExportedCalculator::start, &ExportedCalculator::stop,
&ExportedCalculator::deinit);
+
+
auto subscriber = std::make_shared<pubsub_subscriber_t>();
subscriber->handle = &cmp.getInstance();
subscriber->receive = [](void *handle, const char *msgType, unsigned
int msgTypeId, void *msg, const celix_properties_t *metadata, bool
*/*release*/) -> int {
@@ -464,6 +575,13 @@ public:
.addProperty(celix::SERVICE_RANKING, -100)
.build()
);
+
+ registrations.emplace_back(
+ //adding default promise factory with a low service ranking
+
ctx->registerService<celix::PushStreamProvider>(std::make_shared<celix::PushStreamProvider>())
+ .addProperty(celix::SERVICE_RANKING, -100)
+ .build()
+ );
}
private:
std::vector<std::shared_ptr<celix::ServiceRegistration>> registrations{};
diff --git a/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
b/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
index 1b17ec7..c0cde8a 100644
--- a/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
+++ b/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
@@ -63,7 +63,9 @@ bool celix::UnbufferedPushStream<T>::begin() {
template<typename T>
void celix::UnbufferedPushStream<T>::upstreamClose(const PushEvent<T>&
/*event*/) {
- toClose->close();
+ if (toClose) {
+ toClose->close();
+ }
}