pnoltes commented on a change in pull request #363:
URL: https://github.com/apache/celix/pull/363#discussion_r713988963



##########
File path: libs/pushstreams/CMakeLists.txt
##########
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+if (NOT COMMAND celix_subproject)
+    #If COMMAND celix_subproject is not defined, this CMakeLists will
+    #act as a top level project. Making the Celix::CelixPushStreams useable
+    #stand-alone
+
+    cmake_minimum_required (VERSION 3.11)
+    project(celix_PushStreams
+            VERSION 1.0.0
+            LANGUAGES CXX
+    )
+
+    include(GNUInstallDirs)
+
+    set(CMAKE_CXX_FLAGS "-std=c++17 ${CMAKE_CXX_FLAGS}")
+    set(CMAKE_CXX_FLAGS_DEBUG "-g -DDEBUG ${CMAKE_CXX_FLAGS_DEBUG}")
+
+    set(PUSHSTREAMS_STANDALONE ON)
+else ()
+    set(PUSHSTREAMS_DEFAULT_ON ${CELIX_CXX})
+    celix_subproject(PUSHSTREAMS "Option to build the PushStreams library" 
${PUSHSTREAMS_DEFAULT_ON})
+endif ()
+
+if (PUSHSTREAMS OR PUSHSTREAMS_STANDALONE)
+    find_package(Threads)
+
+    add_library(PushStreams INTERFACE)
+    target_include_directories(PushStreams INTERFACE
+        $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/api>
+        $<INSTALL_INTERFACE:include/celix/pushstreams>
+    )
+    target_link_libraries(PushStreams INTERFACE Threads::Threads)
+    target_link_libraries(PushStreams INTERFACE Celix::Promises)
+    add_library(Celix::PushStreams ALIAS PushStreams)
+
+    add_executable(PushStreamExamples src/PushStreamExamples.cc)
+    target_compile_options(PushStreamExamples PRIVATE -std=c++17)
+    target_link_libraries(PushStreamExamples PRIVATE Celix::PushStreams)
+
+    if (ENABLE_TESTING AND NOT PUSHSTREAMS_STANDALONE)
+        add_subdirectory(gtest)
+    endif()
+
+    install(TARGETS PushStreams EXPORT celix DESTINATION 
${CMAKE_INSTALL_LIBDIR})
+    install(DIRECTORY api/ DESTINATION include/celix/pushstreams)
+
+    if (PUSHSTREAMS_STANDALONE)

Review comment:
       The cmake/CelixPushStreamConfig(Version).cmake files needed to make this 
library buildable standalone are missing.

##########
File path: libs/pushstreams/src/PushStreamExamples.cc
##########
@@ -0,0 +1,25 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <iostream>
+
+int main() {
+    std::cout << "PushStream Example" << std::endl;

Review comment:
       maybe remove the example, until we have something with a bit more 'meat' 
;)

