This is an automated email from the ASF dual-hosted git repository.

stegemr pushed a commit to branch feature/remote_pushstream
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/feature/remote_pushstream by 
this push:
     new 6f88d8f  Initial version of test showing remote pushstreams
6f88d8f is described below

commit 6f88d8f6e1f10659af23de7b6a418c19d87da390
Author: stegemanr <[email protected]>
AuthorDate: Fri Oct 8 10:58:32 2021 +0200

    Initial version of test showing remote pushstreams
---
 .../cxx_remote_services/integration/CMakeLists.txt |   7 +-
 .../integration/gtest/CMakeLists.txt               |   2 +-
 .../src/RemoteServicesIntegrationTestSuite.cc      |  15 ++-
 .../integration/include/ICalculator.h              |   3 +
 .../resources/Calculator$result$Event.descriptor   |   9 ++
 .../integration/src/CalculatorProvider.cc          |  49 ++++++++
 .../src/TestExportImportRemoteServiceFactory.cc    | 124 ++++++++++++++++++++-
 .../api/celix/impl/UnbufferedPushStream.h          |   4 +-
 8 files changed, 203 insertions(+), 10 deletions(-)

diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt 
b/bundles/cxx_remote_services/integration/CMakeLists.txt
index 8321f00..454c741 100644
--- a/bundles/cxx_remote_services/integration/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -18,10 +18,11 @@
 add_celix_bundle(TestExportImportRemoteServiceFactory
         SOURCES src/TestExportImportRemoteServiceFactory.cc
 )
-target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE 
Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::log_helper)
+target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE 
Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::PushStreams 
Celix::log_helper)
 target_include_directories(TestExportImportRemoteServiceFactory PRIVATE 
include)
 celix_bundle_files(TestExportImportRemoteServiceFactory
         resources/Calculator$add$Invoke.descriptor
+        resources/Calculator$result$Event.descriptor
         resources/Calculator$add$Return.descriptor
         DESTINATION "META-INF/descriptors"
 )
