This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/promise_with_executor_abstraction in repository https://gitbox.apache.org/repos/asf/celix.git
commit 87fe188bcadd9f536200b19f5bcfff67b2bb0e6b Author: Pepijn Noltes <[email protected]> AuthorDate: Tue Jan 26 16:57:19 2021 +0100 Add initial executor abstraction to promise and implementation of a simple executor Also adds an initial IScheduledExecutor interface. --- misc/experimental/promise/CMakeLists.txt | 9 +- misc/experimental/promise/README.md | 32 +- .../promise/api/celix/DefaultExecutor.h | 70 ++++ misc/experimental/promise/api/celix/Deferred.h | 43 +-- misc/experimental/promise/api/celix/IExecutor.h | 53 +++ .../promise/api/celix/IScheduledExecutor.h | 127 +++++++ misc/experimental/promise/api/celix/Promise.h | 4 +- .../promise/api/celix/PromiseFactory.h | 86 ++++- .../promise/api/celix/RejectedExecutionException.h | 32 ++ .../promise/api/celix/impl/SharedPromiseState.h | 347 ++++++++++------- misc/experimental/promise/cmake/FindTBB.cmake | 418 --------------------- .../promise/gtest/src/PromiseTestSuite.cc | 194 ++++++---- .../promise/gtest/src/VoidPromiseTestSuite.cc | 110 ++++-- misc/experimental/promise/src/PromiseExamples.cc | 57 ++- 14 files changed, 816 insertions(+), 766 deletions(-) diff --git a/misc/experimental/promise/CMakeLists.txt b/misc/experimental/promise/CMakeLists.txt index 2d547a4..6d31677 100644 --- a/misc/experimental/promise/CMakeLists.txt +++ b/misc/experimental/promise/CMakeLists.txt @@ -37,13 +37,6 @@ else () endif () if (PROMISE OR PROMISE_STANDALONE) - - find_package(TBB QUIET) - if (NOT TBB_FOUND) - #NOTE: TBB does not yet deliver a TBBConfig.cmake on Ubuntu 18, using a FindTBB.cmake file - set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${CMAKE_CURRENT_SOURCE_DIR}/cmake") - endif () - find_package(TBB REQUIRED) find_package(Threads) add_library(Promise INTERFACE) @@ -51,7 +44,7 @@ if (PROMISE OR PROMISE_STANDALONE) $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/api> $<INSTALL_INTERFACE:include/celix/promise> ) - target_link_libraries(Promise INTERFACE TBB::tbb Threads::Threads) + target_link_libraries(Promise INTERFACE 
Threads::Threads) target_compile_options(Promise INTERFACE -frtti -std=c++17) #Note -frtti needed for TBB add_library(Celix::Promise ALIAS Promise) diff --git a/misc/experimental/promise/README.md b/misc/experimental/promise/README.md index 97c2ae8..c8973c4 100644 --- a/misc/experimental/promise/README.md +++ b/misc/experimental/promise/README.md @@ -24,8 +24,9 @@ NOTE: this implementation is still experiment and the api and behaviour will pro * A simple example of a promise. * Note this is not an ideal use of a promise. */ -celix::Promise<Integer> foo(int n) { - auto deferred = new Deferred<int>(); +celix::Promise<int> foo(int n) { + static celix::PromiseFactory factory{}; + celix::Deferred<int> deferred = factory.deferred<int>(); if (n > 10) { deferred.resolve(n); @@ -56,7 +57,8 @@ static long calc_fib(long n) { * A more complex example where a heavy work load is done on a separate thread. */ celix::Promise<long> fib(long n) { - auto deferred = celix::Deferred<long>{}; + static celix::PromiseFactory factory{}; + auto deferred = factory.deferred<long>(); if (n <= 0) { deferred.fail(std::logic_error{"argument must be positive"}); @@ -96,13 +98,23 @@ void processPayload(celix::Promise<std::shared_ptr<RestApi::Payload>> promise) { ## Open Issues & TODOs -- TODO: refactors use of std::function as function arguments to templates. -- Currently the Promises implementation uses the Intel Threading Building Block (TBB) library (apache license 2.0) for its async communication. -It is not yet clear whether the TBB library is the correct library to use and if the library is used correctly at all. -- There is no solution chosen yet for scheduling task, like the ScheduledExecutorService used in Java. -- It also unclear if the "out of scope" handling of Promises and Deferred is good enough. As it is implemented now, - unresolved promises can be kept in memory if they also have a (direct or indirect) reference to it self. 
- If promises are resolved (successfully or not) they will destruct correctly. +### Circular References in the execution model + +There is an issue with circular references of shared_ptr between the executor and +SharedPromiseState. SharedPromiseState has a shared_ptr to the executor to delegate +execution of chains (onResolve, onSuccess, etc) and the executor accepts +SharedPromiseState - in the form of tasks - to really execute those chains. +This will lead to the situation where the process exits, but the destructors of +the executor and some SharedPromiseState are not called, leading to memory leaks. + +<b>TODO Discuss</b>: One way to solve this is that the SharedPromiseState has a weak_ptr to the +executor and only delegates chain execution if the executor still exists. +If the executor is gone the chains will not be executed, resulting in an error situation. + +This means that the end user is responsible for keeping the executor alive longer than the +PromiseFactory users. + +### Other issues - PromiseFactory is not complete yet - The static helper class Promises is not implemented yet (e.g. all/any) - Promise::flatMap not implemented yet \ No newline at end of file diff --git a/misc/experimental/promise/api/celix/DefaultExecutor.h b/misc/experimental/promise/api/celix/DefaultExecutor.h new file mode 100644 index 0000000..60626a7 --- /dev/null +++ b/misc/experimental/promise/api/celix/DefaultExecutor.h @@ -0,0 +1,70 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#pragma once + +#include <future> +#include <random> + +#include "celix/IExecutor.h" + +namespace celix { + + /** + * Simple default executor which uses std::async to run tasks. + * Does not support priority argument. + */ + class DefaultExecutor : public celix::IExecutor { + public: + explicit DefaultExecutor(std::launch _policy = std::launch::async | std::launch::deferred) : policy{_policy} {} + + void execute(std::function<void()> task, int /*priority*/) override { + std::lock_guard<std::mutex> lck{mutex}; + futures.emplace_back(std::async(policy, std::move(task))); + removeCompletedFutures(); + } + + void wait() override { + bool done = false; + while (!done) { + std::lock_guard<std::mutex> lck{mutex}; + removeCompletedFutures(); + done = futures.empty(); + } + } + private: + void removeCompletedFutures() { + //note should be called while mutex is lock. + auto it = futures.begin(); + while (it != futures.end()) { + auto status = it->wait_for(std::chrono::seconds{0}); + if (status == std::future_status::ready) { + //remove future + it = futures.erase(it); + } else { + ++it; + } + } + } + + const std::launch policy; + std::mutex mutex{}; //protect futures. 
+ std::vector<std::future<void>> futures{}; + }; +} \ No newline at end of file diff --git a/misc/experimental/promise/api/celix/Deferred.h b/misc/experimental/promise/api/celix/Deferred.h index 711620d..7dfb0c3 100644 --- a/misc/experimental/promise/api/celix/Deferred.h +++ b/misc/experimental/promise/api/celix/Deferred.h @@ -24,21 +24,7 @@ #include "celix/impl/SharedPromiseState.h" #include "celix/Promise.h" - -#include <tbb/task.h> -#include <tbb/task_group.h> -#include <tbb/task_scheduler_observer.h> -#if __has_include(<tbb/global_control.h>) - -#if TBB_INTERFACE_VERSION_MAJOR < 12 -#define TBB_PREVIEW_GLOBAL_CONTROL 1 -#endif - -#include <tbb/global_control.h> -#else -// deprecated in newer versions of TBB -#include <tbb/task_scheduler_init.h> -#endif +#include "celix/DefaultExecutor.h" namespace celix { @@ -64,12 +50,8 @@ namespace celix { public: using type = T; - Deferred(); - explicit Deferred(std::shared_ptr<celix::impl::SharedPromiseState<T>> state); - //TODO deferred ctor with factory - /** * Fail the Promise associated with this Deferred. 
* <p/> @@ -156,8 +138,6 @@ namespace celix { public: using type = void; - Deferred(); - explicit Deferred(std::shared_ptr<celix::impl::SharedPromiseState<void>> state); //TODO deferred ctor with factory @@ -228,17 +208,12 @@ namespace celix { *********************************************************************************/ template<typename T> -inline celix::Deferred<T>::Deferred() : state{std::make_shared<celix::impl::SharedPromiseState<T>>()} {} - -inline celix::Deferred<void>::Deferred() : state{std::make_shared<celix::impl::SharedPromiseState<void>>()} {} - -template<typename T> -inline celix::Deferred<T>::Deferred(std::shared_ptr<celix::impl::SharedPromiseState<T>> _state) : state{std::move(_state)} {} +celix::Deferred<T>::Deferred(std::shared_ptr<celix::impl::SharedPromiseState<T>> _state) : state{std::move(_state)} {} inline celix::Deferred<void>::Deferred(std::shared_ptr<celix::impl::SharedPromiseState<void>> _state) : state{std::move(_state)} {} template<typename T> -inline void celix::Deferred<T>::fail(std::exception_ptr failure) { +void celix::Deferred<T>::fail(std::exception_ptr failure) { state->fail(std::move(failure)); } @@ -247,7 +222,7 @@ inline void celix::Deferred<void>::fail(std::exception_ptr failure) { } template<typename T> -inline void celix::Deferred<T>::fail(const std::exception& failure) { +void celix::Deferred<T>::fail(const std::exception& failure) { state->fail(failure); } @@ -256,7 +231,7 @@ inline void celix::Deferred<void>::fail(const std::exception& failure) { } template<typename T> -inline celix::Promise<T> celix::Deferred<T>::getPromise() { +celix::Promise<T> celix::Deferred<T>::getPromise() { return celix::Promise<T>{state}; } @@ -266,7 +241,7 @@ inline celix::Promise<void> celix::Deferred<void>::getPromise() { template<typename T> template<typename U> -inline void celix::Deferred<T>::resolveWith(celix::Promise<U> with) { +void celix::Deferred<T>::resolveWith(celix::Promise<U> with) { with.onResolve([s = state, with] () mutable 
{ if (with.isSuccessfullyResolved()) { s->resolve(with.moveOrGetValue()); @@ -277,7 +252,7 @@ inline void celix::Deferred<T>::resolveWith(celix::Promise<U> with) { } inline void celix::Deferred<void>::resolveWith(celix::Promise<void> with) { - with.onResolve([s = state, with]{ + with.onResolve([s = state, with] { if (with.isSuccessfullyResolved()) { with.getValue(); s->resolve(); @@ -288,12 +263,12 @@ inline void celix::Deferred<void>::resolveWith(celix::Promise<void> with) { } template<typename T> -inline void celix::Deferred<T>::resolve(T&& value) { +void celix::Deferred<T>::resolve(T&& value) { state->resolve(std::forward<T>(value)); } template<typename T> -inline void celix::Deferred<T>::resolve(const T& value) { +void celix::Deferred<T>::resolve(const T& value) { state->resolve(value); } diff --git a/misc/experimental/promise/api/celix/IExecutor.h b/misc/experimental/promise/api/celix/IExecutor.h new file mode 100644 index 0000000..b7f2cee --- /dev/null +++ b/misc/experimental/promise/api/celix/IExecutor.h @@ -0,0 +1,53 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ + +#pragma once + +#include <functional> + +#include "celix/RejectedExecutionException.h" + +namespace celix { + + /** + * An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission + * from the mechanics of how each task will be run, including details of thread use, scheduling, etc. + * + * Note that supporting the priority argument is optional. If not priority values are ignored. + */ + class IExecutor { + public: + virtual ~IExecutor() noexcept = default; + + /** + * Executes the given command at some time in the future. The command may execute in a new thread, + * in a pooled thread, or in the calling thread, at the discretion of the Executor implementation. + * + * @param command the "runnable" task + * @param priority the priority of the task. It depends on the executor implementation whether this is supported. + * @throws celix::RejectedExecutionException if this task cannot be accepted for execution. + */ + virtual void execute(std::function<void()> task, int priority = 0) = 0; + + /** + * Wait until the executor has no pending task left. + */ + virtual void wait() = 0; + }; +} \ No newline at end of file diff --git a/misc/experimental/promise/api/celix/IScheduledExecutor.h b/misc/experimental/promise/api/celix/IScheduledExecutor.h new file mode 100644 index 0000000..6526a50 --- /dev/null +++ b/misc/experimental/promise/api/celix/IScheduledExecutor.h @@ -0,0 +1,127 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#pragma once + +#include <functional> +#include <future> + +#include "celix/IExecutor.h" + +namespace celix { + + class enum ScheduledType { + OneShot, + FixedDelay, + FixedRate + }; + + class IScheduledFuture { + public: + virtual ~IScheduledFuture() noexcept = default; + + virtual ScheduledType getType() const = 0; + + virtual bool isCancelled() const = 0; + + virtual void cancel() = 0; + + template<typename Rep, typename Period> + std::chrono::duration<Rep, Period> getDelayOrPeriod() const { + auto milli = getDelayOrPeriodInMilli(); + return std::chrono::duration<Rep, Period>{milli}; + } + + private: + virtual std::chrono::duration<double, std::milli> getDelayOrPeriodInMilli() const = 0; + }; + + + + /** + * An ExecutorService that can schedule commands to run after a given delay, or to execute periodically. + * + * The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. + * The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled. + * Commands submitted using the celix::IExecutor::execute method are scheduled with a requested delay of zero. + * Zero and negative delays (but not periods) are also allowed in schedule methods, and are treated as requests for immediate execution. + * All schedule methods accept relative delays and periods as arguments, not absolute times or dates. 
+ */ + class IScheduledExecutor : public celix::IExecutor { + public: + virtual ~IScheduledExecutor () noexcept = default; + + //TODO TBD schedule uses a executor, is a executor abstraction still needed or is a scheduled executor enough? + + /** + * Creates and executes a ScheduledFuture that becomes enabled after the given delay. + * @throws celix::RejectedExecutionException if this task cannot be accepted for execution. + */ + template<typename Rep, typename Period> + std::shared_ptr<celix::IScheduledFuture> schedule(std::function<void()> task, std::chrono::duration<Rep, Period> delay) { + std::chrono::duration<double, std::milli> delayInMilli = delay; + return scheduleInMilli(std::move(task), delayInMilli); + } + + /** + * Creates and executes a periodic action that becomes enabled first after the given initial delay, + * and subsequently with the given period; that is executions will commence after initialDelay then + * initialDelay+period, then initialDelay + 2 * period, and so on. + * If any execution of the task encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or termination of the executor. + * If any execution of this task takes longer than its period, + * then subsequent executions may start late, but will not concurrently execute. + * + * @param task the task to execute + * @param period the period between successive executions + * @param initialDelay the time to delay first execution + * @return a ScheduledFuture representing pending completion of the task. 
+ */ + template<typename Rep1, typename Period1, typename Rep2 = double, typename Period2 = std::milli> + std::shared_ptr<celix::IScheduledFuture> scheduleAtFixedRate(std::function<void()> task, std::chrono::duration<std::chrono::duration<Rep1, Period1> period, std::chrono::duration<Rep2, Period2> initialDelay = 0) { + std::chrono::duration<double, std::milli> periodInMilli = period; + std::chrono::duration<double, std::milli> initialDelayInMilli = initialDelay; + return scheduleAtFixedRateInMilli(std::move(task), periodInMilli, initialDelay); + } + + /** + * Creates and executes a periodic action that becomes enabled first after the given initial delay, + * and subsequently with the given delay between the termination of one execution and the commencement of + * the next. If any execution of the task encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or termination of the executor. + * + * @param task the task to execute + * @param delay the delay between the termination of one execution and the commencement of the next + * @param initialDelay the time to delay first execution + * @return a ScheduledFuture representing pending completion of the task. 
+ */ + template<typename Rep1, typename Period1, typename Rep2 = double, typename Period2 = std::milli> + std::shared_ptr<celix::IScheduledFuture> scheduleWithFixedDelay(std::function<void()> task, std::chrono::duration<Rep, Period> delay, std::chrono::duration<Rep2, Period2> initialDelay = 0) { + std::chrono::duration<double, std::milli> delayInMilli = delay; + std::chrono::duration<double, std::milli> initialDelayInMilli = initialDelay; + return scheduleWithFixedDelayInMilli(std::move(task), delayInMilli, initialDelay); + } + private: + virtual std::shared_ptr<celix::IScheduledFuture> scheduleInMilli(std::function<void()> task, std::chrono::duration<double, std::milli> delay) = 0; + + virtual std::shared_ptr<celix::IScheduledFuture> scheduleAtFixedRateInMilli(std::function<void()> task, std::chrono::duration<double, std::milli> period, std::chrono::duration<double, std::milli> initialDelay) = 0; + + virtual std::shared_ptr<celix::IScheduledFuture> scheduleWithFixedDelayInMilli(std::function<void()> task, std::chrono::duration<double, std::milli> delay, std::chrono::duration<double, std::milli> initialDelay) = 0; + }; +} \ No newline at end of file diff --git a/misc/experimental/promise/api/celix/Promise.h b/misc/experimental/promise/api/celix/Promise.h index dd8a501..7e6e65d 100644 --- a/misc/experimental/promise/api/celix/Promise.h +++ b/misc/experimental/promise/api/celix/Promise.h @@ -616,7 +616,7 @@ inline void celix::Promise<void>::wait() const { template<typename T> template<typename U> inline celix::Promise<U> celix::Promise<T>::then(std::function<celix::Promise<U>(celix::Promise<T>)> success, std::function<void(celix::Promise<T>)> failure) { - auto p = std::make_shared<celix::impl::SharedPromiseState<U>>(state->getExecutor()); + auto p = celix::impl::SharedPromiseState<U>::create(state->getExecutor(), state->getPriority()); auto chain = [s = state, p, success = std::move(success), failure = std::move(failure)]() { //chain is called when s is resolved @@ 
-641,7 +641,7 @@ inline celix::Promise<U> celix::Promise<T>::then(std::function<celix::Promise<U> template<typename U> inline celix::Promise<U> celix::Promise<void>::then(std::function<celix::Promise<U>(celix::Promise<void>)> success, std::function<void(celix::Promise<void>)> failure) { - auto p = std::make_shared<celix::impl::SharedPromiseState<U>>(state->getExecutor()); + auto p = celix::impl::SharedPromiseState<U>::create(state->getExecutor(), state->getPriority()); auto chain = [s = state, p, success = std::move(success), failure = std::move(failure)]() { //chain is called when s is resolved diff --git a/misc/experimental/promise/api/celix/PromiseFactory.h b/misc/experimental/promise/api/celix/PromiseFactory.h index 23a23b2..5a9aec8 100644 --- a/misc/experimental/promise/api/celix/PromiseFactory.h +++ b/misc/experimental/promise/api/celix/PromiseFactory.h @@ -20,32 +20,52 @@ #pragma once #include "celix/Deferred.h" +#include "celix/IExecutor.h" +#include "celix/DefaultExecutor.h" namespace celix { + //TODO documentation class PromiseFactory { public: - explicit PromiseFactory(const tbb::task_arena &executor = {}); - //TODO ctor with callbackExecutor and scheduledExecutor + explicit PromiseFactory(std::shared_ptr<celix::IExecutor> _executor = std::make_shared<celix::DefaultExecutor>()); + + ~PromiseFactory() noexcept; + + PromiseFactory(PromiseFactory&&) = default; + PromiseFactory& operator=(PromiseFactory&&) = default; + PromiseFactory(const PromiseFactory&) = default; + PromiseFactory& operator=(const PromiseFactory&) = default; + + template<typename T> + [[nodiscard]] celix::Deferred<T> deferred(int priority = 0) const; + + template<typename T> + [[nodiscard]] celix::Promise<T> deferredTask(std::function<void(celix::Deferred<T>)> task, int priority = 0) const; template<typename T> - [[nodiscard]] celix::Deferred<T> deferred(); + [[nodiscard]] celix::Promise<T> failed(const std::exception& e, int priority = 0) const; template<typename T> - [[nodiscard]] 
celix::Promise<T> failed(const std::exception& e); + [[nodiscard]] celix::Promise<T> failed(std::exception_ptr ptr, int priority = 0) const; template<typename T> - [[nodiscard]] celix::Promise<T> failed(std::exception_ptr ptr); + [[nodiscard]] celix::Promise<T> resolved(T&& value) const; template<typename T> - [[nodiscard]] celix::Promise<T> resolved(T&& value); + [[nodiscard]] celix::Promise<T> resolvedWithPrio(T&& value, int priority) const; + + [[nodiscard]] celix::Promise<void> resolved() const; + + [[nodiscard]] celix::Promise<void> resolvedWithPrio(int priority) const; - [[nodiscard]] celix::Promise<void> resolved(); + [[nodiscard]] std::shared_ptr<celix::IExecutor> getExecutor() const; - //TODO rest + //TODO + //[[nodiscard]] std::shared_ptr<celix::IScheduledExecutor> getScheduledExecutor() const; private: - tbb::task_arena executor; //TODO look into different thread pool libraries + std::shared_ptr<celix::IExecutor> executor; }; } @@ -54,36 +74,62 @@ namespace celix { Implementation *********************************************************************************/ -inline celix::PromiseFactory::PromiseFactory(const tbb::task_arena &_executor) : executor(_executor) {} +inline celix::PromiseFactory::PromiseFactory(std::shared_ptr<celix::IExecutor> _executor) : executor{std::move(_executor)} {} + +inline celix::PromiseFactory::~PromiseFactory() noexcept { + executor->wait(); //ensure that the executor is empty before allowing the to be deallocated. 
+} + +template<typename T> +celix::Deferred<T> celix::PromiseFactory::deferred(int priority) const { + return celix::Deferred<T>{celix::impl::SharedPromiseState<T>::create(executor, priority)}; +} template<typename T> -inline celix::Deferred<T> celix::PromiseFactory::deferred() { - return celix::Deferred<T>{std::make_shared<celix::impl::SharedPromiseState<T>>(executor)}; +[[nodiscard]] celix::Promise<T> celix::PromiseFactory::deferredTask(std::function<void(celix::Deferred<T>)> task, int priority) const { + auto def = deferred<T>(priority); + executor->execute([def, task=std::move(task)]{ + task(def); + }, priority); + return def.getPromise(); } template<typename T> -inline celix::Promise<T> celix::PromiseFactory::failed(const std::exception &e) { - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); +celix::Promise<T> celix::PromiseFactory::failed(const std::exception &e, int priority) const { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); p->fail(e); return celix::Promise<T>{p}; } template<typename T> -inline celix::Promise<T> celix::PromiseFactory::failed(std::exception_ptr ptr) { - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); +celix::Promise<T> celix::PromiseFactory::failed(std::exception_ptr ptr, int priority) const { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); p->fail(ptr); return celix::Promise<T>{p}; } template<typename T> -inline celix::Promise<T> celix::PromiseFactory::resolved(T &&value) { - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); +celix::Promise<T> celix::PromiseFactory::resolved(T &&value) const { + return resolvedWithPrio<T>(std::forward<T>(value), 0); +} + +template<typename T> +celix::Promise<T> celix::PromiseFactory::resolvedWithPrio(T &&value, int priority) const { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); p->resolve(std::forward<T>(value)); return celix::Promise<T>{p}; } 
-inline celix::Promise<void> celix::PromiseFactory::resolved() { - auto p = std::make_shared<celix::impl::SharedPromiseState<void>>(executor); +inline celix::Promise<void> celix::PromiseFactory::resolved() const { + return resolvedWithPrio(0); +} + +inline celix::Promise<void> celix::PromiseFactory::resolvedWithPrio(int priority) const { + auto p = celix::impl::SharedPromiseState<void>::create(executor, priority); p->resolve(); return celix::Promise<void>{p}; +} + +inline std::shared_ptr<celix::IExecutor> celix::PromiseFactory::getExecutor() const { + return executor; } \ No newline at end of file diff --git a/misc/experimental/promise/api/celix/RejectedExecutionException.h b/misc/experimental/promise/api/celix/RejectedExecutionException.h new file mode 100644 index 0000000..da8c3d5 --- /dev/null +++ b/misc/experimental/promise/api/celix/RejectedExecutionException.h @@ -0,0 +1,32 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ + +#pragma once + +namespace celix { + + struct RejectedExecutionException : public std::exception + { + [[nodiscard]] const char * what () const throw () + { + return "Cannot accept task for execution"; + } + }; + +} \ No newline at end of file diff --git a/misc/experimental/promise/api/celix/impl/SharedPromiseState.h b/misc/experimental/promise/api/celix/impl/SharedPromiseState.h index 6c1dd70..888260b 100644 --- a/misc/experimental/promise/api/celix/impl/SharedPromiseState.h +++ b/misc/experimental/promise/api/celix/impl/SharedPromiseState.h @@ -28,7 +28,7 @@ #include <thread> #include <optional> -#include <tbb/task_arena.h> +#include "celix/IExecutor.h" #include "celix/PromiseInvocationException.h" #include "celix/PromiseTimeoutException.h" @@ -40,13 +40,16 @@ namespace celix::impl { // Pointers make using promises properly unnecessarily complicated. static_assert(!std::is_pointer_v<T>, "Cannot use pointers with promises."); public: - explicit SharedPromiseState(const tbb::task_arena &executor = {}); + static std::shared_ptr<SharedPromiseState<T>> create(std::shared_ptr<celix::IExecutor> _executor, int priority); ~SharedPromiseState() = default; void resolve(T&& value); void resolve(const T& value); + + template<typename U> + void resolveWith(std::shared_ptr<SharedPromiseState<U>> with); void fail(std::exception_ptr e); @@ -101,8 +104,14 @@ namespace celix::impl { void addChain(std::function<void()> chainFunction); - tbb::task_arena getExecutor() const; + std::shared_ptr<celix::IExecutor> getExecutor() const; + + int getPriority() const; private: + explicit SharedPromiseState(std::shared_ptr<celix::IExecutor> _executor, int _priority); + + void setSelf(std::weak_ptr<SharedPromiseState<T>> self); + /** * Complete the resolving and call the registered tasks * A reference to the possible locked unique_lock. 
@@ -114,8 +123,9 @@ namespace celix::impl { */ void waitForAndCheckData(std::unique_lock<std::mutex> &lck, bool expectValid) const; - tbb::task_arena executor; //TODO look into different thread pool libraries - //TODO add ScheduledExecutorService like object + const std::shared_ptr<celix::IExecutor> executor; + const int priority; + std::weak_ptr<SharedPromiseState<T>> self{}; mutable std::mutex mutex{}; //protects below mutable std::condition_variable cond{}; @@ -129,8 +139,7 @@ namespace celix::impl { template<> class SharedPromiseState<void> { public: - explicit SharedPromiseState(const tbb::task_arena &executor = {}); - + static std::shared_ptr<SharedPromiseState<void>> create(std::shared_ptr<celix::IExecutor> _executor, int priority); ~SharedPromiseState() = default; void resolve(); @@ -178,8 +187,14 @@ namespace celix::impl { void addChain(std::function<void()> chainFunction); - tbb::task_arena getExecutor() const; + std::shared_ptr<celix::IExecutor> getExecutor() const; + + int getPriority() const; private: + explicit SharedPromiseState(std::shared_ptr<celix::IExecutor> _executor, int _priority); + + void setSelf(std::weak_ptr<SharedPromiseState<void>> self); + /** * Complete the resolving and call the registered tasks * A reference to the possible locked unique_lock. 
@@ -191,8 +206,9 @@ namespace celix::impl { */ void waitForAndCheckData(std::unique_lock<std::mutex> &lck, bool expectValid) const; - tbb::task_arena executor; //TODO look into different thread pool libraries - //TODO add ScheduledExecutorService like object + const std::shared_ptr<celix::IExecutor> executor; + const int priority; + std::weak_ptr<SharedPromiseState<void>> self{}; mutable std::mutex mutex{}; //protects below mutable std::condition_variable cond{}; @@ -208,12 +224,36 @@ namespace celix::impl { *********************************************************************************/ template<typename T> -inline celix::impl::SharedPromiseState<T>::SharedPromiseState(const tbb::task_arena& _executor) : executor{_executor} {} +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::create(std::shared_ptr<celix::IExecutor> _executor, int priority) { + auto state = std::shared_ptr<celix::impl::SharedPromiseState<T>>{new celix::impl::SharedPromiseState<T>{std::move(_executor), priority}}; + std::weak_ptr<celix::impl::SharedPromiseState<T>> self = state; + state->setSelf(std::move(self)); + return state; +} + +inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::create(std::shared_ptr<celix::IExecutor> _executor, int priority) { + auto state = std::shared_ptr<celix::impl::SharedPromiseState<void>>{new celix::impl::SharedPromiseState<void>{std::move(_executor), priority}}; + std::weak_ptr<celix::impl::SharedPromiseState<void>> self = state; + state->setSelf(std::move(self)); + return state; +} + +template<typename T> +celix::impl::SharedPromiseState<T>::SharedPromiseState(std::shared_ptr<celix::IExecutor> _executor, int _priority) : executor{std::move(_executor)}, priority{_priority} {} -inline celix::impl::SharedPromiseState<void>::SharedPromiseState(const tbb::task_arena& _executor) : executor{_executor} {} +inline 
celix::impl::SharedPromiseState<void>::SharedPromiseState(std::shared_ptr<celix::IExecutor> _executor, int _priority) : executor{std::move(_executor)}, priority{_priority} {} template<typename T> -inline void celix::impl::SharedPromiseState<T>::resolve(T&& value) { +void celix::impl::SharedPromiseState<T>::setSelf(std::weak_ptr<SharedPromiseState<T>> _self) { + self = std::move(_self); +} + +inline void celix::impl::SharedPromiseState<void>::setSelf(std::weak_ptr<SharedPromiseState<void>> _self) { + self = std::move(_self); +} + +template<typename T> +void celix::impl::SharedPromiseState<T>::resolve(T&& value) { std::unique_lock<std::mutex> lck{mutex}; if (done) { throw celix::PromiseInvocationException("Cannot resolve Promise. Promise is already done"); @@ -230,7 +270,7 @@ inline void celix::impl::SharedPromiseState<T>::resolve(T&& value) { template<typename T> -inline void celix::impl::SharedPromiseState<T>::resolve(const T& value) { +void celix::impl::SharedPromiseState<T>::resolve(const T& value) { std::unique_lock<std::mutex> lck{mutex}; if (done) { throw celix::PromiseInvocationException("Cannot resolve Promise. Promise is already done"); @@ -251,7 +291,7 @@ inline void celix::impl::SharedPromiseState<void>::resolve() { } template<typename T> -inline void celix::impl::SharedPromiseState<T>::fail(std::exception_ptr e) { +void celix::impl::SharedPromiseState<T>::fail(std::exception_ptr e) { std::unique_lock<std::mutex> lck{mutex}; if (done) { throw celix::PromiseInvocationException("Cannot fail Promise. 
Promise is already done"); @@ -270,7 +310,7 @@ inline void celix::impl::SharedPromiseState<void>::fail(std::exception_ptr e) { } template<typename T> -inline void celix::impl::SharedPromiseState<T>::fail(const std::exception& e) { +void celix::impl::SharedPromiseState<T>::fail(const std::exception& e) { fail(std::make_exception_ptr(e)); } @@ -279,7 +319,7 @@ inline void celix::impl::SharedPromiseState<void>::fail(const std::exception& e) } template<typename T> -inline void celix::impl::SharedPromiseState<T>::tryResolve(T&& value) { +void celix::impl::SharedPromiseState<T>::tryResolve(T&& value) { std::unique_lock<std::mutex> lck{mutex}; if (!done) { dataMoved = false; @@ -298,7 +338,7 @@ inline void celix::impl::SharedPromiseState<void>::tryResolve() { } template<typename T> -inline void celix::impl::SharedPromiseState<T>::tryFail(std::exception_ptr e) { +void celix::impl::SharedPromiseState<T>::tryFail(std::exception_ptr e) { std::unique_lock<std::mutex> lck{mutex}; if (!done) { exp = std::move(e); @@ -315,7 +355,7 @@ inline void celix::impl::SharedPromiseState<void>::tryFail(std::exception_ptr e) } template<typename T> -inline bool celix::impl::SharedPromiseState<T>::isDone() const { +bool celix::impl::SharedPromiseState<T>::isDone() const { std::lock_guard<std::mutex> lck{mutex}; return done; } @@ -326,7 +366,7 @@ inline bool celix::impl::SharedPromiseState<void>::isDone() const { } template<typename T> -inline bool celix::impl::SharedPromiseState<T>::isSuccessfullyResolved() const { +bool celix::impl::SharedPromiseState<T>::isSuccessfullyResolved() const { std::lock_guard<std::mutex> lck{mutex}; return done && !exp; } @@ -338,7 +378,7 @@ inline bool celix::impl::SharedPromiseState<void>::isSuccessfullyResolved() cons template<typename T> -inline void celix::impl::SharedPromiseState<T>::waitForAndCheckData(std::unique_lock<std::mutex>& lck, bool expectValid) const { +void celix::impl::SharedPromiseState<T>::waitForAndCheckData(std::unique_lock<std::mutex>& lck, 
bool expectValid) const { if (!lck.owns_lock()) { lck.lock(); } @@ -381,28 +421,28 @@ inline void celix::impl::SharedPromiseState<void>::waitForAndCheckData(std::uniq } template<typename T> -inline T& celix::impl::SharedPromiseState<T>::getValue() & { +T& celix::impl::SharedPromiseState<T>::getValue() & { std::unique_lock<std::mutex> lck{mutex}; waitForAndCheckData(lck, true); return *data; } template<typename T> -inline const T& celix::impl::SharedPromiseState<T>::getValue() const & { +const T& celix::impl::SharedPromiseState<T>::getValue() const & { std::unique_lock<std::mutex> lck{mutex}; waitForAndCheckData(lck, true); return *data; } template<typename T> -inline T&& celix::impl::SharedPromiseState<T>::getValue() && { +T&& celix::impl::SharedPromiseState<T>::getValue() && { std::unique_lock<std::mutex> lck{mutex}; waitForAndCheckData(lck, true); return std::move(*data); } template<typename T> -inline const T&& celix::impl::SharedPromiseState<T>::getValue() const && { +const T&& celix::impl::SharedPromiseState<T>::getValue() const && { std::unique_lock<std::mutex> lck{mutex}; waitForAndCheckData(lck, true); return std::move(*data); @@ -415,7 +455,7 @@ inline bool celix::impl::SharedPromiseState<void>::getValue() const { } template<typename T> -inline T celix::impl::SharedPromiseState<T>::moveOrGetValue() { +T celix::impl::SharedPromiseState<T>::moveOrGetValue() { std::unique_lock<std::mutex> lck{mutex}; waitForAndCheckData(lck, true); if constexpr (std::is_move_constructible_v<T>) { @@ -427,16 +467,25 @@ inline T celix::impl::SharedPromiseState<T>::moveOrGetValue() { } template<typename T> -inline tbb::task_arena celix::impl::SharedPromiseState<T>::getExecutor() const { +std::shared_ptr<celix::IExecutor> celix::impl::SharedPromiseState<T>::getExecutor() const { return executor; } -inline tbb::task_arena celix::impl::SharedPromiseState<void>::getExecutor() const { +inline std::shared_ptr<celix::IExecutor> celix::impl::SharedPromiseState<void>::getExecutor() const 
{ return executor; } template<typename T> -inline void celix::impl::SharedPromiseState<T>::wait() const { +int celix::impl::SharedPromiseState<T>::getPriority() const { + return priority; +} + +inline int celix::impl::SharedPromiseState<void>::getPriority() const { + return priority; +} + +template<typename T> +void celix::impl::SharedPromiseState<T>::wait() const { std::unique_lock<std::mutex> lck{mutex}; cond.wait(lck, [this]{return done;}); } @@ -447,7 +496,7 @@ inline void celix::impl::SharedPromiseState<void>::wait() const { } template<typename T> -inline std::exception_ptr celix::impl::SharedPromiseState<T>::getFailure() const { +std::exception_ptr celix::impl::SharedPromiseState<T>::getFailure() const { std::unique_lock<std::mutex> lck{mutex}; waitForAndCheckData(lck, false); return exp; @@ -460,56 +509,67 @@ inline std::exception_ptr celix::impl::SharedPromiseState<void>::getFailure() co } template<typename T> -inline void celix::impl::SharedPromiseState<T>::resolveWith(std::shared_ptr<SharedPromiseState<T>> with) { - with->addOnResolve([this](std::optional<T> v, std::exception_ptr e) { +void celix::impl::SharedPromiseState<T>::resolveWith(std::shared_ptr<SharedPromiseState<T>> with) { + with->addOnResolve([s = self.lock()](std::optional<T> v, std::exception_ptr e) { if (v) { - tryResolve(std::move(*v)); + s->tryResolve(std::move(*v)); } else { - tryFail(std::move(e)); + s->tryFail(std::move(e)); } }); } inline void celix::impl::SharedPromiseState<void>::resolveWith(std::shared_ptr<SharedPromiseState<void>> with) { - with->addOnResolve([this](std::optional<std::exception_ptr> e) { + with->addOnResolve([s = self.lock()](std::optional<std::exception_ptr> e) { if (!e) { - tryResolve(); + s->tryResolve(); } else { - tryFail(std::move(*e)); + s->tryFail(std::move(*e)); } }); } template<typename T> template<typename Rep, typename Period> -inline std::shared_ptr<celix::impl::SharedPromiseState<T>> 
celix::impl::SharedPromiseState<T>::timeout(std::shared_ptr<SharedPromiseState<T>> state, std::chrono::duration<Rep, Period> duration) { - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(state->executor); +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::timeout(std::shared_ptr<SharedPromiseState<T>> state, std::chrono::duration<Rep, Period> duration) { + auto p = celix::impl::SharedPromiseState<T>::create(state->executor, state->priority); p->resolveWith(state); - state->executor.execute([duration, p]{ - std::this_thread::sleep_for(duration); //TODO use scheduler instead of sleep on thread (using unnecessary resources) - p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); - //TODO is a callback to deferred needed to abort ? + /*TODO use scheduler instead of sleep on thread (using unnecessary resources) + auto schedFuture = scheduler->schedule<Rep, Period>([]{ + p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{}));}, duration); + p->addOnResolve([schedFuture]() { + schedFuture->cancel(); }); + */ + state->executor->execute([duration, p]{ + std::this_thread::sleep_for(duration); + p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); + }, state->priority); return p; } template<typename Rep, typename Period> -inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::timeout(std::shared_ptr<SharedPromiseState<void>> state, std::chrono::duration<Rep, Period> duration) { - auto p = std::make_shared<celix::impl::SharedPromiseState<void>>(state->executor); +std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::timeout(std::shared_ptr<SharedPromiseState<void>> state, std::chrono::duration<Rep, Period> duration) { + auto p = celix::impl::SharedPromiseState<void>::create(state->executor, state->priority); p->resolveWith(state); - state->executor.execute([duration, p]{ - 
std::this_thread::sleep_for(duration); //TODO use scheduler instead of sleep on thread (using unnecessary resources) - p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); - //TODO is a callback to deferred needed to abort ? + /*TODO use scheduler instead of sleep on thread (using unnecessary resources) + auto schedFuture = scheduler->schedule<Rep, Period>([]{ + p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{}));}, duration); + p->addOnResolve([schedFuture]() { + schedFuture->cancel(); }); + */ + state->executor->execute([duration, p]{ + std::this_thread::sleep_for(duration); + p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); + }, state->priority); return p; } template<typename T> template<typename Rep, typename Period> -inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::delay(std::chrono::duration<Rep, Period> duration) { - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); - +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::delay(std::chrono::duration<Rep, Period> duration) { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); addOnResolve([p, duration](std::optional<T> v, std::exception_ptr e) { std::this_thread::sleep_for(duration); //TODO use scheduler instead of sleep on thread (using unnecessary resources) try { @@ -524,14 +584,12 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPr p->fail(std::current_exception()); } }); - return p; } template<typename Rep, typename Period> -inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::delay(std::chrono::duration<Rep, Period> duration) { - auto p = std::make_shared<celix::impl::SharedPromiseState<void>>(executor); - +std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::delay(std::chrono::duration<Rep, Period> 
duration) { + auto p = celix::impl::SharedPromiseState<void>::create(executor, priority); addOnResolve([p, duration](std::optional<std::exception_ptr> e) { std::this_thread::sleep_for(duration); //TODO use scheduler instead of sleep on thread (using unnecessary resources) try { @@ -546,17 +604,15 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::Share p->fail(std::current_exception()); } }); - return p; } template<typename T> -inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::recover(std::function<T()> recover) { +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::recover(std::function<T()> recover) { if (!recover) { throw celix::PromiseInvocationException{"provided recover callback is not valid"}; } - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); - + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); addOnResolve([p, recover = std::move(recover)](std::optional<T> v, const std::exception_ptr& /*e*/) { if (v) { p->resolve(std::move(*v)); @@ -575,7 +631,8 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::Share if (!recover) { throw celix::PromiseInvocationException{"provided recover callback is not valid"}; } - auto p = std::make_shared<celix::impl::SharedPromiseState<void>>(executor); + + auto p = celix::impl::SharedPromiseState<void>::create(executor, priority); addOnResolve([p, recover = std::move(recover)](std::optional<std::exception_ptr> e) { if (!e) { @@ -593,16 +650,16 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::Share } template<typename T> -inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::filter(std::function<bool(const T&)> predicate) { +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::filter(std::function<bool(const T&)> predicate) { if (!predicate) { 
throw celix::PromiseInvocationException{"provided predicate callback is not valid"}; } - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); - auto chainFunction = [this, p, predicate = std::move(predicate)] { - if (isSuccessfullyResolved()) { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, predicate = std::move(predicate)] { + if (s->isSuccessfullyResolved()) { try { - if (predicate(getValue())) { - p->resolve(moveOrGetValue()); + if (predicate(s->getValue())) { + p->resolve(s->moveOrGetValue()); } else { throw celix::PromiseInvocationException{"predicate does not accept value"}; } @@ -610,7 +667,7 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPr p->fail(std::current_exception()); } } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } }; addChain(std::move(chainFunction)); @@ -619,16 +676,16 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPr template<typename T> -inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::fallbackTo(std::shared_ptr<celix::impl::SharedPromiseState<T>> fallbackTo) { - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); - auto chainFunction = [this, p, fallbackTo = std::move(fallbackTo)] { - if (isSuccessfullyResolved()) { - p->resolve(moveOrGetValue()); +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::fallbackTo(std::shared_ptr<celix::impl::SharedPromiseState<T>> fallbackTo) { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, fallbackTo = std::move(fallbackTo)] { + if (s->isSuccessfullyResolved()) { + p->resolve(s->moveOrGetValue()); } else { if (fallbackTo->isSuccessfullyResolved()) { p->resolve(fallbackTo->moveOrGetValue()); } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } } }; @@ 
-637,17 +694,17 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPr } inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::fallbackTo(std::shared_ptr<celix::impl::SharedPromiseState<void>> fallbackTo) { - auto p = std::make_shared<celix::impl::SharedPromiseState<void>>(executor); - auto chainFunction = [this, p, fallbackTo = std::move(fallbackTo)] { - if (isSuccessfullyResolved()) { - getValue(); + auto p = celix::impl::SharedPromiseState<void>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, fallbackTo = std::move(fallbackTo)] { + if (s->isSuccessfullyResolved()) { + s->getValue(); p->resolve(); } else { if (fallbackTo->isSuccessfullyResolved()) { fallbackTo->getValue(); p->resolve(); } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } } }; @@ -656,18 +713,20 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::Share } template<typename T> -inline void celix::impl::SharedPromiseState<T>::addChain(std::function<void()> chainFunction) { +void celix::impl::SharedPromiseState<T>::addChain(std::function<void()> chainFunction) { std::function<void()> localChain{}; { std::lock_guard<std::mutex> lck{mutex}; if (!done) { - chain.push_back(std::move(chainFunction)); + chain.emplace_back([func = std::move(chainFunction)]() { + func(); + }); } else { localChain = std::move(chainFunction); } } if (localChain) { - localChain(); + executor->execute(std::move(localChain), priority); } } @@ -676,29 +735,31 @@ inline void celix::impl::SharedPromiseState<void>::addChain(std::function<void() { std::lock_guard<std::mutex> lck{mutex}; if (!done) { - chain.push_back(std::move(chainFunction)); + chain.emplace_back([func = std::move(chainFunction)]() { + func(); + }); } else { localChain = std::move(chainFunction); } } if (localChain) { - localChain(); + executor->execute(std::move(localChain), priority); } } template<typename T> template<typename R> 
-inline std::shared_ptr<celix::impl::SharedPromiseState<R>> celix::impl::SharedPromiseState<T>::map(std::function<R(T)> mapper) { +std::shared_ptr<celix::impl::SharedPromiseState<R>> celix::impl::SharedPromiseState<T>::map(std::function<R(T)> mapper) { if (!mapper) { throw celix::PromiseInvocationException("provided mapper is not valid"); } - auto p = std::make_shared<celix::impl::SharedPromiseState<R>>(executor); - auto chainFunction = [this, p, mapper = std::move(mapper)] { + auto p = celix::impl::SharedPromiseState<R>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, mapper = std::move(mapper)] { try { - if (isSuccessfullyResolved()) { - p->resolve(mapper(moveOrGetValue())); + if (s->isSuccessfullyResolved()) { + p->resolve(mapper(s->moveOrGetValue())); } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } } catch (...) { p->fail(std::current_exception()); @@ -709,18 +770,18 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<R>> celix::impl::SharedPr } template<typename R> -inline std::shared_ptr<celix::impl::SharedPromiseState<R>> celix::impl::SharedPromiseState<void>::map(std::function<R()> mapper) { +std::shared_ptr<celix::impl::SharedPromiseState<R>> celix::impl::SharedPromiseState<void>::map(std::function<R()> mapper) { if (!mapper) { throw celix::PromiseInvocationException("provided mapper is not valid"); } - auto p = std::make_shared<celix::impl::SharedPromiseState<R>>(executor); - auto chainFunction = [this, p, mapper = std::move(mapper)] { + auto p = celix::impl::SharedPromiseState<R>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, mapper = std::move(mapper)] { try { - if (isSuccessfullyResolved()) { - getValue(); + if (s->isSuccessfullyResolved()) { + s->getValue(); p->resolve(mapper()); } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } } catch (...) 
{ p->fail(std::current_exception()); @@ -731,21 +792,21 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<R>> celix::impl::SharedPr } template<typename T> -inline std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::thenAccept(std::function<void(T)> consumer) { +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::thenAccept(std::function<void(T)> consumer) { if (!consumer) { throw celix::PromiseInvocationException("provided consumer is not valid"); } - auto p = std::make_shared<celix::impl::SharedPromiseState<T>>(executor); - auto chainFunction = [this, p, consumer = std::move(consumer)] { - if (isSuccessfullyResolved()) { + auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, consumer = std::move(consumer)] { + if (s->isSuccessfullyResolved()) { try { - consumer(getValue()); - p->resolve(moveOrGetValue()); + consumer(s->getValue()); + p->resolve(s->moveOrGetValue()); } catch (...) { p->fail(std::current_exception()); } } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } }; addChain(std::move(chainFunction)); @@ -756,18 +817,18 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::Share if (!consumer) { throw celix::PromiseInvocationException("provided consumer is not valid"); } - auto p = std::make_shared<celix::impl::SharedPromiseState<void>>(executor); - auto chainFunction = [this, p, consumer = std::move(consumer)] { - if (isSuccessfullyResolved()) { + auto p = celix::impl::SharedPromiseState<void>::create(executor, priority); + auto chainFunction = [s = self.lock(), p, consumer = std::move(consumer)] { + if (s->isSuccessfullyResolved()) { try { - getValue(); + s->getValue(); consumer(); p->resolve(); } catch (...) 
{ p->fail(std::current_exception()); } } else { - p->fail(getFailure()); + p->fail(s->getFailure()); } }; addChain(std::move(chainFunction)); @@ -775,28 +836,28 @@ inline std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::Share } template<typename T> -inline void celix::impl::SharedPromiseState<T>::addOnResolve(std::function<void(std::optional<T> val, std::exception_ptr exp)> callback) { - std::function<void()> task = [this, callback = std::move(callback)] { - std::exception_ptr e = nullptr; +void celix::impl::SharedPromiseState<T>::addOnResolve(std::function<void(std::optional<T> val, std::exception_ptr exp)> callback) { + std::function<void()> task = [s = self.lock(), callback = std::move(callback)] { + std::exception_ptr e; { - std::lock_guard<std::mutex> lck{mutex}; - e = exp; + std::lock_guard<std::mutex> lck{s->mutex}; + e = s->exp; } if(e) { callback({}, e); } else { - callback(getValue(), e); + callback(s->getValue(), e); } }; addChain(task); } inline void celix::impl::SharedPromiseState<void>::addOnResolve(std::function<void(std::optional<std::exception_ptr> exp)> callback) { - std::function<void()> task = [this, callback = std::move(callback)] { - std::exception_ptr e = nullptr; + std::function<void()> task = [s = self.lock(), callback = std::move(callback)] { + std::exception_ptr e; { - std::lock_guard<std::mutex> lck{mutex}; - e = exp; + std::lock_guard<std::mutex> lck{s->mutex}; + e = s->exp; } callback(e); }; @@ -804,19 +865,19 @@ inline void celix::impl::SharedPromiseState<void>::addOnResolve(std::function<vo } template<typename T> -inline void celix::impl::SharedPromiseState<T>::addOnSuccessConsumeCallback(std::function<void(T)> callback) { - std::function<void()> task = [this, callback = std::move(callback)] { - if (isSuccessfullyResolved()) { - callback(getValue()); +void celix::impl::SharedPromiseState<T>::addOnSuccessConsumeCallback(std::function<void(T)> callback) { + std::function<void()> task = [s = self.lock(), callback = 
std::move(callback)] { + if (s->isSuccessfullyResolved()) { + callback(s->getValue()); } }; addChain(task); } inline void celix::impl::SharedPromiseState<void>::addOnSuccessConsumeCallback(std::function<void()> callback) { - std::function<void()> task = [this, callback = std::move(callback)] { - if (isSuccessfullyResolved()) { - getValue(); + std::function<void()> task = [s = self.lock(), callback = std::move(callback)] { + if (s->isSuccessfullyResolved()) { + s->getValue(); callback(); } }; @@ -824,11 +885,11 @@ inline void celix::impl::SharedPromiseState<void>::addOnSuccessConsumeCallback(s } template<typename T> -inline void celix::impl::SharedPromiseState<T>::addOnFailureConsumeCallback(std::function<void(const std::exception&)> callback) { - std::function<void()> task = [this, callback = std::move(callback)] { - if (!isSuccessfullyResolved()) { +void celix::impl::SharedPromiseState<T>::addOnFailureConsumeCallback(std::function<void(const std::exception&)> callback) { + std::function<void()> task = [s = self.lock(), callback = std::move(callback)] { + if (!s->isSuccessfullyResolved()) { try { - std::rethrow_exception(getFailure()); + std::rethrow_exception(s->getFailure()); } catch (const std::exception &e) { callback(e); } catch (...) { @@ -842,10 +903,10 @@ inline void celix::impl::SharedPromiseState<T>::addOnFailureConsumeCallback(std: } inline void celix::impl::SharedPromiseState<void>::addOnFailureConsumeCallback(std::function<void(const std::exception&)> callback) { - std::function<void()> task = [this, callback = std::move(callback)] { - if (!isSuccessfullyResolved()) { + std::function<void()> task = [s = self.lock(), callback = std::move(callback)] { + if (!s->isSuccessfullyResolved()) { try { - std::rethrow_exception(getFailure()); + std::rethrow_exception(s->getFailure()); } catch (const std::exception &e) { callback(e); } catch (...) 
{ @@ -859,7 +920,7 @@ inline void celix::impl::SharedPromiseState<void>::addOnFailureConsumeCallback(s } template<typename T> -inline void celix::impl::SharedPromiseState<T>::complete(std::unique_lock<std::mutex>& lck) { +void celix::impl::SharedPromiseState<T>::complete(std::unique_lock<std::mutex>& lck) { if (!lck.owns_lock()) { lck.lock(); } @@ -869,13 +930,15 @@ inline void celix::impl::SharedPromiseState<T>::complete(std::unique_lock<std::m done = true; cond.notify_all(); + while (!chain.empty()) { - std::vector<std::function<void()>> localChain{}; - localChain.swap(chain); + std::vector<std::function<void()>> localChains{}; + localChains.swap(chain); lck.unlock(); - for (auto &chainTask : localChain) { - executor.execute(chainTask); //TODO maybe use std::move? //TODO optimize if complete is already executor on executor? + for (auto &chainTask : localChains) { + executor->execute(std::move(chainTask), priority); } + localChains.clear(); lck.lock(); } } @@ -890,13 +953,15 @@ inline void celix::impl::SharedPromiseState<void>::complete(std::unique_lock<std done = true; cond.notify_all(); + while (!chain.empty()) { - std::vector<std::function<void()>> localChain{}; - localChain.swap(chain); + std::vector<std::function<void()>> localChains{}; + localChains.swap(chain); lck.unlock(); - for (auto &chainTask : localChain) { - executor.execute(chainTask); //TODO maybe use std::move? //TODO optimize if complete is already executor on executor? 
+ for (auto &chainTask : localChains) { + executor->execute(std::move(chainTask), priority); } + localChains.clear(); lck.lock(); } } \ No newline at end of file diff --git a/misc/experimental/promise/cmake/FindTBB.cmake b/misc/experimental/promise/cmake/FindTBB.cmake deleted file mode 100644 index b01cba6..0000000 --- a/misc/experimental/promise/cmake/FindTBB.cmake +++ /dev/null @@ -1,418 +0,0 @@ -# - Find ThreadingBuildingBlocks include dirs and libraries -# Use this module by invoking find_package with the form: -# find_package(TBB -# [REQUIRED] # Fail with error if TBB is not found -# ) # -# Once done, this will define -# -# TBB_FOUND - system has TBB -# TBB_INCLUDE_DIRS - the TBB include directories -# TBB_LIBRARIES - TBB libraries to be lined, doesn't include malloc or -# malloc proxy -# TBB::tbb - imported target for the TBB library -# -# TBB_VERSION_MAJOR - Major Product Version Number -# TBB_VERSION_MINOR - Minor Product Version Number -# TBB_INTERFACE_VERSION - Engineering Focused Version Number -# TBB_COMPATIBLE_INTERFACE_VERSION - The oldest major interface version -# still supported. This uses the engineering -# focused interface version numbers. -# -# TBB_MALLOC_FOUND - system has TBB malloc library -# TBB_MALLOC_INCLUDE_DIRS - the TBB malloc include directories -# TBB_MALLOC_LIBRARIES - The TBB malloc libraries to be lined -# TBB::malloc - imported target for the TBB malloc library -# -# TBB_MALLOC_PROXY_FOUND - system has TBB malloc proxy library -# TBB_MALLOC_PROXY_INCLUDE_DIRS = the TBB malloc proxy include directories -# TBB_MALLOC_PROXY_LIBRARIES - The TBB malloc proxy libraries to be lined -# TBB::malloc_proxy - imported target for the TBB malloc proxy library -# -# -# This module reads hints about search locations from variables: -# ENV TBB_ARCH_PLATFORM - for eg. 
set it to "mic" for Xeon Phi builds -# ENV TBB_ROOT or just TBB_ROOT - root directory of tbb installation -# ENV TBB_BUILD_PREFIX - specifies the build prefix for user built tbb -# libraries. Should be specified with ENV TBB_ROOT -# and optionally... -# ENV TBB_BUILD_DIR - if build directory is different than ${TBB_ROOT}/build -# -# -# Modified by Robert Maynard from the original OGRE source -# -#------------------------------------------------------------------- -# This file is part of the CMake build system for OGRE -# (Object-oriented Graphics Rendering Engine) -# For the latest info, see http://www.ogre3d.org/ -# -# The contents of this file are placed in the public domain. Feel -# free to make use of it in any way you like. -#------------------------------------------------------------------- -# -#============================================================================= -# Copyright 2010-2012 Kitware, Inc. -# Copyright 2012 Rolf Eike Beer <[email protected]> -# -# Distributed under the OSI-approved BSD License (the "License"); -# see accompanying file Copyright.txt for details. -# -# This software is distributed WITHOUT ANY WARRANTY; without even the -# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -# See the License for more information. -#============================================================================= -# (To distribute this file outside of CMake, substitute the full -# License text for the above reference.) 
- - -#============================================================================= -# FindTBB helper functions and macros -# - -#==================================================== -# Fix the library path in case it is a linker script -#==================================================== -function(tbb_extract_real_library library real_library) - if(NOT UNIX OR NOT EXISTS ${library}) - set(${real_library} "${library}" PARENT_SCOPE) - return() - endif() - - #Read in the first 4 bytes and see if they are the ELF magic number - set(_elf_magic "7f454c46") - file(READ ${library} _hex_data OFFSET 0 LIMIT 4 HEX) - if(_hex_data STREQUAL _elf_magic) - #we have opened a elf binary so this is what - #we should link to - set(${real_library} "${library}" PARENT_SCOPE) - return() - endif() - - file(READ ${library} _data OFFSET 0 LIMIT 1024) - if("${_data}" MATCHES "INPUT \\(([^(]+)\\)") - #extract out the .so name from REGEX MATCH command - set(_proper_so_name "${CMAKE_MATCH_1}") - - #construct path to the real .so which is presumed to be in the same directory - #as the input file - get_filename_component(_so_dir "${library}" DIRECTORY) - set(${real_library} "${_so_dir}/${_proper_so_name}" PARENT_SCOPE) - else() - #unable to determine what this library is so just hope everything works - #and pass it unmodified. - set(${real_library} "${library}" PARENT_SCOPE) - endif() -endfunction() - -#=============================================== -# Do the final processing for the package find. 
-#=============================================== -macro(findpkg_finish PREFIX TARGET_NAME) - if (${PREFIX}_INCLUDE_DIR AND ${PREFIX}_LIBRARY) - set(${PREFIX}_FOUND TRUE) - set (${PREFIX}_INCLUDE_DIRS ${${PREFIX}_INCLUDE_DIR}) - set (${PREFIX}_LIBRARIES ${${PREFIX}_LIBRARY}) - else () - if (${PREFIX}_FIND_REQUIRED AND NOT ${PREFIX}_FIND_QUIETLY) - message(FATAL_ERROR "Required library ${PREFIX} not found.") - endif () - endif () - - if (NOT TARGET "TBB::${TARGET_NAME}") - if (${PREFIX}_LIBRARY_RELEASE) - tbb_extract_real_library(${${PREFIX}_LIBRARY_RELEASE} real_release) - endif () - if (${PREFIX}_LIBRARY_DEBUG) - tbb_extract_real_library(${${PREFIX}_LIBRARY_DEBUG} real_debug) - endif () - add_library(TBB::${TARGET_NAME} UNKNOWN IMPORTED) - set_target_properties(TBB::${TARGET_NAME} PROPERTIES - INTERFACE_INCLUDE_DIRECTORIES "${${PREFIX}_INCLUDE_DIR}") - if (${PREFIX}_LIBRARY_DEBUG AND ${PREFIX}_LIBRARY_RELEASE) - set_target_properties(TBB::${TARGET_NAME} PROPERTIES - IMPORTED_LOCATION "${real_release}" - IMPORTED_LOCATION_DEBUG "${real_debug}" - IMPORTED_LOCATION_RELEASE "${real_release}") - elseif (${PREFIX}_LIBRARY_RELEASE) - set_target_properties(TBB::${TARGET_NAME} PROPERTIES - IMPORTED_LOCATION "${real_release}") - elseif (${PREFIX}_LIBRARY_DEBUG) - set_target_properties(TBB::${TARGET_NAME} PROPERTIES - IMPORTED_LOCATION "${real_debug}") - endif () - endif () - - #mark the following variables as internal variables - mark_as_advanced(${PREFIX}_INCLUDE_DIR - ${PREFIX}_LIBRARY - ${PREFIX}_LIBRARY_DEBUG - ${PREFIX}_LIBRARY_RELEASE) -endmacro() - -#=============================================== -# Generate debug names from given release names -#=============================================== -macro(get_debug_names PREFIX) - foreach(i ${${PREFIX}}) - set(${PREFIX}_DEBUG ${${PREFIX}_DEBUG} ${i}d ${i}D ${i}_d ${i}_D ${i}_debug ${i}) - endforeach() -endmacro() - -#=============================================== -# See if we have env vars to help us find tbb 
-#=============================================== -macro(getenv_path VAR) - set(ENV_${VAR} $ENV{${VAR}}) - # replace won't work if var is blank - if (ENV_${VAR}) - string( REGEX REPLACE "\\\\" "/" ENV_${VAR} ${ENV_${VAR}} ) - endif () -endmacro() - -#=============================================== -# Couple a set of release AND debug libraries -#=============================================== -macro(make_library_set PREFIX) - if (${PREFIX}_RELEASE AND ${PREFIX}_DEBUG) - set(${PREFIX} optimized ${${PREFIX}_RELEASE} debug ${${PREFIX}_DEBUG}) - elseif (${PREFIX}_RELEASE) - set(${PREFIX} ${${PREFIX}_RELEASE}) - elseif (${PREFIX}_DEBUG) - set(${PREFIX} ${${PREFIX}_DEBUG}) - endif () -endmacro() - - -#============================================================================= -# Now to actually find TBB -# - -# Get path, convert backslashes as ${ENV_${var}} -getenv_path(TBB_ROOT) - -# initialize search paths -set(TBB_PREFIX_PATH ${TBB_ROOT} ${ENV_TBB_ROOT}) -set(TBB_INC_SEARCH_PATH "") -set(TBB_LIB_SEARCH_PATH "") - - -# If user built from sources -set(TBB_BUILD_PREFIX $ENV{TBB_BUILD_PREFIX}) -if (TBB_BUILD_PREFIX AND ENV_TBB_ROOT) - getenv_path(TBB_BUILD_DIR) - if (NOT ENV_TBB_BUILD_DIR) - set(ENV_TBB_BUILD_DIR ${ENV_TBB_ROOT}/build) - endif () - - # include directory under ${ENV_TBB_ROOT}/include - list(APPEND TBB_LIB_SEARCH_PATH - ${ENV_TBB_BUILD_DIR}/${TBB_BUILD_PREFIX}_release - ${ENV_TBB_BUILD_DIR}/${TBB_BUILD_PREFIX}_debug) -endif () - - -# For Windows, let's assume that the user might be using the precompiled -# TBB packages from the main website. These use a rather awkward directory -# structure (at least for automatically finding the right files) depending -# on platform and compiler, but we'll do our best to accommodate it. -# Not adding the same effort for the precompiled linux builds, though. 
Those -# have different versions for CC compiler versions and linux kernels which -# will never adequately match the user's setup, so there is no feasible way -# to detect the "best" version to use. The user will have to manually -# select the right files. (Chances are the distributions are shipping their -# custom version of tbb, anyway, so the problem is probably nonexistent.) -if (WIN32 AND MSVC) - set(COMPILER_PREFIX "vc7.1") - if (MSVC_VERSION EQUAL 1400) - set(COMPILER_PREFIX "vc8") - elseif(MSVC_VERSION EQUAL 1500) - set(COMPILER_PREFIX "vc9") - elseif(MSVC_VERSION EQUAL 1600) - set(COMPILER_PREFIX "vc10") - elseif(MSVC_VERSION EQUAL 1700) - set(COMPILER_PREFIX "vc11") - elseif(MSVC_VERSION EQUAL 1800) - set(COMPILER_PREFIX "vc12") - elseif(MSVC_VERSION EQUAL 1900) - set(COMPILER_PREFIX "vc14") - endif () - - # for each prefix path, add ia32/64\${COMPILER_PREFIX}\lib to the lib search path - foreach (dir IN LISTS TBB_PREFIX_PATH) - if (CMAKE_CL_64) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/ia64/${COMPILER_PREFIX}/lib) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/ia64/${COMPILER_PREFIX}) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/intel64/${COMPILER_PREFIX}/lib) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/intel64/${COMPILER_PREFIX}) - else () - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/ia32/${COMPILER_PREFIX}/lib) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/ia32/${COMPILER_PREFIX}) - endif () - endforeach () -endif () - -# For OS X binary distribution, choose libc++ based libraries for Mavericks (10.9) -# and above and AppleClang -if (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND - NOT CMAKE_SYSTEM_VERSION VERSION_LESS 13.0) - set (USE_LIBCXX OFF) - cmake_policy(GET CMP0025 POLICY_VAR) - - if (POLICY_VAR STREQUAL "NEW") - if (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") - set (USE_LIBCXX ON) - endif () - else () - if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - set (USE_LIBCXX ON) - endif () - endif () - - if (USE_LIBCXX) - foreach (dir IN LISTS TBB_PREFIX_PATH) - list 
(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/libc++ ${dir}/libc++/lib) - endforeach () - endif () -endif () - -# check compiler ABI -if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") - set(COMPILER_PREFIX) - if (NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.7) - list(APPEND COMPILER_PREFIX "gcc4.7") - endif() - if (NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.4) - list(APPEND COMPILER_PREFIX "gcc4.4") - endif() - list(APPEND COMPILER_PREFIX "gcc4.1") -elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang") - set(COMPILER_PREFIX) - if (NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS 3.6) - list(APPEND COMPILER_PREFIX "gcc4.7") - endif() - list(APPEND COMPILER_PREFIX "gcc4.4") -else() # Assume compatibility with 4.4 for other compilers - list(APPEND COMPILER_PREFIX "gcc4.4") -endif () - -# if platform architecture is explicitly specified -set(TBB_ARCH_PLATFORM $ENV{TBB_ARCH_PLATFORM}) -if (TBB_ARCH_PLATFORM) - foreach (dir IN LISTS TBB_PREFIX_PATH) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/${TBB_ARCH_PLATFORM}/lib) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/${TBB_ARCH_PLATFORM}) - endforeach () -endif () - -foreach (dir IN LISTS TBB_PREFIX_PATH) - foreach (prefix IN LISTS COMPILER_PREFIX) - if (CMAKE_SIZEOF_VOID_P EQUAL 8) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/intel64) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/intel64/${prefix}) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/intel64/lib) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/intel64/${prefix}/lib) - else () - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/ia32) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib/ia32/${prefix}) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/ia32/lib) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/ia32/${prefix}/lib) - endif () - endforeach() -endforeach () - -# add general search paths -foreach (dir IN LISTS TBB_PREFIX_PATH) - list(APPEND TBB_LIB_SEARCH_PATH ${dir}/lib ${dir}/Lib ${dir}/lib/tbb - ${dir}/Libs) - list(APPEND TBB_INC_SEARCH_PATH ${dir}/include ${dir}/Include - ${dir}/include/tbb) -endforeach () - 
-set(TBB_LIBRARY_NAMES tbb) -get_debug_names(TBB_LIBRARY_NAMES) - - -find_path(TBB_INCLUDE_DIR - NAMES tbb/tbb.h - PATHS ${TBB_INC_SEARCH_PATH}) - -find_library(TBB_LIBRARY_RELEASE - NAMES ${TBB_LIBRARY_NAMES} - PATHS ${TBB_LIB_SEARCH_PATH}) -find_library(TBB_LIBRARY_DEBUG - NAMES ${TBB_LIBRARY_NAMES_DEBUG} - PATHS ${TBB_LIB_SEARCH_PATH}) -make_library_set(TBB_LIBRARY) - -findpkg_finish(TBB tbb) - -#if we haven't found TBB no point on going any further -if (NOT TBB_FOUND) - return() -endif () - -#============================================================================= -# Look for TBB's malloc package -set(TBB_MALLOC_LIBRARY_NAMES tbbmalloc) -get_debug_names(TBB_MALLOC_LIBRARY_NAMES) - -find_path(TBB_MALLOC_INCLUDE_DIR - NAMES tbb/tbb.h - PATHS ${TBB_INC_SEARCH_PATH}) - -find_library(TBB_MALLOC_LIBRARY_RELEASE - NAMES ${TBB_MALLOC_LIBRARY_NAMES} - PATHS ${TBB_LIB_SEARCH_PATH}) -find_library(TBB_MALLOC_LIBRARY_DEBUG - NAMES ${TBB_MALLOC_LIBRARY_NAMES_DEBUG} - PATHS ${TBB_LIB_SEARCH_PATH}) -make_library_set(TBB_MALLOC_LIBRARY) - -findpkg_finish(TBB_MALLOC tbbmalloc) - -#============================================================================= -# Look for TBB's malloc proxy package -set(TBB_MALLOC_PROXY_LIBRARY_NAMES tbbmalloc_proxy) -get_debug_names(TBB_MALLOC_PROXY_LIBRARY_NAMES) - -find_path(TBB_MALLOC_PROXY_INCLUDE_DIR - NAMES tbb/tbbmalloc_proxy.h - PATHS ${TBB_INC_SEARCH_PATH}) - -find_library(TBB_MALLOC_PROXY_LIBRARY_RELEASE - NAMES ${TBB_MALLOC_PROXY_LIBRARY_NAMES} - PATHS ${TBB_LIB_SEARCH_PATH}) -find_library(TBB_MALLOC_PROXY_LIBRARY_DEBUG - NAMES ${TBB_MALLOC_PROXY_LIBRARY_NAMES_DEBUG} - PATHS ${TBB_LIB_SEARCH_PATH}) -make_library_set(TBB_MALLOC_PROXY_LIBRARY) - -findpkg_finish(TBB_MALLOC_PROXY tbbmalloc_proxy) - - -#============================================================================= -#parse all the version numbers from tbb -if(NOT TBB_VERSION) - - #only read the start of the file - file(STRINGS - "${TBB_INCLUDE_DIR}/tbb/tbb_stddef.h" - 
TBB_VERSION_CONTENTS - REGEX "VERSION") - - string(REGEX REPLACE - ".*#define TBB_VERSION_MAJOR ([0-9]+).*" "\\1" - TBB_VERSION_MAJOR "${TBB_VERSION_CONTENTS}") - - string(REGEX REPLACE - ".*#define TBB_VERSION_MINOR ([0-9]+).*" "\\1" - TBB_VERSION_MINOR "${TBB_VERSION_CONTENTS}") - - string(REGEX REPLACE - ".*#define TBB_INTERFACE_VERSION ([0-9]+).*" "\\1" - TBB_INTERFACE_VERSION "${TBB_VERSION_CONTENTS}") - - string(REGEX REPLACE - ".*#define TBB_COMPATIBLE_INTERFACE_VERSION ([0-9]+).*" "\\1" - TBB_COMPATIBLE_INTERFACE_VERSION "${TBB_VERSION_CONTENTS}") - -endif() diff --git a/misc/experimental/promise/gtest/src/PromiseTestSuite.cc b/misc/experimental/promise/gtest/src/PromiseTestSuite.cc index dbed789..15722a0 100644 --- a/misc/experimental/promise/gtest/src/PromiseTestSuite.cc +++ b/misc/experimental/promise/gtest/src/PromiseTestSuite.cc @@ -23,12 +23,36 @@ #include <utility> #include "celix/PromiseFactory.h" +#include "celix/DefaultExecutor.h" + +/** + * A special executor which introduces some sleeps to ensure some entropy during testing + */ +class ExecutorWithRandomPrePostSleep : public celix::DefaultExecutor { +public: + ExecutorWithRandomPrePostSleep() : celix::DefaultExecutor{std::launch::async} {} + + void execute(std::function<void()> task, int priority) override { + int roll1 = distribution(generator); //1-100 + int roll2 = distribution(generator); //1-100 + celix::DefaultExecutor::execute([roll1, roll2, task = std::move(task)] { + std::this_thread::sleep_for(std::chrono::milliseconds{roll1}); + task(); + std::this_thread::sleep_for(std::chrono::milliseconds{roll2}); + }, priority); + } + +private: + std::default_random_engine generator{}; + std::uniform_int_distribution<int> distribution{1,100}; +}; class PromiseTestSuite : public ::testing::Test { public: - ~PromiseTestSuite() override = default; + ~PromiseTestSuite() noexcept override = default; - celix::PromiseFactory factory{ tbb::task_arena{5, 1} }; + std::shared_ptr<celix::IExecutor> executor = 
std::make_shared<ExecutorWithRandomPrePostSleep>(); + std::shared_ptr<celix::PromiseFactory> factory = std::make_shared<celix::PromiseFactory>(executor); }; struct MovableInt { @@ -82,7 +106,7 @@ bool operator==( const char *c, const NonTrivialType &ntt) { #endif TEST_F(PromiseTestSuite, simplePromise) { - auto deferred = factory.deferred<NonTrivialType>(); + auto deferred = factory->deferred<NonTrivialType>(); std::thread t{[&deferred] () { std::this_thread::sleep_for(std::chrono::milliseconds{50}); deferred.resolve("test"); @@ -100,7 +124,7 @@ TEST_F(PromiseTestSuite, simplePromise) { } TEST_F(PromiseTestSuite, failingPromise) { - auto deferred = factory.deferred<long>(); + auto deferred = factory->deferred<long>(); auto cpy = deferred; std::thread t{[&deferred] () { deferred.fail(std::logic_error{"failing"}); @@ -112,7 +136,7 @@ TEST_F(PromiseTestSuite, failingPromise) { } TEST_F(PromiseTestSuite, failingPromiseWithExceptionPtr) { - auto deferred = factory.deferred<long>(); + auto deferred = factory->deferred<long>(); std::thread t{[&deferred]{ try { std::string{}.at(1); // this generates an std::out_of_range @@ -128,37 +152,45 @@ TEST_F(PromiseTestSuite, failingPromiseWithExceptionPtr) { } TEST_F(PromiseTestSuite, onSuccessHandling) { - auto deferred = factory.deferred<long>(); + auto deferred = factory->deferred<long>(); + std::mutex mutex{}; bool called = false; bool resolveCalled = false; auto p = deferred.getPromise() - .onSuccess([&called](long value) { + .onSuccess([&](long value) { + std::lock_guard<std::mutex> lck{mutex}; EXPECT_EQ(42, value); called = true; }) - .onResolve([&resolveCalled]() { + .onResolve([&]() { + std::lock_guard<std::mutex> lck{mutex}; resolveCalled = true; }); deferred.resolve(42); - p.wait(); + + executor->wait(); EXPECT_EQ(true, called); EXPECT_EQ(true, resolveCalled); } TEST_F(PromiseTestSuite, onFailureHandling) { - auto deferred = factory.deferred<long>(); + auto deferred = factory->deferred<long>(); + std::mutex mutex{}; bool 
successCalled = false; bool failureCalled = false; bool resolveCalled = false; auto p = deferred.getPromise() .onSuccess([&](long /*value*/) { + std::lock_guard<std::mutex> lck{mutex}; successCalled = true; }) .onFailure([&](const std::exception &e) { + std::lock_guard<std::mutex> lck{mutex}; failureCalled = true; std::cout << "got error: " << e.what() << std::endl; }) - .onResolve([&resolveCalled]() { + .onResolve([&]() { + std::lock_guard<std::mutex> lck{mutex}; resolveCalled = true; }); try { @@ -167,20 +199,24 @@ TEST_F(PromiseTestSuite, onFailureHandling) { } catch (...) { deferred.fail(std::current_exception()); } - p.wait(); + + executor->wait(); EXPECT_EQ(false, successCalled); EXPECT_EQ(true, failureCalled); EXPECT_EQ(true, resolveCalled); } TEST_F(PromiseTestSuite, resolveSuccessWith) { - auto deferred1 = factory.deferred<long>(); - auto deferred2 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); + auto deferred2 = factory->deferred<long>(); + std::mutex mutex{}; bool called = false; + deferred1.getPromise() - .onSuccess([&called](long value) { + .onSuccess([&](long value) { EXPECT_EQ(42, value); + std::lock_guard<std::mutex> lck{mutex}; called = true; }); @@ -189,22 +225,23 @@ TEST_F(PromiseTestSuite, resolveSuccessWith) { deferred2.resolveWith(deferred1.getPromise()); auto p = deferred2.getPromise(); deferred1.resolve(42); - p.wait(); + + executor->wait(); EXPECT_EQ(true, called); } TEST_F(PromiseTestSuite, resolveFailureWith) { - auto deferred1 = factory.deferred<long>(); - auto deferred2 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); + auto deferred2 = factory->deferred<long>(); + + std::mutex mutex{}; bool failureCalled = false; - bool successCalled = false; + deferred2.getPromise() - .onSuccess([&](long /*value*/) { - successCalled = true; - }) .onFailure([&](const std::exception &e) { - failureCalled = true; std::cout << "got error: " << e.what() << std::endl; + std::lock_guard<std::mutex> 
lck{mutex}; + failureCalled = true; }); //currently deferred1 will be resolved in thread, and onSuccess is trigger on the promise of deferred2 @@ -217,13 +254,13 @@ TEST_F(PromiseTestSuite, resolveFailureWith) { } catch (...) { deferred1.fail(std::current_exception()); } - p.wait(); - EXPECT_EQ(false, successCalled); + + executor->wait(); EXPECT_EQ(true, failureCalled); } TEST_F(PromiseTestSuite, resolveWithTimeout) { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); std::thread t{[&deferred1]{ std::this_thread::sleep_for(std::chrono::milliseconds{50}); try { @@ -233,24 +270,29 @@ TEST_F(PromiseTestSuite, resolveWithTimeout) { } }}; + std::mutex mutex{}; bool firstSuccessCalled = false; bool secondSuccessCalled = false; bool secondFailedCalled = false; auto p = deferred1.getPromise() - .onSuccess([&firstSuccessCalled](long value) { + .onSuccess([&](long value) { EXPECT_EQ(42, value); + std::lock_guard<std::mutex> lck{mutex}; firstSuccessCalled = true; }) .timeout(std::chrono::milliseconds{10}) - .onSuccess([&secondSuccessCalled](long value) { + .onSuccess([&](long value) { EXPECT_EQ(42, value); + std::lock_guard<std::mutex> lck{mutex}; secondSuccessCalled = true; }) - .onFailure([&secondFailedCalled](const std::exception&) { + .onFailure([&](const std::exception&) { + std::lock_guard<std::mutex> lck{mutex}; secondFailedCalled = true; }); + t.join(); - p.wait(); + executor->wait(); EXPECT_EQ(true, firstSuccessCalled); EXPECT_EQ(false, secondSuccessCalled); EXPECT_EQ(true, secondFailedCalled); @@ -259,56 +301,59 @@ TEST_F(PromiseTestSuite, resolveWithTimeout) { secondSuccessCalled = false; secondFailedCalled = false; auto p2 = deferred1.getPromise() - .onSuccess([&firstSuccessCalled](long value) { + .onSuccess([&](long value) { EXPECT_EQ(42, value); + std::lock_guard<std::mutex> lck{mutex}; firstSuccessCalled = true; }) - .timeout(std::chrono::milliseconds{50}) - .onSuccess([&secondSuccessCalled](long value) { + 
.timeout(std::chrono::milliseconds{250}) /*NOTE: more than the possible delay introduced by the executor*/ + .onSuccess([&](long value) { EXPECT_EQ(42, value); + std::lock_guard<std::mutex> lck{mutex}; secondSuccessCalled = true; }) - .onFailure([&secondFailedCalled](const std::exception&) { + .onFailure([&](const std::exception&) { + std::lock_guard<std::mutex> lck{mutex}; secondFailedCalled = true; }); - p2.wait(); + executor->wait(); EXPECT_EQ(true, firstSuccessCalled); EXPECT_EQ(true, secondSuccessCalled); EXPECT_EQ(false, secondFailedCalled); } TEST_F(PromiseTestSuite, resolveWithDelay) { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); + std::mutex mutex{}; bool successCalled = false; - bool failedCalled = false; auto t1 = std::chrono::system_clock::now(); std::chrono::system_clock::time_point t2; auto p = deferred1.getPromise() .delay(std::chrono::milliseconds{50}) - .onSuccess([&successCalled, &t2](long value) { + .onSuccess([&](long value) { EXPECT_EQ(42, value); - successCalled = true; + std::lock_guard<std::mutex> lck{mutex}; t2 = std::chrono::system_clock::now(); - }) - .onFailure([&failedCalled](const std::exception&) { - failedCalled = true; + successCalled = true; }); deferred1.resolve(42); - p.wait(); + + executor->wait(); EXPECT_EQ(true, successCalled); - EXPECT_EQ(false, failedCalled); auto durationInMs = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); - EXPECT_GE(durationInMs, std::chrono::milliseconds{10}); + EXPECT_GE(durationInMs, std::chrono::milliseconds{50}); } TEST_F(PromiseTestSuite, resolveWithRecover) { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); + std::mutex mutex{}; bool successCalled = false; deferred1.getPromise() .recover([]{ return 42; }) - .onSuccess([&successCalled](long v) { + .onSuccess([&](long v) { EXPECT_EQ(42, v); + std::lock_guard<std::mutex> lck{mutex}; successCalled = true; }); try { @@ -316,11 +361,12 @@ 
TEST_F(PromiseTestSuite, resolveWithRecover) { } catch (...) { deferred1.fail(std::current_exception()); } + executor->wait(); EXPECT_EQ(true, successCalled); } TEST_F(PromiseTestSuite, chainAndMapResult) { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); std::thread t{[&deferred1]{ deferred1.resolve(42); }}; @@ -333,26 +379,32 @@ TEST_F(PromiseTestSuite, chainAndMapResult) { } TEST_F(PromiseTestSuite, chainWithThenAccept) { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); + std::mutex mutex{}; bool called = false; deferred1.getPromise() - .thenAccept([&called](long v){ + .thenAccept([&](long v){ EXPECT_EQ(42, v); + std::lock_guard<std::mutex> lck{mutex}; called = true; }); deferred1.resolve(42); + { + std::unique_lock<std::mutex> lck{mutex}; + } + executor->wait(); EXPECT_TRUE(called); } TEST_F(PromiseTestSuite, promiseWithFallbackTo) { - celix::Deferred<long> deferred1 = factory.deferred<long>(); + celix::Deferred<long> deferred1 = factory->deferred<long>(); try { throw std::logic_error("failure"); } catch (...) 
{ deferred1.fail(std::current_exception()); } - auto deferred2 = factory.deferred<long>(); + auto deferred2 = factory->deferred<long>(); deferred2.resolve(42); @@ -361,7 +413,7 @@ TEST_F(PromiseTestSuite, promiseWithFallbackTo) { } TEST_F(PromiseTestSuite, promiseWithPredicate) { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); std::thread t1{[&deferred1]{ deferred1.resolve(42); }}; @@ -374,7 +426,7 @@ TEST_F(PromiseTestSuite, promiseWithPredicate) { TEST_F(PromiseTestSuite, outOfScopeUnresolvedPromises) { bool called = false; { - auto deferred1 = factory.deferred<long>(); + auto deferred1 = factory->deferred<long>(); deferred1.getPromise().onResolve([&]{ called = true; }); @@ -384,13 +436,13 @@ TEST_F(PromiseTestSuite, outOfScopeUnresolvedPromises) { } TEST_F(PromiseTestSuite, chainPromises) { - auto success = [](celix::Promise<long> p) -> celix::Promise<long> { + auto success = [&](celix::Promise<long> p) -> celix::Promise<long> { //TODO Promises::resolved(p.getValue() + p.getValue()) - celix::Deferred<long> result; + auto result = factory->deferred<long>(); result.resolve(p.getValue() + p.getValue()); return result.getPromise(); }; - celix::Deferred<long> initial; + auto initial = factory->deferred<long>(); initial.resolve(42); long result = initial.getPromise().then<long>(success).then<long>(success).getValue(); EXPECT_EQ(168, result); @@ -405,25 +457,24 @@ TEST_F(PromiseTestSuite, chainFailedPromises) { auto failed = [&called](const celix::Promise<long>& /*p*/) -> void { called = true; }; - celix::Deferred<long> deferred; + auto deferred = factory->deferred<long>(); deferred.fail(std::logic_error{"fail"}); deferred.getPromise().then<long>(success, failed).wait(); EXPECT_TRUE(called); } TEST_F(PromiseTestSuite, failedResolvedWithPromiseFactory) { - auto factory = celix::PromiseFactory{}; - auto p1 = factory.failed<long>(std::logic_error{"test"}); + auto p1 = factory->failed<long>(std::logic_error{"test"}); 
EXPECT_TRUE(p1.isDone()); EXPECT_NE(nullptr, p1.getFailure()); - auto p2 = factory.resolved(42); + auto p2 = factory->resolved(42); EXPECT_TRUE(p2.isDone()); EXPECT_EQ(42, p2.getValue()); } TEST_F(PromiseTestSuite, movableStruct) { - auto deferred = factory.deferred<MovableInt>(); + auto deferred = factory->deferred<MovableInt>(); std::thread t{[&deferred] () { std::this_thread::sleep_for(std::chrono::milliseconds{50}); deferred.resolve(42); @@ -435,7 +486,7 @@ TEST_F(PromiseTestSuite, movableStruct) { } TEST_F(PromiseTestSuite, movableStructTemporary) { - auto deferred = factory.deferred<MovableInt>(); + auto deferred = factory->deferred<MovableInt>(); std::thread t{[&deferred] () { std::this_thread::sleep_for(std::chrono::milliseconds{50}); deferred.resolve(42); @@ -445,7 +496,7 @@ TEST_F(PromiseTestSuite, movableStructTemporary) { } TEST_F(PromiseTestSuite, nonMovableStruct) { - auto deferred = factory.deferred<NonMovableInt>(); + auto deferred = factory->deferred<NonMovableInt>(); std::thread t{[&deferred] () { std::this_thread::sleep_for(std::chrono::milliseconds{50}); deferred.resolve(42); @@ -457,7 +508,7 @@ TEST_F(PromiseTestSuite, nonMovableStruct) { } TEST_F(PromiseTestSuite, nonMovableStructTemporary) { - auto deferred = factory.deferred<NonMovableInt>(); + auto deferred = factory->deferred<NonMovableInt>(); std::thread t{[&deferred] () { std::this_thread::sleep_for(std::chrono::milliseconds{50}); deferred.resolve(42); @@ -466,7 +517,18 @@ TEST_F(PromiseTestSuite, nonMovableStructTemporary) { t.join(); } +TEST_F(PromiseTestSuite, deferredTaskCall) { + auto promise = factory->deferredTask<long>([](auto deferred) { + deferred.resolve(42); + }); + EXPECT_EQ(42, promise.getValue()); +} + +TEST_F(PromiseTestSuite, getExecutorFromFactory) { + auto exec = factory->getExecutor(); + EXPECT_EQ(executor.get(), exec.get()); +} + #ifdef __clang__ #pragma clang diagnostic pop #endif - diff --git a/misc/experimental/promise/gtest/src/VoidPromiseTestSuite.cc 
b/misc/experimental/promise/gtest/src/VoidPromiseTestSuite.cc index 56d1f73..a33c809 100644 --- a/misc/experimental/promise/gtest/src/VoidPromiseTestSuite.cc +++ b/misc/experimental/promise/gtest/src/VoidPromiseTestSuite.cc @@ -25,9 +25,13 @@ class VoidPromiseTestSuite : public ::testing::Test { public: - ~VoidPromiseTestSuite() override = default; + ~VoidPromiseTestSuite() noexcept override { + //TODO improve, see PromiseTestSuite + executor->wait(); + } - celix::PromiseFactory factory{ tbb::task_arena{5, 1} }; + std::shared_ptr<celix::DefaultExecutor> executor = std::make_shared<celix::DefaultExecutor>(); + std::shared_ptr<celix::PromiseFactory> factory = std::make_shared<celix::PromiseFactory>(executor); }; #ifdef __clang__ @@ -36,7 +40,7 @@ public: #endif TEST_F(VoidPromiseTestSuite, simplePromise) { - auto deferred = factory.deferred<void>(); + auto deferred = factory->deferred<void>(); std::thread t{[&deferred] () { std::this_thread::sleep_for(std::chrono::milliseconds{50}); deferred.resolve(); @@ -51,7 +55,7 @@ TEST_F(VoidPromiseTestSuite, simplePromise) { } TEST_F(VoidPromiseTestSuite, failingPromise) { - auto deferred = factory.deferred<void>(); + auto deferred = factory->deferred<void>(); auto cpy = deferred; std::thread t{[&deferred] () { deferred.fail(std::logic_error{"failing"}); @@ -63,7 +67,7 @@ TEST_F(VoidPromiseTestSuite, failingPromise) { } TEST_F(VoidPromiseTestSuite, failingPromiseWithExceptionPtr) { - auto deferred = factory.deferred<void>(); + auto deferred = factory->deferred<void>(); std::thread t{[&deferred]{ try { std::string{}.at(1); // this generates an std::out_of_range @@ -79,7 +83,7 @@ TEST_F(VoidPromiseTestSuite, failingPromiseWithExceptionPtr) { } TEST_F(VoidPromiseTestSuite, onSuccessHandling) { - auto deferred = factory.deferred<void>(); + auto deferred = factory->deferred<void>(); bool called = false; bool resolveCalled = false; auto p = deferred.getPromise() @@ -96,7 +100,7 @@ TEST_F(VoidPromiseTestSuite, onSuccessHandling) { } 
TEST_F(VoidPromiseTestSuite, onFailureHandling) { - auto deferred = factory.deferred<void>(); + auto deferred = factory->deferred<void>(); bool successCalled = false; bool failureCalled = false; bool resolveCalled = false; @@ -124,8 +128,8 @@ TEST_F(VoidPromiseTestSuite, onFailureHandling) { } TEST_F(VoidPromiseTestSuite, resolveSuccessWith) { - auto deferred1 = factory.deferred<void>(); - auto deferred2 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); + auto deferred2 = factory->deferred<void>(); bool called = false; deferred1.getPromise() @@ -143,15 +147,18 @@ TEST_F(VoidPromiseTestSuite, resolveSuccessWith) { } TEST_F(VoidPromiseTestSuite, resolveFailureWith) { - auto deferred1 = factory.deferred<void>(); - auto deferred2 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); + auto deferred2 = factory->deferred<void>(); + std::mutex mutex{}; bool failureCalled = false; bool successCalled = false; deferred2.getPromise() .onSuccess([&]() { + std::lock_guard<std::mutex> lck{mutex}; successCalled = true; }) .onFailure([&](const std::exception &e) { + std::unique_lock<std::mutex> lck{mutex}; failureCalled = true; std::cout << "got error: " << e.what() << std::endl; }); @@ -166,13 +173,13 @@ TEST_F(VoidPromiseTestSuite, resolveFailureWith) { } catch (...) 
{ deferred1.fail(std::current_exception()); } - p.wait(); + executor->wait(); EXPECT_EQ(false, successCalled); EXPECT_EQ(true, failureCalled); } TEST_F(VoidPromiseTestSuite, resolveWithTimeout) { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); std::thread t{[&deferred1]{ std::this_thread::sleep_for(std::chrono::milliseconds{50}); try { @@ -182,22 +189,26 @@ TEST_F(VoidPromiseTestSuite, resolveWithTimeout) { } }}; + std::mutex mutex{}; bool firstSuccessCalled = false; bool secondSuccessCalled = false; bool secondFailedCalled = false; auto p = deferred1.getPromise() - .onSuccess([&firstSuccessCalled]() { + .onSuccess([&]() { + std::lock_guard<std::mutex> lock{mutex}; firstSuccessCalled = true; }) .timeout(std::chrono::milliseconds{10}) - .onSuccess([&secondSuccessCalled]() { + .onSuccess([&]() { + std::lock_guard<std::mutex> lock{mutex}; secondSuccessCalled = true; }) - .onFailure([&secondFailedCalled](const std::exception&) { + .onFailure([&](const std::exception&) { + std::lock_guard<std::mutex> lock{mutex}; secondFailedCalled = true; }); t.join(); - p.wait(); + executor->wait(); EXPECT_EQ(true, firstSuccessCalled); EXPECT_EQ(false, secondSuccessCalled); EXPECT_EQ(true, secondFailedCalled); @@ -223,22 +234,26 @@ TEST_F(VoidPromiseTestSuite, resolveWithTimeout) { } TEST_F(VoidPromiseTestSuite, resolveWithDelay) { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); + + std::mutex mutex{}; bool successCalled = false; bool failedCalled = false; auto t1 = std::chrono::system_clock::now(); std::chrono::system_clock::time_point t2; auto p = deferred1.getPromise() .delay(std::chrono::milliseconds{50}) - .onSuccess([&successCalled, &t2]() { + .onSuccess([&]() { + std::lock_guard<std::mutex> lock{mutex}; successCalled = true; t2 = std::chrono::system_clock::now(); }) - .onFailure([&failedCalled](const std::exception&) { + .onFailure([&](const std::exception&) { + std::lock_guard<std::mutex> 
lock{mutex}; failedCalled = true; }); deferred1.resolve(); - p.wait(); + executor->wait(); EXPECT_EQ(true, successCalled); EXPECT_EQ(false, failedCalled); auto durationInMs = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); @@ -247,11 +262,13 @@ TEST_F(VoidPromiseTestSuite, resolveWithDelay) { TEST_F(VoidPromiseTestSuite, resolveWithRecover) { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); + std::mutex mutex{}; bool successCalled = false; deferred1.getPromise() .recover([]{ return 42; }) - .onSuccess([&successCalled]() { + .onSuccess([&]() { + std::lock_guard<std::mutex> lock{mutex}; successCalled = true; }); try { @@ -259,11 +276,12 @@ TEST_F(VoidPromiseTestSuite, resolveWithRecover) { } catch (...) { deferred1.fail(std::current_exception()); } + executor->wait(); EXPECT_EQ(true, successCalled); } TEST_F(VoidPromiseTestSuite, chainAndMapResult) { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); std::thread t{[&deferred1]{ deferred1.resolve(); }}; @@ -272,29 +290,33 @@ TEST_F(VoidPromiseTestSuite, chainAndMapResult) { return 2; }).getValue(); t.join(); + executor->wait(); EXPECT_EQ(2, two); } TEST_F(VoidPromiseTestSuite, chainWithThenAccept) { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); + std::mutex mutex{}; bool called = false; deferred1.getPromise() - .thenAccept([&called](){ + .thenAccept([&](){ + std::lock_guard<std::mutex> lock{mutex}; called = true; }); deferred1.resolve(); + executor->wait(); EXPECT_TRUE(called); } TEST_F(VoidPromiseTestSuite, promiseWithFallbackTo) { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); try { throw std::logic_error("failure"); } catch (...) 
{ deferred1.fail(std::current_exception()); } - auto deferred2 = factory.deferred<void>(); + auto deferred2 = factory->deferred<void>(); deferred2.resolve(); @@ -303,43 +325,49 @@ TEST_F(VoidPromiseTestSuite, promiseWithFallbackTo) { } TEST_F(VoidPromiseTestSuite, outOfScopeUnresolvedPromises) { + std::mutex mutex{}; bool called = false; { - auto deferred1 = factory.deferred<void>(); + auto deferred1 = factory->deferred<void>(); deferred1.getPromise().onResolve([&]{ + std::lock_guard<std::mutex> lock{mutex}; called = true; }); //promise and deferred out of scope } + executor->wait(); EXPECT_FALSE(called); } TEST_F(VoidPromiseTestSuite, chainPromises) { - auto success = [](celix::Promise<void> p) -> celix::Promise<long> { + auto success = [&](celix::Promise<void> p) -> celix::Promise<long> { //TODO Promises::resolved(p.getValue() + p.getValue()) - celix::Deferred<long> result; + auto result = factory->deferred<long>(); p.getValue(); result.resolve(42); return result.getPromise(); }; - celix::Deferred<void> initial; + auto initial = factory->deferred<void>(); initial.resolve(); long result = initial.getPromise().then<long>(success).getValue(); EXPECT_EQ(42, result); } TEST_F(VoidPromiseTestSuite, chainFailedPromises) { + std::mutex mutex{}; bool called = false; auto success = [](celix::Promise<void> p) -> celix::Promise<void> { //nop return p; }; - auto failed = [&called](const celix::Promise<void>& /*p*/) -> void { + auto failed = [&](const celix::Promise<void>& /*p*/) -> void { + std::lock_guard<std::mutex> lock{mutex}; called = true; }; - celix::Deferred<void> deferred; + auto deferred = factory->deferred<void>(); deferred.fail(std::logic_error{"fail"}); - deferred.getPromise().then<void>(success, failed).wait(); + deferred.getPromise().then<void>(success, failed); + executor->wait(); EXPECT_TRUE(called); } @@ -354,6 +382,18 @@ TEST_F(VoidPromiseTestSuite, failedResolvedWithPromiseFactory) { EXPECT_TRUE(p2.getValue()); } +TEST_F(VoidPromiseTestSuite, 
deferredTaskCall) { + auto t1 = std::chrono::system_clock::now(); + auto promise = factory->deferredTask<void>([](auto deferred) { + std::this_thread::sleep_for(std::chrono::milliseconds{12}); + deferred.resolve(); + }); + promise.wait(); + auto t2 = std::chrono::system_clock::now(); + auto durationInMs = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); + EXPECT_GT(durationInMs, std::chrono::milliseconds{10}); +} + #ifdef __clang__ #pragma clang diagnostic pop #endif diff --git a/misc/experimental/promise/src/PromiseExamples.cc b/misc/experimental/promise/src/PromiseExamples.cc index de7d5f7..88c2833 100644 --- a/misc/experimental/promise/src/PromiseExamples.cc +++ b/misc/experimental/promise/src/PromiseExamples.cc @@ -20,7 +20,7 @@ #include <thread> #include <iostream> -#include "celix/Deferred.h" +#include "celix/PromiseFactory.h" static long calc_fib(long n) { long m = 1; @@ -33,39 +33,32 @@ static long calc_fib(long n) { return m; } -celix::Promise<long> fib(long n) { - auto deferred = celix::Deferred<long>{}; - - if (n <= 0) { - deferred.fail(std::logic_error{"argument must be positive"}); - } else if (n < 10 ) { +celix::Promise<long> fib(celix::PromiseFactory& factory, long n) { + return factory.deferredTask<long>([n](auto deferred) { deferred.resolve(calc_fib(n)); - } else { - std::thread t{[deferred, n] () mutable { - deferred.resolve(calc_fib(n)); - }}; - t.detach(); - } - - return deferred.getPromise(); + }); } int main() { - auto p1 = fib(39); - p1.timeout(std::chrono::milliseconds{100}) - .onSuccess([](long val) { - std::cout << "Success p1 : " << val << std::endl; - }) - .onFailure([](const std::exception& e) { - std::cerr << "Failure p1 : " << e.what() << std::endl; - }); - auto p2 = fib(1000000000); - p2.timeout(std::chrono::milliseconds {100}).onSuccess([](long val) { - std::cout << "Success p2 : " << std::to_string(val) << std::endl; - }) - .onFailure([](const std::exception& e) { - std::cerr << "Failure p2 : " << e.what() << 
std::endl; - }); - p1.wait(); - p2.wait(); + celix::PromiseFactory factory{}; + + fib(factory, 1000000000) + .timeout(std::chrono::milliseconds {100}) + .onSuccess([](long val) { + std::cout << "Success p1 : " << std::to_string(val) << std::endl; + }) + .onFailure([](const std::exception& e) { + std::cerr << "Failure p1 : " << e.what() << std::endl; + }); + + fib(factory, 39) + .timeout(std::chrono::milliseconds{100}) + .onSuccess([](long val) { + std::cout << "Success p2 : " << std::to_string(val) << std::endl; + }) + .onFailure([](const std::exception& e) { + std::cerr << "Failure p2 : " << e.what() << std::endl; + }); + + //NOTE the program can only exit if the executor is done executing all task (even the timeout version). }