##########
File path: libs/pushstreams/gtest/src/PushStreamTestSuite.cc
##########
@@ -0,0 +1,695 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celix/PushStreamProvider.h"
+
+using celix::PushStreamProvider;
+
+class TestException : public std::exception {
+public:
+    explicit TestException(const char* what) : w{what} {}
+    explicit TestException(std::string what) : w{std::move(what)} {}
+
+    TestException(const TestException&) = delete;
+    TestException(TestException&&) noexcept = default;
+
+    TestException& operator=(const TestException&) = delete;
+    TestException& operator=(TestException&&) noexcept = default;
+
+    [[nodiscard]] const char* what() const noexcept override { return 
w.c_str(); }
+private:
+    std::string w;
+};
+
+class EventObject {
+public:
+    EventObject() : val{0} {
+    }
+
+    explicit EventObject(int _val) : val{_val} {
+    }
+
+    EventObject(const EventObject& _val) = default;
+    EventObject& operator=(const EventObject& other) = default;
+
+    EventObject& operator=(int other) {
+        val = other;
+        return *this;
+    };
+
+    friend EventObject operator+(const EventObject &eo1, const EventObject  
&eo2) {
+        return EventObject{eo1.val + eo2.val};
+    }
+    friend int operator+(const int &eo1, const EventObject  &eo2) {
+        return eo1 + eo2.val;
+    }
+    friend int operator+(const EventObject &eo1, const int  &eo2) {
+        return eo1.val + eo2;
+    }
+    int val;
+};
+
+class PushStreamTestSuite : public ::testing::Test {
+public:
+    ~PushStreamTestSuite() noexcept override = default;
+
+    PushStreamProvider psp {};
+    std::unique_ptr<std::thread> t{};
+
+    std::shared_ptr<celix::IExecutor> executor 
{std::make_shared<celix::DefaultExecutor>()};
+    celix::PromiseFactory promiseFactory {executor};
+    celix::Deferred<void> done{promiseFactory.deferred<void>()};
+    celix::Promise<void> donepromise = done.getPromise();
+
+    template <typename T>
+    std::shared_ptr<celix::SynchronousPushEventSource<T>> createEventSource(T 
event, int publishCount, bool autoinc = false) {
+        auto ses = psp.template createSynchronousEventSource<T>();
+
+        auto successLambda = [this, ses, event, publishCount, 
autoinc](celix::Promise<void> p) -> celix::Promise<void> {
+            t = std::make_unique<std::thread>([&, event, publishCount, 
autoinc]() {
+                int counter = 0;
+                T data {event};
+                // Keep going as long as someone is listening
+                while (counter < publishCount) {
+                    ses->publish(data);
+                    if (autoinc) {
+                        data = data + 1;
+                    }
+                    counter++;
+                }
+            });
+
+            t->join();
+            ses->close();
+            done.resolve();
+            return p;
+        };
+
+        auto x = ses->connectPromise().template then<void>(successLambda);
+
+        return ses;
+    }
+};
+
+TEST_F(PushStreamTestSuite, EventSourceCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+    auto stream = psp.createUnbufferedStream<int>(ses);
+
+    auto streamEnded = stream->onClose([&](){
+        onClosedReceived++;
+    }).onError([&](){
+        onErrorReceived++;
+    }).forEach([&](int /*event*/) { });
+
+    ses->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+TEST_F(PushStreamTestSuite, ChainedEventSourceCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto& filteredStream = stream->filter([](const int& /*event*/) -> bool {
+            return true;
+        }).onClose([&](){
+            onClosedReceived++;
+        }).onError([&](){
+            onErrorReceived++;
+        });
+
+    auto streamEnded = filteredStream.forEach([&](int /*event*/) { });
+
+    ses->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+
+TEST_F(PushStreamTestSuite, StreamCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto streamEnded = stream->onClose([&](){
+        onClosedReceived++;
+    }).onError([&](){
+        onErrorReceived++;
+    }).forEach([&](int /*event*/) { });
+
+    stream->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+TEST_F(PushStreamTestSuite, PublishAfterStreamCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+    int onEventReceived{0};
+
+    auto ses = psp.createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto streamEnded = stream->onClose([&](){
+        onClosedReceived++;
+    }).onError([&](){
+        onErrorReceived++;
+    }).forEach([&](int /*event*/) {
+        onEventReceived++;
+    });
+
+    stream->close();
+    ses->publish(1);
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+    ASSERT_EQ(0, onEventReceived);
+}
+
+
+TEST_F(PushStreamTestSuite, ChainedStreamCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().template then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto streamEnded = stream->
+            filter([](const int& /*event*/) -> bool {
+                return true;
+            }).onClose([&](){
+                onClosedReceived++;
+            }).onError([&](){
+                onErrorReceived++;
+            }).forEach([&](int /*event*/) { });
+
+    stream->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+TEST_F(PushStreamTestSuite, ChainedStreamIntermedateCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().template then<void>(successLambda);
+
+    auto stream1 = psp.createUnbufferedStream<int>(ses);
+    stream1->onClose([&](){
+        onClosedReceived++;
+    });
+    auto& stream2 = stream1->filter([](const int& /*event*/) -> bool {
+                return true;
+            }).onClose([&](){
+                onClosedReceived++;
+            }).onError([&](){
+                onErrorReceived++;
+            });
+    auto streamEnded = stream2.forEach([&](int /*event*/) { });
+
+    stream1->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(2, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+
+}
+
+TEST_F(PushStreamTestSuite, ExceptionInStreamTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto streamEnded = stream->onClose([&](){
+        onClosedReceived++;
+    }).onError([&](){
+        onErrorReceived++;
+    }).forEach([&](int /*event*/) {
+        throw TestException("Oops");
+    });
+
+    ses->publish(1);
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(1, onErrorReceived);
+}
+
+TEST_F(PushStreamTestSuite, ExceptionInChainedStreamTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().template then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+
+    auto streamEnded = stream->filter([](const int& /*event*/) -> bool {
+                return true;
+            }).onClose([&](){
+                onClosedReceived++;
+            }).onError([&](){
+                onErrorReceived++;
+            }).forEach([&](int /*event*/) {
+        throw TestException("Oops");
+    });
+
+    ses->publish(1);
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(1, onErrorReceived);
+}
+
+//
+///
+/// forEach test
+///
+TEST_F(PushStreamTestSuite, ForEachTestBasicType) {
+    int consumeCount{0};
+    int consumeSum{0};
+    int lastConsumed{-1};
+    auto ses = createEventSource<int>(0, 10'000, true);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto streamEnded = stream->
+            forEach([&](int event) {
+                GTEST_ASSERT_EQ(lastConsumed + 1, event);
+
+                lastConsumed = event;
+                consumeCount++;
+                consumeSum = consumeSum + event;
+            });
+
+    streamEnded.wait();
+    donepromise.wait();
+
+    GTEST_ASSERT_EQ(10'000, consumeCount);
+    GTEST_ASSERT_EQ(49'995'000, consumeSum);
+}
+
+///
+/// forEach test
+///
+TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
+    int consumeCount{0};
+    int consumeSum{0};
+    int lastConsumed{-1};
+    auto ses = createEventSource<int>(0, 10'000, true);
+
+    auto stream = psp.createStream<int>(ses);
+    auto streamEnded = stream->
+            forEach([&](int event) {
+                GTEST_ASSERT_EQ(lastConsumed + 1, event);
+
+                lastConsumed = event;
+                consumeCount++;
+                consumeSum = consumeSum + event;
+            });
+
+    streamEnded.wait();
+    donepromise.wait();
+
+    GTEST_ASSERT_EQ(10'000, consumeCount);
+    GTEST_ASSERT_EQ(49'995'000, consumeSum);
+}
+
+TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
+    int consumeCount{0};
+    int consumeSum{0};
+    auto ses = createEventSource<EventObject>(EventObject{2}, 10);
+
+    auto stream = psp.createUnbufferedStream<EventObject>(ses);
+    auto streamEnded = stream->
+            forEach([&](const EventObject& event) {
+                consumeCount++;
+                consumeSum = consumeSum + event;
+            });
+
+    streamEnded.wait();
+    donepromise.wait();
+
+    GTEST_ASSERT_EQ(10, consumeCount);
+    GTEST_ASSERT_EQ(20, consumeSum);
+}
+
+
+//Filter Test
+TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
+    int consumeCount{0};
+    int consumeSum{0};
+    auto ses = createEventSource<EventObject>(EventObject{2}, 10);
+
+    auto stream = psp.createUnbufferedStream<EventObject>(ses);
+    auto streamEnded = stream->
+            filter([](const EventObject& /*event*/) -> bool {
+               return true;

Review comment:
       Although this follows the spec, I really expected that returning 
`true` from a filter predicate would filter the events out, not keep them in. 
So no change is needed; this is just a remark.

##########
File path: libs/pushstreams/api/celix/PushEvent.h
##########
@@ -0,0 +1,142 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include "celix/IllegalStateException.h"
+
+namespace celix {
+    template <typename T>
+    class PushEvent {
+    public:
+        virtual  ~PushEvent() = default;
+   
+        enum class EventType {
+            DATA,
+            ERROR,
+            CLOSE
+        };
+
+        explicit PushEvent(EventType _type);

Review comment:
       Can this ctor be made protected, so that it is not directly usable by 
users?

##########
File path: libs/pushstreams/api/celix/PushStreamProvider.h
##########
@@ -0,0 +1,98 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include "celix/SimplePushEventSource.h"
+#include "celix/SynchronousPushEventSource.h"
+#include "celix/IPushEventSource.h"
+#include "celix/impl/StreamPushEventConsumer.h"
+#include "celix/PushStream.h"
+
+namespace celix {
+
+    class PushStreamProvider final {
+    public:
+        PushStreamProvider();

Review comment:
       IMO the ctor should require a PromiseFactory as its first argument, 
maybe with a default value:
   `PushStreamProvider(std::shared_ptr<celix::PromiseFactory> promiseFactory = 
std::make_shared<celix::PromiseFactory>())`  
   
   And get the executor from the promiseFactory 
(`promiseFactory->getExecutor();`).
   
   That way it is possible to create a PromiseFactory and a PushStreamProvider 
that share the same executor.

##########
File path: libs/pushstreams/api/celix/PushStream.h
##########
@@ -0,0 +1,253 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <iostream>
+#include <queue>
+
+#include "celix/impl/PushEventConsumer.h"
+#include "celix/IAutoCloseable.h"
+
+#include "celix/Promise.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Deferred.h"
+
+namespace celix {
+    template<typename T>
+    class PushStream: public IAutoCloseable {
+    public:
+        using PredicateFunction = std::function<bool(const T&)>;
+        using CloseFunction = std::function<void(void)>;
+        using ErrorFunction = std::function<void(void)>;
+        using ForEachFunction = std::function<void(const T&)>;
+
+        explicit PushStream(PromiseFactory& promiseFactory);
+
+        Promise<void> forEach(ForEachFunction func);
+
+        PushStream<T>& filter(PredicateFunction predicate);
+
+        template<typename R>
+        PushStream<R>& map(std::function<R(const T&)>);
+
+        std::vector<std::shared_ptr<PushStream<T>>> 
split(std::vector<PredicateFunction> predicates);
+
+        PushStream<T>& onClose(CloseFunction closeFunction);
+
+        PushStream<T>& onError(ErrorFunction errorFunction);
+
+        void close() override;
+
+    protected:
+        enum class State {
+            BUILDING,
+            STARTED,
+            CLOSED
+        };
+
+        virtual bool begin() = 0;
+        virtual void upstreamClose(const PushEvent<T>& event) = 0;
+        virtual long handleEvent(const PushEvent<T>& event);
+
+        void close(const PushEvent<T>& event, bool sendDownStreamEvent);
+        bool internal_close(const PushEvent<T>& event, bool 
sendDownStreamEvent);
+
+        bool compareAndSetState(State expectedValue, State newValue);
+
+        State getAndSetState(State newValue);
+
+        std::mutex mutex {};
+        PromiseFactory& promiseFactory;
+        PushEventConsumer<T> nextEvent{};
+        ErrorFunction onErrorCallback{};
+        CloseFunction onCloseCallback{};
+        State closed {State::BUILDING};
+    private:
+        Deferred<void> streamEnd{promiseFactory.deferred<void>()};
+
+        template<typename, typename> friend class IntermediatePushStream;
+        template<typename> friend class UnbufferedPushStream;
+        template<typename> friend class PushStream;
+        template<typename> friend class StreamPushEventConsumer;
+    };
+}
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+#include "celix/impl/IntermediatePushStream.h"
+#include "celix/impl/UnbufferedPushStream.h"
+#include "celix/impl/BufferedPushStream.h"
+
+template<typename T>
+celix::PushStream<T>::PushStream(PromiseFactory& _promiseFactory) : 
promiseFactory{_promiseFactory} {
+}
+
+template<typename T>
+long celix::PushStream<T>::handleEvent(const PushEvent<T>& event) {
+    if(closed != celix::PushStream<T>::State::CLOSED) {
+        return nextEvent.accept(event);
+    }
+    return IPushEventConsumer<T>::ABORT;
+}
+
+template<typename T>
+celix::Promise<void> celix::PushStream<T>::forEach(ForEachFunction func) {
+    nextEvent = PushEventConsumer<T>([func = std::move(func), this](const 
PushEvent<T>& event) -> long {
+        try {
+            switch(event.getType()) {
+                case celix::PushEvent<T>::EventType::DATA:
+                    func(event.getData());
+                    return IPushEventConsumer<T>::CONTINUE;
+                case celix::PushEvent<T>::EventType::CLOSE:
+                    streamEnd.resolve();
+                    break;
+                case celix::PushEvent<T>::EventType::ERROR:
+                    streamEnd.fail(event.getFailure());
+                    break;
+            }
+            close(event, false);
+            return IPushEventConsumer<T>::ABORT;
+        } catch (const std::exception& e) {
+            auto errorEvent = ErrorPushEvent<T>(std::current_exception());
+            streamEnd.fail(errorEvent.getFailure());
+            close(errorEvent, false);
+            return IPushEventConsumer<T>::ABORT;
+        }
+    });
+
+    begin();
+    return streamEnd.getPromise();           
+}
+
+template<typename T>
+celix::PushStream<T>& celix::PushStream<T>::filter(PredicateFunction 
predicate) {
+    auto downstream = 
std::make_shared<celix::IntermediatePushStream<T>>(promiseFactory, *this);
+    nextEvent = PushEventConsumer<T>([downstream = downstream, predicate = 
std::move(predicate)](const PushEvent<T>& event) -> long {
+        if (event.getType() != celix::PushEvent<T>::EventType::DATA || 
predicate(event.getData())) {
+            downstream->handleEvent(event);
+        }
+        return IPushEventConsumer<T>::CONTINUE;
+    });
+
+    return *downstream;
+}
+
+template<typename T>
+std::vector<std::shared_ptr<celix::PushStream<T>>> 
celix::PushStream<T>::split(std::vector<PredicateFunction> predicates) {
+
+    std::vector<std::shared_ptr<PushStream<T>>> result{};
+    for(long unsigned int i = 0; i < predicates.size(); i++) {
+        
result.push_back(std::make_shared<celix::IntermediatePushStream<T>>(promiseFactory,
 *this));
+    }
+
+    nextEvent = PushEventConsumer<T>([result = result, predicates = 
std::move(predicates)](const PushEvent<T>& event) -> long {
+        for(long unsigned int i = 0; i < predicates.size(); i++) {
+            if (event.getType() != celix::PushEvent<T>::EventType::DATA || 
predicates[i](event.getData())) {
+                result[i]->handleEvent(event);
+            }
+        }
+
+        return IPushEventConsumer<T>::CONTINUE;
+    });
+
+    return result;
+}
+
+template<typename T>
+template<typename R>
+celix::PushStream<R>& celix::PushStream<T>::map(std::function<R(const T&)> 
mapper) {
+    auto downstream = std::make_shared<celix::IntermediatePushStream<R, 
T>>(promiseFactory, *this);
+
+    nextEvent = PushEventConsumer<T>([downstream = downstream, mapper = 
std::move(mapper)](const PushEvent<T>& event) -> long {
+        if (event.getType() == celix::PushEvent<T>::EventType::DATA) {
+            downstream->handleEvent(DataPushEvent<R>(mapper(event.getData())));
+        } else {
+            downstream->handleEvent(celix::ClosePushEvent<R>());

Review comment:
       Now downstream gets a close event even when the upstream event is an 
**error**, not only when it is a close.

##########
File path: libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
##########
@@ -0,0 +1,132 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#pragma once
+
+#include <iostream>
+#include <set>
+
+#include "celix/IPushEventSource.h"
+#include "celix/IAutoCloseable.h"
+
+#include "celix/IllegalStateException.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Promise.h"
+#include "celix/DefaultExecutor.h"
+#include "celix/PushEvent.h"
+
+namespace celix {
+    template <typename T>
+    class AbstractPushEventSource: public IPushEventSource<T> {
+    public:
+        explicit AbstractPushEventSource(PromiseFactory& _promiseFactory);
+
+        void publish(const T& event);
+
+        [[nodiscard]] celix::Promise<void> connectPromise();

Review comment:
       Can this be renamed to `getConnectPromise()`?

##########
File path: libs/pushstreams/api/celix/PushStream.h
##########
@@ -0,0 +1,253 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <iostream>
+#include <queue>
+
+#include "celix/impl/PushEventConsumer.h"
+#include "celix/IAutoCloseable.h"
+
+#include "celix/Promise.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Deferred.h"
+
+namespace celix {
+    template<typename T>
+    class PushStream: public IAutoCloseable {
+    public:
+        using PredicateFunction = std::function<bool(const T&)>;
+        using CloseFunction = std::function<void(void)>;
+        using ErrorFunction = std::function<void(void)>;
+        using ForEachFunction = std::function<void(const T&)>;
+
+        explicit PushStream(PromiseFactory& promiseFactory);
+
+        Promise<void> forEach(ForEachFunction func);
+
+        PushStream<T>& filter(PredicateFunction predicate);
+
+        template<typename R>
+        PushStream<R>& map(std::function<R(const T&)>);
+
+        std::vector<std::shared_ptr<PushStream<T>>> 
split(std::vector<PredicateFunction> predicates);
+
+        PushStream<T>& onClose(CloseFunction closeFunction);
+
+        PushStream<T>& onError(ErrorFunction errorFunction);
+
+        void close() override;
+
+    protected:
+        enum class State {
+            BUILDING,
+            STARTED,
+            CLOSED
+        };
+
+        virtual bool begin() = 0;
+        virtual void upstreamClose(const PushEvent<T>& event) = 0;
+        virtual long handleEvent(const PushEvent<T>& event);
+
+        void close(const PushEvent<T>& event, bool sendDownStreamEvent);
+        bool internal_close(const PushEvent<T>& event, bool 
sendDownStreamEvent);
+
+        bool compareAndSetState(State expectedValue, State newValue);
+
+        State getAndSetState(State newValue);
+
+        std::mutex mutex {};
+        PromiseFactory& promiseFactory;
+        PushEventConsumer<T> nextEvent{};
+        ErrorFunction onErrorCallback{};
+        CloseFunction onCloseCallback{};
+        State closed {State::BUILDING};
+    private:
+        Deferred<void> streamEnd{promiseFactory.deferred<void>()};
+
+        template<typename, typename> friend class IntermediatePushStream;
+        template<typename> friend class UnbufferedPushStream;
+        template<typename> friend class PushStream;
+        template<typename> friend class StreamPushEventConsumer;
+    };
+}
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+#include "celix/impl/IntermediatePushStream.h"
+#include "celix/impl/UnbufferedPushStream.h"
+#include "celix/impl/BufferedPushStream.h"
+
+template<typename T>
+celix::PushStream<T>::PushStream(PromiseFactory& _promiseFactory) : 
promiseFactory{_promiseFactory} {
+}
+
+template<typename T>
+long celix::PushStream<T>::handleEvent(const PushEvent<T>& event) {
+    if(closed != celix::PushStream<T>::State::CLOSED) {
+        return nextEvent.accept(event);
+    }
+    return IPushEventConsumer<T>::ABORT;
+}
+
+template<typename T>
+celix::Promise<void> celix::PushStream<T>::forEach(ForEachFunction func) {
+    nextEvent = PushEventConsumer<T>([func = std::move(func), this](const 
PushEvent<T>& event) -> long {
+        try {
+            switch(event.getType()) {
+                case celix::PushEvent<T>::EventType::DATA:
+                    func(event.getData());
+                    return IPushEventConsumer<T>::CONTINUE;
+                case celix::PushEvent<T>::EventType::CLOSE:
+                    streamEnd.resolve();
+                    break;
+                case celix::PushEvent<T>::EventType::ERROR:
+                    streamEnd.fail(event.getFailure());
+                    break;
+            }
+            close(event, false);
+            return IPushEventConsumer<T>::ABORT;

Review comment:
       Is it correct that ::ABORT is returned when an EventType::ERROR or 
EventType::CLOSE is received?

##########
File path: libs/pushstreams/docs/.gitignore
##########
@@ -0,0 +1 @@
+*.png

Review comment:
       Add ignore to top level .gitignore

##########
File path: libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
##########
@@ -0,0 +1,132 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#pragma once
+
+#include <iostream>
+#include <set>
+
+#include "celix/IPushEventSource.h"
+#include "celix/IAutoCloseable.h"
+
+#include "celix/IllegalStateException.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Promise.h"
+#include "celix/DefaultExecutor.h"
+#include "celix/PushEvent.h"
+
+namespace celix {
+    template <typename T>
+    class AbstractPushEventSource: public IPushEventSource<T> {
+    public:
+        explicit AbstractPushEventSource(PromiseFactory& _promiseFactory);
+
+        void publish(const T& event);
+
+        [[nodiscard]] celix::Promise<void> connectPromise();
+
+        void open(std::shared_ptr<IPushEventConsumer<T>> _eventConsumer) 
override;
+
+        bool isConnected();
+
+        void close() override;
+    protected:
+        virtual void execute(std::function<void()> task) = 0;
+
+        PromiseFactory& promiseFactory;
+
+    private:
+        std::mutex mutex {};
+        bool closed{false};
+        Deferred<void> connected;
+        std::vector<std::shared_ptr<IPushEventConsumer<T>>> eventConsumers {};
+    };
+}
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+template <typename T>
+celix::AbstractPushEventSource<T>::AbstractPushEventSource(PromiseFactory& 
_promiseFactory):
+    promiseFactory{_promiseFactory},
+    connected{promiseFactory.deferred<void>()}  {
+}
+
+template <typename T>
+void 
celix::AbstractPushEventSource<T>::open(std::shared_ptr<celix::IPushEventConsumer<T>>
 _eventConsumer) {
+    std::lock_guard lck{mutex};
+    if (closed) {
+        _eventConsumer->accept(celix::ClosePushEvent<T>());
+    } else {
+        eventConsumers.push_back(_eventConsumer);
+        connected.resolve();

Review comment:
       I am not sure if resolving a connected promise and directly creating a 
new one is desirable. Why is this needed?
   
   

##########
File path: libs/pushstreams/api/celix/PushStream.h
##########
@@ -0,0 +1,253 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <iostream>
+#include <queue>
+
+#include "celix/impl/PushEventConsumer.h"
+#include "celix/IAutoCloseable.h"
+
+#include "celix/Promise.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Deferred.h"
+
+namespace celix {
+    template<typename T>
+    class PushStream: public IAutoCloseable {
+    public:
+        using PredicateFunction = std::function<bool(const T&)>;
+        using CloseFunction = std::function<void(void)>;
+        using ErrorFunction = std::function<void(void)>;
+        using ForEachFunction = std::function<void(const T&)>;
+
+        explicit PushStream(PromiseFactory& promiseFactory);
+
+        Promise<void> forEach(ForEachFunction func);
+
+        PushStream<T>& filter(PredicateFunction predicate);
+
+        template<typename R>
+        PushStream<R>& map(std::function<R(const T&)>);
+
+        std::vector<std::shared_ptr<PushStream<T>>> 
split(std::vector<PredicateFunction> predicates);
+
+        PushStream<T>& onClose(CloseFunction closeFunction);
+
+        PushStream<T>& onError(ErrorFunction errorFunction);
+
+        void close() override;
+
+    protected:
+        enum class State {
+            BUILDING,
+            STARTED,
+            CLOSED
+        };
+
+        virtual bool begin() = 0;
+        virtual void upstreamClose(const PushEvent<T>& event) = 0;
+        virtual long handleEvent(const PushEvent<T>& event);
+
+        void close(const PushEvent<T>& event, bool sendDownStreamEvent);
+        bool internal_close(const PushEvent<T>& event, bool 
sendDownStreamEvent);
+
+        bool compareAndSetState(State expectedValue, State newValue);
+
+        State getAndSetState(State newValue);
+
+        std::mutex mutex {};
+        PromiseFactory& promiseFactory;
+        PushEventConsumer<T> nextEvent{};
+        ErrorFunction onErrorCallback{};
+        CloseFunction onCloseCallback{};
+        State closed {State::BUILDING};

Review comment:
       The state member is called `closed`, but it can have the values BUILDING, 
STARTED and CLOSED, so maybe rename it to `state`.
   
   Also, it is not protected, and as a result I think `compareAndSetState` and 
`getAndSetState` have race conditions.
   So maybe make this a `std::atomic<State>`
   
   

##########
File path: libs/pushstreams/api/celix/SimplePushEventSource.h
##########
@@ -0,0 +1,69 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#pragma once
+
+#include <queue>
+
+#include "celix/impl/AbstractPushEventSource.h"
+#include "celix/IAutoCloseable.h"
+
+#include "IllegalStateException.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Promise.h"
+#include "celix/DefaultExecutor.h"
+#include "celix/PushEvent.h"
+
+namespace celix {
+    template <typename T>
+    class SimplePushEventSource: public AbstractPushEventSource<T> {
+    public:
+        explicit SimplePushEventSource(PromiseFactory& promiseFactory);
+
+    protected:
+        void execute(std::function<void()> task) override;
+
+    private:
+        std::shared_ptr<IExecutor> executor {};
+        std::queue<std::function<void()>> queue {};
+    };
+}
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+template <typename T>
+celix::SimplePushEventSource<T>::SimplePushEventSource(PromiseFactory& 
promiseFactory): AbstractPushEventSource<T>{promiseFactory},
+    executor{promiseFactory.getExecutor()} {
+
+//    executor->execute([]() {
+//        for(;;) {
+//
+//        }
+//    });
+}
+
+template <typename T>
+void celix::SimplePushEventSource<T>::execute(std::function<void()> task) {
+    task();

Review comment:
       Currently this is the same as the SynchronousPushEventSource. I assume 
this will eventually use a queue and the executor?

##########
File path: libs/pushstreams/api/celix/IPushEventConsumer.h
##########
@@ -0,0 +1,35 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include "celix/PushEvent.h"
+
+namespace celix {
+    template <typename T>
+    class IPushEventConsumer {
+    public:
+        static constexpr int const& ABORT = -1;

Review comment:
       Can these types not be an enum?

##########
File path: libs/pushstreams/api/celix/impl/BufferedPushStream.h
##########
@@ -0,0 +1,85 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include "celix/IPushEventSource.h"
+
+namespace celix {
+
+    template<typename T>
+    class BufferedPushStream: public UnbufferedPushStream<T> {
+    public:
+        BufferedPushStream(PromiseFactory& _promiseFactory);
+
+    protected:
+        long handleEvent(const PushEvent<T>& event) override;
+
+    private:
+        void startWorker();
+        std::unique_ptr<PushEvent<T>> popQueue();
+
+        std::queue<std::unique_ptr<PushEvent<T>>> queue{};
+        std::mutex mutex{};
+        int nrWorkers{0};
+    };
+}
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+template<typename T>
+celix::BufferedPushStream<T>::BufferedPushStream(PromiseFactory& 
_promiseFactory) : celix::UnbufferedPushStream<T>(_promiseFactory) {
+}
+
+template<typename T>
+long celix::BufferedPushStream<T>::handleEvent(const PushEvent<T>& event) {
+    std::unique_lock lk(mutex);
+    queue.push(std::move(event.clone()));
+    if (nrWorkers == 0) startWorker();

Review comment:
       This `if` statement is missing its `{` and `}` braces.

##########
File path: libs/pushstreams/gtest/src/PushStreamTestSuite.cc
##########
@@ -0,0 +1,695 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celix/PushStreamProvider.h"
+
+using celix::PushStreamProvider;
+
+class TestException : public std::exception {
+public:
+    explicit TestException(const char* what) : w{what} {}
+    explicit TestException(std::string what) : w{std::move(what)} {}
+
+    TestException(const TestException&) = delete;
+    TestException(TestException&&) noexcept = default;
+
+    TestException& operator=(const TestException&) = delete;
+    TestException& operator=(TestException&&) noexcept = default;
+
+    [[nodiscard]] const char* what() const noexcept override { return 
w.c_str(); }
+private:
+    std::string w;
+};
+
+class EventObject {
+public:
+    EventObject() : val{0} {
+    }
+
+    explicit EventObject(int _val) : val{_val} {
+    }
+
+    EventObject(const EventObject& _val) = default;
+    EventObject& operator=(const EventObject& other) = default;
+
+    EventObject& operator=(int other) {
+        val = other;
+        return *this;
+    };
+
+    friend EventObject operator+(const EventObject &eo1, const EventObject  
&eo2) {
+        return EventObject{eo1.val + eo2.val};
+    }
+    friend int operator+(const int &eo1, const EventObject  &eo2) {
+        return eo1 + eo2.val;
+    }
+    friend int operator+(const EventObject &eo1, const int  &eo2) {
+        return eo1.val + eo2;
+    }
+    int val;
+};
+
+class PushStreamTestSuite : public ::testing::Test {
+public:
+    ~PushStreamTestSuite() noexcept override = default;
+
+    PushStreamProvider psp {};
+    std::unique_ptr<std::thread> t{};
+
+    std::shared_ptr<celix::IExecutor> executor 
{std::make_shared<celix::DefaultExecutor>()};
+    celix::PromiseFactory promiseFactory {executor};
+    celix::Deferred<void> done{promiseFactory.deferred<void>()};
+    celix::Promise<void> donepromise = done.getPromise();
+
+    template <typename T>
+    std::shared_ptr<celix::SynchronousPushEventSource<T>> createEventSource(T 
event, int publishCount, bool autoinc = false) {

Review comment:
       Nice event stream to do some testing :+1: 

##########
File path: libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
##########
@@ -0,0 +1,132 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#pragma once
+
+#include <iostream>
+#include <set>
+
+#include "celix/IPushEventSource.h"
+#include "celix/IAutoCloseable.h"
+
+#include "celix/IllegalStateException.h"
+#include "celix/PromiseFactory.h"
+#include "celix/Promise.h"
+#include "celix/DefaultExecutor.h"
+#include "celix/PushEvent.h"
+
+namespace celix {
+    template <typename T>
+    class AbstractPushEventSource: public IPushEventSource<T> {
+    public:
+        explicit AbstractPushEventSource(PromiseFactory& _promiseFactory);
+
+        void publish(const T& event);
+
+        [[nodiscard]] celix::Promise<void> connectPromise();

Review comment:
       Also, I could not find the connect promise in the OSGi spec. Did I miss 
something?

##########
File path: libs/pushstreams/gtest/src/PushStreamTestSuite.cc
##########
@@ -0,0 +1,695 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celix/PushStreamProvider.h"
+
+using celix::PushStreamProvider;
+
+class TestException : public std::exception {
+public:
+    explicit TestException(const char* what) : w{what} {}
+    explicit TestException(std::string what) : w{std::move(what)} {}
+
+    TestException(const TestException&) = delete;
+    TestException(TestException&&) noexcept = default;
+
+    TestException& operator=(const TestException&) = delete;
+    TestException& operator=(TestException&&) noexcept = default;
+
+    [[nodiscard]] const char* what() const noexcept override { return 
w.c_str(); }
+private:
+    std::string w;
+};
+
+class EventObject {
+public:
+    EventObject() : val{0} {
+    }
+
+    explicit EventObject(int _val) : val{_val} {
+    }
+
+    EventObject(const EventObject& _val) = default;
+    EventObject& operator=(const EventObject& other) = default;
+
+    EventObject& operator=(int other) {
+        val = other;
+        return *this;
+    };
+
+    friend EventObject operator+(const EventObject &eo1, const EventObject  
&eo2) {
+        return EventObject{eo1.val + eo2.val};
+    }
+    friend int operator+(const int &eo1, const EventObject  &eo2) {
+        return eo1 + eo2.val;
+    }
+    friend int operator+(const EventObject &eo1, const int  &eo2) {
+        return eo1.val + eo2;
+    }
+    int val;
+};
+
+class PushStreamTestSuite : public ::testing::Test {
+public:
+    ~PushStreamTestSuite() noexcept override = default;
+
+    PushStreamProvider psp {};
+    std::unique_ptr<std::thread> t{};
+
+    std::shared_ptr<celix::IExecutor> executor 
{std::make_shared<celix::DefaultExecutor>()};
+    celix::PromiseFactory promiseFactory {executor};
+    celix::Deferred<void> done{promiseFactory.deferred<void>()};
+    celix::Promise<void> donepromise = done.getPromise();
+
+    template <typename T>
+    std::shared_ptr<celix::SynchronousPushEventSource<T>> createEventSource(T 
event, int publishCount, bool autoinc = false) {
+        auto ses = psp.template createSynchronousEventSource<T>();
+
+        auto successLambda = [this, ses, event, publishCount, 
autoinc](celix::Promise<void> p) -> celix::Promise<void> {
+            t = std::make_unique<std::thread>([&, event, publishCount, 
autoinc]() {
+                int counter = 0;
+                T data {event};
+                // Keep going as long as someone is listening
+                while (counter < publishCount) {
+                    ses->publish(data);
+                    if (autoinc) {
+                        data = data + 1;
+                    }
+                    counter++;
+                }
+            });
+
+            t->join();
+            ses->close();
+            done.resolve();
+            return p;
+        };
+
+        auto x = ses->connectPromise().template then<void>(successLambda);
+
+        return ses;
+    }
+};
+
+TEST_F(PushStreamTestSuite, EventSourceCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+    auto stream = psp.createUnbufferedStream<int>(ses);
+
+    auto streamEnded = stream->onClose([&](){
+        onClosedReceived++;
+    }).onError([&](){
+        onErrorReceived++;
+    }).forEach([&](int /*event*/) { });
+
+    ses->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+TEST_F(PushStreamTestSuite, ChainedEventSourceCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.template createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto& filteredStream = stream->filter([](const int& /*event*/) -> bool {
+            return true;
+        }).onClose([&](){
+            onClosedReceived++;
+        }).onError([&](){
+            onErrorReceived++;
+        });
+
+    auto streamEnded = filteredStream.forEach([&](int /*event*/) { });
+
+    ses->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+
+TEST_F(PushStreamTestSuite, StreamCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+
+    auto ses = psp.createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);
+
+    auto stream = psp.createUnbufferedStream<int>(ses);
+    auto streamEnded = stream->onClose([&](){
+        onClosedReceived++;
+    }).onError([&](){
+        onErrorReceived++;
+    }).forEach([&](int /*event*/) { });
+
+    stream->close();
+
+    streamEnded.wait();
+
+    ASSERT_EQ(1, onClosedReceived);
+    ASSERT_EQ(0, onErrorReceived);
+}
+
+TEST_F(PushStreamTestSuite, PublishAfterStreamCloseTest) {
+    int onClosedReceived{0};
+    int onErrorReceived{0};
+    int onEventReceived{0};
+
+    auto ses = psp.createSynchronousEventSource<int>();
+
+    auto successLambda = [](celix::Promise<void> p) -> celix::Promise<void> {
+        return p;
+    };
+    auto x = ses->connectPromise().then<void>(successLambda);

Review comment:
       What do the `successLambda` and `x` do?
   
   For this and the rest of the test cases

##########
File path: libs/pushstreams/api/celix/impl/BufferedPushStream.h
##########
@@ -0,0 +1,85 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#pragma once
+
+#include "celix/IPushEventSource.h"
+
+namespace celix {
+
+    template<typename T>
+    class BufferedPushStream: public UnbufferedPushStream<T> {
+    public:
+        BufferedPushStream(PromiseFactory& _promiseFactory);
+
+    protected:
+        long handleEvent(const PushEvent<T>& event) override;
+
+    private:
+        void startWorker();
+        std::unique_ptr<PushEvent<T>> popQueue();
+
+        std::queue<std::unique_ptr<PushEvent<T>>> queue{};
+        std::mutex mutex{};
+        int nrWorkers{0};
+    };
+}
+
+/*********************************************************************************
+ Implementation
+*********************************************************************************/
+
+template<typename T>
+celix::BufferedPushStream<T>::BufferedPushStream(PromiseFactory& 
_promiseFactory) : celix::UnbufferedPushStream<T>(_promiseFactory) {
+}
+
+template<typename T>
+long celix::BufferedPushStream<T>::handleEvent(const PushEvent<T>& event) {
+    std::unique_lock lk(mutex);
+    queue.push(std::move(event.clone()));
+    if (nrWorkers == 0) startWorker();

Review comment:
       This function locks the mutex and calls `startWorker()`, which calls 
`popQueue()`, which then tries to lock the mutex.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@celix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to