@@ -29,14 +30,14 @@ celix_bundle_files(TestExportImportRemoteServiceFactory
 add_celix_bundle(CalculatorProvider
         SOURCES src/CalculatorProvider.cc
 )
-target_link_libraries(CalculatorProvider PRIVATE Celix::Promises)
+target_link_libraries(CalculatorProvider PRIVATE Celix::Promises 
Celix::PushStreams)
 target_include_directories(CalculatorProvider PRIVATE include)
 
 add_celix_bundle(CalculatorConsumer
         SOURCES src/CalculatorConsumer.cc
 )
 celix_bundle_files(CalculatorConsumer 
${CMAKE_CURRENT_SOURCE_DIR}/resources/endpoint_discovery.json DESTINATION 
"META-INF/discovery") # add configured discovery as resource in the bundle.
-target_link_libraries(CalculatorConsumer PRIVATE Celix::Promises 
Celix::shell_api Celix::RsaConfiguredDiscovery_api)
+target_link_libraries(CalculatorConsumer PRIVATE Celix::Promises 
Celix::PushStreams Celix::shell_api Celix::RsaConfiguredDiscovery_api)
 target_include_directories(CalculatorConsumer PRIVATE include)
 
 if (ENABLE_TESTING)
diff --git a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt 
b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
index d6a66e9..10438d0 100644
--- a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
@@ -18,7 +18,7 @@
 add_executable(test_cxx_remote_services_integration
     src/RemoteServicesIntegrationTestSuite.cc
 )
-target_link_libraries(test_cxx_remote_services_integration PRIVATE 
Celix::framework Celix::Promises Celix::shell_api ZMQ::lib CZMQ::lib 
GTest::gtest GTest::gtest_main)
+target_link_libraries(test_cxx_remote_services_integration PRIVATE 
Celix::framework Celix::Promises Celix::PushStreams Celix::shell_api ZMQ::lib 
CZMQ::lib GTest::gtest GTest::gtest_main)
 target_include_directories(test_cxx_remote_services_integration PRIVATE 
../include) #Add ICalculator
 
 add_celix_bundle_dependencies(test_cxx_remote_services_integration
diff --git 
a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
 
b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
index 3012f7e..25d113d 100644
--- 
a/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
+++ 
b/bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc
@@ -115,16 +115,27 @@ TEST_F(RemoteServicesIntegrationTestSuite, 
InvokeRemoteCalcService) {
                 cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
             })
             .build();
-
+    std::shared_ptr<celix::PushStream<double>> stream;
     //When I call the calculator service from the client, I expect a answer
+    int streamCount = 0;
+    double lastValue = 0.0;
     count = clientCtx->useService<ICalculator>()
-            .addUseCallback([](auto& calc) {
+            .addUseCallback([&](auto& calc) {
+                stream = calc.result();
+                stream->forEach([&](double event){
+                    lastValue = event;
+                    streamCount++;
+                });
+
                 auto promise = calc.add(2, 4);
                 promise.wait();
                 EXPECT_TRUE(promise.isSuccessfullyResolved());
                 if (promise.isSuccessfullyResolved()) {
                     EXPECT_EQ(6, promise.getValue());
                 }
+                sleep(1);
+                EXPECT_GE(streamCount,0 );
+                EXPECT_GE(lastValue, 0.0);
             })
             .build();
     EXPECT_EQ(count, 1);
diff --git a/bundles/cxx_remote_services/integration/include/ICalculator.h 
b/bundles/cxx_remote_services/integration/include/ICalculator.h
index dd95aee..1d063c5 100644
--- a/bundles/cxx_remote_services/integration/include/ICalculator.h
+++ b/bundles/cxx_remote_services/integration/include/ICalculator.h
@@ -20,10 +20,13 @@
 #pragma once
 
 #include "celix/Promise.h"
+#include "celix/PushStream.h"
 
 class ICalculator {
 public:
     virtual ~ICalculator() noexcept = default;
 
     virtual celix::Promise<double> add(double a, double b) = 0;
+
+    virtual std::shared_ptr<celix::PushStream<double>> result() = 0;
 };
diff --git 
a/bundles/cxx_remote_services/integration/resources/Calculator$result$Event.descriptor
 
b/bundles/cxx_remote_services/integration/resources/Calculator$result$Event.descriptor
new file mode 100644
index 0000000..fb1e018
--- /dev/null
+++ 
b/bundles/cxx_remote_services/integration/resources/Calculator$result$Event.descriptor
@@ -0,0 +1,9 @@
+:header
+type=message
+name=Calculator$result$Event
+version=1.0.0
+:annotations
+classname=Calculator$result$Event
+:types
+:message
+{[Dt optionalReturnValue optionalError}
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc 
b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
index ec4ea86..c6337d3 100644
--- a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
+++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
@@ -21,6 +21,7 @@
 
 #include "celix/BundleActivator.h"
 #include "celix/PromiseFactory.h"
+#include "celix/PushStreamProvider.h"
 #include "ICalculator.h"
 
 class CalculatorImpl final : public ICalculator {
@@ -30,14 +31,57 @@ public:
     celix::Promise<double> add(double a, double b) override {
         auto deferred = factory->deferred<double>();
         deferred.resolve(a+b);
+
         return deferred.getPromise();
     }
 
+    void setPushStreamProvider(const 
std::shared_ptr<celix::PushStreamProvider>& provider) {
+        psp = provider;
+    }
+
     void setFactory(const std::shared_ptr<celix::PromiseFactory>& fac) {
         factory = fac;
     }
+
+    std::shared_ptr<celix::PushStream<double>> result() override {
+        return psp->createUnbufferedStream<double>(ses);
+    }
+
+    int init() {
+        return CELIX_SUCCESS;
+    }
+
+    int start() {
+        ses = psp->template createSynchronousEventSource<double>();
+
+        t = std::make_unique<std::thread>([&]() {
+            int counter = 0;
+            stopThread = false;
+            while(!stopThread) {
+                ses->publish((double)counter);
+                counter++;
+                std::this_thread::sleep_for(std::chrono::milliseconds{100});
+            }
+        });
+        return CELIX_SUCCESS;
+    }
+
+    int stop() {
+        stopThread = true;
+        t->join();
+        return CELIX_SUCCESS;
+    }
+
+    int deinit() {
+        return CELIX_SUCCESS;
+    }
+
 private:
+    std::unique_ptr<std::thread> t{};
     std::shared_ptr<celix::PromiseFactory> factory{};
+    std::shared_ptr<celix::PushStreamProvider> psp {};
+    std::shared_ptr<celix::SynchronousPushEventSource<double>> ses {};
+    volatile bool stopThread{false};
 };
 
 class CalculatorProviderActivator {
@@ -47,11 +91,16 @@ public:
         cmp.createServiceDependency<celix::PromiseFactory>()
                 .setRequired(true)
                 .setCallbacks(&CalculatorImpl::setFactory);
+        cmp.createServiceDependency<celix::PushStreamProvider>()
+            .setRequired(true)
+            .setCallbacks(&CalculatorImpl::setPushStreamProvider);
         cmp.createProvidedService<ICalculator>()
                 .addProperty("service.exported.interfaces", 
celix::typeName<ICalculator>())
                 .addProperty("endpoint.topic", "test")
                 .addProperty("endpoint.scope", "default")
                 .addProperty("service.exported.intents", "osgi.async");
+
+        cmp.setCallbacks(&CalculatorImpl::init, &CalculatorImpl::start, 
&CalculatorImpl::stop, &CalculatorImpl::deinit);
         cmp.build();
     }
 };
diff --git 
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
 
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
index de04155..5e8d725 100644
--- 
a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
+++ 
b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -20,6 +20,8 @@
 #include <unordered_map>
 
 #include "celix/PromiseFactory.h"
+#include "celix/PushStream.h"
+#include "celix/PushStreamProvider.h"
 #include "celix/BundleActivator.h"
 #include "celix/rsa/IImportServiceFactory.h"
 #include "celix/rsa/IExportServiceFactory.h"
@@ -29,7 +31,7 @@
 #include "pubsub/publisher.h"
 #include "pubsub/subscriber.h"
 
-constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{10};
+constexpr auto INVOKE_TIMEOUT = std::chrono::seconds{5}; //TODO make configurable
 
 struct Calculator$add$Invoke {
     double arg1{};
@@ -45,6 +47,15 @@ struct Calculator$add$Return {
     char* optionalError{};
 };
 
+struct Calculator$result$Event {
+    struct {
+        uint32_t cap{};
+        uint32_t len{};
+        double* buf{};
+    } optionalReturnValue{};
+    char* optionalError{};
+};
+
 /**
  * A importedCalculater which acts as a pubsub proxy to a imported remote 
service.
  */
@@ -61,6 +72,11 @@ public:
             }
         }
     };
+public:
+
+    std::shared_ptr<celix::PushStream<double>> result() override {
+        return stream;
+    }
 
     celix::Promise<double> add(double a, double b) override {
         //setup msg id
@@ -96,12 +112,29 @@ public:
     void receive(const char *msgType, unsigned int msgTypeId, void *msg, const 
celix_properties_t* meta) {
         //setup message ids
         thread_local unsigned int returnAddMsgId = 0;
+        thread_local unsigned int streamResultMsgId = 0;
+
         if (returnAddMsgId == 0 && celix_utils_stringEquals(msgType, 
"Calculator$add$Return")) {
             returnAddMsgId = msgTypeId;
         }
+        if (streamResultMsgId == 0 && celix_utils_stringEquals(msgType, 
"Calculator$result$Event")) {
+            streamResultMsgId = msgTypeId;
+        }
 
         //handle incoming messages
-        if (returnAddMsgId != 0 && returnAddMsgId == msgTypeId) {
+        if (streamResultMsgId != 0 && streamResultMsgId == msgTypeId) {
+            long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
+            if (invokeId == -1) {
+                logHelper.error("Cannot find invoke id on metadata");
+                return;
+            }
+            auto* result = static_cast<Calculator$result$Event*>(msg);
+            if (result->optionalReturnValue.len == 1) {
+                ses->publish(result->optionalReturnValue.buf[0]);
+            } else {
+                ses->close();
+            }
+        } else if (returnAddMsgId != 0 && returnAddMsgId == msgTypeId) {
             long invokeId = celix_properties_getAsLong(meta, "invoke.id", -1);
             if (invokeId == -1) {
                 logHelper.error("Cannot find invoke id on metadata");
@@ -126,6 +159,23 @@ public:
             logHelper.warning("Unexpected message type %s", msgType);
         }
     }
+    int init() {
+        return CELIX_SUCCESS;
+    }
+
+    int start() {
+        ses = psp->createSynchronousEventSource<double>();
+        stream = psp->createStream<double>(ses);
+        return CELIX_SUCCESS;
+    }
+
+    int stop() {
+        return CELIX_SUCCESS;
+    }
+
+    int deinit() {
+        return CELIX_SUCCESS;
+    }
 
     void setPublisher(const std::shared_ptr<pubsub_publisher>& pub) {
         std::lock_guard lock{mutex};
@@ -136,6 +186,12 @@ public:
         std::lock_guard lock{mutex};
         factory = fac;
     }
+
+    void setPushStreamProvider(const 
std::shared_ptr<celix::PushStreamProvider>& provider) {
+        std::lock_guard lock{mutex};
+        psp = provider;
+    }
+
 private:
     celix::LogHelper logHelper;
     std::atomic<long> nextInvokeId{0};
@@ -143,6 +199,9 @@ private:
     std::shared_ptr<celix::PromiseFactory> factory{};
     std::shared_ptr<pubsub_publisher> publisher{};
     std::unordered_map<long, celix::Deferred<double>> deferreds{};
+    std::shared_ptr<celix::PushStreamProvider> psp {};
+    std::shared_ptr<celix::SynchronousPushEventSource<double>> ses{};
+    std::shared_ptr<celix::PushStream<double>> stream{};
 };
 
 /**
@@ -206,6 +265,11 @@ private:
         cmp.createServiceDependency<celix::PromiseFactory>()
                 .setRequired(true)
                 .setCallbacks(&ImportedCalculator::setPromiseFactory);
+        cmp.createServiceDependency<celix::PushStreamProvider>()
+                .setRequired(true)
+                .setCallbacks(&ImportedCalculator::setPushStreamProvider);
+
+        cmp.setCallbacks(&ImportedCalculator::init, 
&ImportedCalculator::start, &ImportedCalculator::stop, 
&ImportedCalculator::deinit);
 
         auto subscriber = std::make_shared<pubsub_subscriber_t>();
         subscriber->handle = &cmp.getInstance();
@@ -252,7 +316,9 @@ private:
  */
 class ExportedCalculator final {
 public:
-    explicit ExportedCalculator(celix::LogHelper _logHelper) : 
logHelper{std::move(_logHelper)} {}
+    explicit ExportedCalculator(celix::LogHelper _logHelper) : 
logHelper{std::move(_logHelper)} {
+
+    }
 
     void receive(const char *msgType, unsigned int msgTypeId, void *msg, const 
celix_properties_t* meta) {
         //setup message ids
@@ -323,6 +389,47 @@ public:
         factory = fac;
     }
 
+    int init() {
+        return CELIX_SUCCESS;
+    }
+
+    int start() {
+        resultStream = calculator->result();
+        resultStream->forEach([weakSvc = 
std::weak_ptr<ICalculator>{calculator}, weakPub = 
std::weak_ptr<pubsub_publisher>{publisher}](const double& event){
+            auto pub = weakPub.lock();
+            auto svc = weakSvc.lock();
+            if (pub && svc) {
+                thread_local unsigned int eventMsgId = 0;
+                if (eventMsgId == 0) {
+                    pub->localMsgTypeIdForMsgType(pub->handle, 
"Calculator$result$Event", &eventMsgId);
+                }
+
+                auto* metaProps = celix_properties_create();
+                celix_properties_set(metaProps, "invoke.id", 
std::to_string(0).c_str());
+                Calculator$result$Event wireEvent;
+                wireEvent.optionalReturnValue.buf = (double *) 
malloc(sizeof(*wireEvent.optionalReturnValue.buf));
+                wireEvent.optionalReturnValue.len = 1;
+                wireEvent.optionalReturnValue.cap = 1;
+                wireEvent.optionalReturnValue.buf[0] = event;
+                wireEvent.optionalError = nullptr;
+                pub->send(pub->handle, eventMsgId, &wireEvent, metaProps);
+                free(wireEvent.optionalReturnValue.buf);
+            } else {
+                //TODO error handling
+            }
+        });
+        return CELIX_SUCCESS;
+    }
+
+    int stop() {
+        resultStream->close();
+        return CELIX_SUCCESS;
+    }
+
+    int deinit() {
+        return CELIX_SUCCESS;
+    }
+
     void setICalculator(const std::shared_ptr<ICalculator>& calc) {
         std::lock_guard lock{mutex};
         calculator = calc;
@@ -335,6 +442,7 @@ private:
     std::shared_ptr<celix::PromiseFactory> factory{};
     std::shared_ptr<pubsub_publisher> publisher{};
     std::unordered_map<long, celix::Deferred<double>> deferreds{};
+    std::shared_ptr<celix::PushStream<double>> resultStream{};
 };
 
 /**
@@ -412,6 +520,9 @@ private:
                 
.setFilter(std::string{"("}.append(celix::SERVICE_ID).append("=").append(svcId).append(")"))
                 .setCallbacks(&ExportedCalculator::setICalculator);
 
+        cmp.setCallbacks(&ExportedCalculator::init, 
&ExportedCalculator::start, &ExportedCalculator::stop, 
&ExportedCalculator::deinit);
+
+
         auto subscriber = std::make_shared<pubsub_subscriber_t>();
         subscriber->handle = &cmp.getInstance();
         subscriber->receive = [](void *handle, const char *msgType, unsigned 
int msgTypeId, void *msg, const celix_properties_t *metadata, bool 
*/*release*/) ->  int {
@@ -464,6 +575,13 @@ public:
                         .addProperty(celix::SERVICE_RANKING, -100)
                         .build()
         );
+
+        registrations.emplace_back(
+                //adding default push stream provider with a low service ranking
+                
ctx->registerService<celix::PushStreamProvider>(std::make_shared<celix::PushStreamProvider>())
+                .addProperty(celix::SERVICE_RANKING, -100)
+                .build()
+                );
     }
 private:
     std::vector<std::shared_ptr<celix::ServiceRegistration>> registrations{};
diff --git a/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h 
b/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
index 1b17ec7..c0cde8a 100644
--- a/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
+++ b/libs/pushstreams/api/celix/impl/UnbufferedPushStream.h
@@ -63,7 +63,9 @@ bool celix::UnbufferedPushStream<T>::begin() {
 
 template<typename T>
 void celix::UnbufferedPushStream<T>::upstreamClose(const PushEvent<T>& 
/*event*/) {
-    toClose->close();
+    if (toClose) {
+        toClose->close();
+    }
 }
 
 

Reply via email to