This is an automated email from the ASF dual-hosted git repository. pnoltes pushed a commit to branch feature/promise_timeout_memleak in repository https://gitbox.apache.org/repos/asf/celix.git
commit e5ef9dd8a65e52df5bd221d48d89cb24c5c063d1 Author: Pepijn Noltes <[email protected]> AuthorDate: Mon Sep 6 17:07:09 2021 +0200 Refactors promises to ensure unresolved promises can be handled with a timeout. --- .../src/TestExportImportRemoteServiceFactory.cc | 5 +- libs/promises/CMakeLists.txt | 1 + libs/promises/README.md | 33 +-- libs/promises/api/celix/DefaultScheduledExecutor.h | 12 +- libs/promises/api/celix/Deferred.h | 25 ++- libs/promises/api/celix/IScheduledExecutor.h | 4 +- libs/promises/api/celix/Promise.h | 238 ++++++++++++++++----- libs/promises/api/celix/PromiseFactory.h | 76 ++++--- libs/promises/api/celix/impl/SharedPromiseState.h | 93 +++++--- .../cmake/CelixPromisesConfigVersion.cmake | 2 +- libs/promises/gtest/src/PromisesTestSuite.cc | 10 +- libs/promises/gtest/src/VoidPromisesTestSuite.cc | 15 +- libs/promises/src/PromiseExamples.cc | 4 +- 13 files changed, 357 insertions(+), 161 deletions(-) diff --git a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc index d8a6962..a0b7d9a 100644 --- a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc +++ b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc @@ -90,15 +90,12 @@ public: logHelper.error(msg); deferred.fail(celix::rsa::RemoteServicesException{msg}); } - return deferred.getPromise().timeout(INVOKE_TIMEOUT).onFailure([this, invokeId](const auto& /*exp*/) { + return deferred.getPromise().setTimeout(INVOKE_TIMEOUT).onFailure([this, invokeId](const auto& /*exp*/) { std::lock_guard l{mutex}; - - //TODO fixme -> timeout can leak auto it = deferreds.find(invokeId); if (it != deferreds.end()) { it->second.fail(celix::rsa::RemoteServicesException{"workaround for unresovled promsise with timeout chain memleak"}); } - deferreds.erase(invokeId); }); } diff --git a/libs/promises/CMakeLists.txt 
b/libs/promises/CMakeLists.txt index 86784e3..ff715d9 100644 --- a/libs/promises/CMakeLists.txt +++ b/libs/promises/CMakeLists.txt @@ -37,6 +37,7 @@ else () endif () if (PROMISES OR PROMISES_STANDALONE) + message("The Celix::Promises implementation is still experiment and the api and behaviour will probably still change.") find_package(Threads) add_library(Promises INTERFACE) diff --git a/libs/promises/README.md b/libs/promises/README.md index 4e3c7bf..3197cec 100644 --- a/libs/promises/README.md +++ b/libs/promises/README.md @@ -18,7 +18,7 @@ NOTE: this implementation is still experiment and the api and behaviour will pro ## Usage ```C++ -//PromiseExample.cc +//src/PromiseExample.cc #include <iostream> #include "celix/PromiseFactory.h" @@ -34,47 +34,56 @@ static long calc_fib(long n) { } celix::Promise<long> fib(celix::PromiseFactory& factory, long n) { - return factory.deferredTask<long>([n](celix::Deferred deferred) { + return factory.deferredTask<long>([n](auto deferred) { deferred.resolve(calc_fib(n)); }); } int main() { celix::PromiseFactory factory{}; - + fib(factory, 1000000000) - .timeout(std::chrono::milliseconds {100}) + .setTimeout(std::chrono::milliseconds {100}) .onSuccess([](long val) { std::cout << "Success promise 1 : " << std::to_string(val) << std::endl; }) .onFailure([](const std::exception& e) { std::cerr << "Failure promise 1 : " << e.what() << std::endl; }); - + fib(factory, 39) - .timeout(std::chrono::milliseconds{100}) + .setTimeout(std::chrono::milliseconds{100}) .onSuccess([](long val) { std::cout << "Success promise 2 : " << std::to_string(val) << std::endl; }) .onFailure([](const std::exception& e) { std::cerr << "Failure promise 2 : " << e.what() << std::endl; }); - + //NOTE the program can only exit if the executor in the PromiseFactory is done executing all tasks. 
} ``` +```cmake +#CMakeLists.txt +find_package(Celix REQUIRED) +add_executable(PromiseExamples src/PromiseExamples.cc) +target_link_libraries(PromiseExamples PRIVATE Celix::Promises) +``` + ## Differences with OSGi Promises & Java -1. There is no singleton default executor. A PromiseFactory can be constructed argument-less to create a default executor, but this executor is then bound to the lifecycle of the PromiseFactory. If celix::IExecutor is injected in the PromiseFactory, it is up to user to control the complete lifecycle of the executor (e.g. by providing this in a ThreadExecutionModel bundle and ensuring this is started early (and as result stopped late). -1. The default constructor for celix::Deferred has been removed. A celix:Deferred can only be created through a PromiseFactory. This is done because the promise concept is heavily bound with the execution abstraction and thus a execution model. Creating a Deferred without a explicit executor is not desirable. -1. The PromiseFactory also has a deferredTask method. This is a convenient method create a Deferred, execute a task async to resolve the Deferred and return a Promise of the created Deferred in one call. -1. The celix::IExecutor abstraction has a priority argument (and as result also the calls in PromiseFactory, etc). -1. The IExecutor has a added wait() method. This can be used to ensure a executor is done executing the tasks backlog. +1. Promises must always be resolved, otherwise the Celix::Promises library will leak memory. To support this more easily the `Promise::setTimeout` method can be used to set a timeout on the current promise. +2. There is no singleton default executor. A PromiseFactory can be constructed argument-less to create a default executor, but this executor is then bound to the lifecycle of the PromiseFactory. If celix::IExecutor is injected in the PromiseFactory, it is up to user to control the complete lifecycle of the executor (e.g. 
by providing this in a ThreadExecutionModel bundle and ensuring this is started early (and as a result stopped late)). +3. The default constructor for celix::Deferred has been removed. A celix::Deferred can only be created through a PromiseFactory. This is done because the promise concept is heavily bound with the execution abstraction and thus an execution model. Creating a Deferred without an explicit executor is not desirable. +4. The PromiseFactory also has a deferredTask method. This is a convenience method to create a Deferred, execute a task asynchronously to resolve the Deferred, and return a Promise of the created Deferred in one call. +5. The celix::IExecutor abstraction has a priority argument (and as a result also the calls in PromiseFactory, etc.). +6. The IExecutor has an added wait() method. This can be used to ensure an executor is done executing its task backlog. ## Open Issues & TODOs +- Documentation is not complete - PromiseFactory is not complete yet - The static helper class Promises is not implemented yet (e.g.
all/any) - Promise::flatMap not implemented yet diff --git a/libs/promises/api/celix/DefaultScheduledExecutor.h b/libs/promises/api/celix/DefaultScheduledExecutor.h index 5bb02de..d15a57c 100644 --- a/libs/promises/api/celix/DefaultScheduledExecutor.h +++ b/libs/promises/api/celix/DefaultScheduledExecutor.h @@ -52,7 +52,10 @@ namespace celix { void cancel() override { std::lock_guard lock{mutex}; - cancelled = true; + if (!done) { + cancelled = true; + done = true; + } cond.notify_all(); } @@ -108,9 +111,12 @@ namespace celix { void wait() override { bool done = false; while (!done) { - std::lock_guard<std::mutex> lck{mutex}; + std::unique_lock lck{mutex}; removeCompletedFutures(); done = futures.empty(); + if (!done) { + cond.wait_for(lck, std::chrono::milliseconds{1}); + } } } private: @@ -121,6 +127,7 @@ namespace celix { if (it->first->isDone()) { //remove scheduled future it = futures.erase(it); + cond.notify_all(); } else { ++it; } @@ -128,6 +135,7 @@ namespace celix { } std::mutex mutex{}; //protect futures. + std::condition_variable cond{}; std::unordered_map<std::shared_ptr<DefaultDelayedScheduledFuture>, std::future<void>> futures{}; }; } \ No newline at end of file diff --git a/libs/promises/api/celix/Deferred.h b/libs/promises/api/celix/Deferred.h index a23556b..c00a970 100644 --- a/libs/promises/api/celix/Deferred.h +++ b/libs/promises/api/celix/Deferred.h @@ -104,7 +104,7 @@ namespace celix { void resolve(T&& value); void resolve(const T& value); - //NOTE not part of the spec.. update to resolveWith with a return celix::Promise<void> ?? + //TODO update to resolveWith with a return celix::Promise<void> ?? /** * Resolve the Promise associated with this Deferred with the specified Promise. 
* <p/> @@ -193,6 +193,11 @@ namespace celix { */ void resolve(); + //TODO return Promise<void> + template<typename U> + void resolveWith(celix::Promise<U> with); + + //TODO return Promise<void> void resolveWith(celix::Promise<void> with); private: @@ -250,7 +255,8 @@ void celix::Deferred<T>::resolveWith(celix::Promise<U> with) { }); } -inline void celix::Deferred<void>::resolveWith(celix::Promise<void> with) { +template<typename U> +inline void celix::Deferred<void>::resolveWith(celix::Promise<U> with) { with.onResolve([s = state, with] { if (with.isSuccessfullyResolved()) { with.getValue(); @@ -261,6 +267,21 @@ inline void celix::Deferred<void>::resolveWith(celix::Promise<void> with) { }); } +inline void celix::Deferred<void>::resolveWith(celix::Promise<void> with) { + with.onSuccess([s = state->getSelf()]() { + auto l = s.lock(); + if (l) { + l->resolve(); + } + }); + with.onFailure([s = state->getSelf()](const std::exception& e) { + auto l = s.lock(); + if (l) { + l->fail(e); + } + }); +} + template<typename T> void celix::Deferred<T>::resolve(T&& value) { state->resolve(std::forward<T>(value)); diff --git a/libs/promises/api/celix/IScheduledExecutor.h b/libs/promises/api/celix/IScheduledExecutor.h index e3a9302..89861cc 100644 --- a/libs/promises/api/celix/IScheduledExecutor.h +++ b/libs/promises/api/celix/IScheduledExecutor.h @@ -33,9 +33,9 @@ namespace celix { public: virtual ~IScheduledFuture() noexcept = default; - virtual bool isCancelled() const = 0; + [[nodiscard]] virtual bool isCancelled() const = 0; - virtual bool isDone() const = 0; + [[nodiscard]] virtual bool isDone() const = 0; virtual void cancel() = 0; }; diff --git a/libs/promises/api/celix/Promise.h b/libs/promises/api/celix/Promise.h index c100d54..1ce6b32 100644 --- a/libs/promises/api/celix/Promise.h +++ b/libs/promises/api/celix/Promise.h @@ -24,7 +24,8 @@ namespace celix { /** - * A Promise of a value. + * @brief A Promise of a value. + * * <p> * A Promise represents a future value. 
It handles the interactions for * asynchronous processing. A {@link Deferred} object can be used to create a @@ -39,15 +40,8 @@ namespace celix { * {@link #then(Success, Failure) chaining} can be repeated any number of times, * even after the Promise has been resolved. * <p> - * Example callback usage: - * - * <pre> - * celix::Promise<std::string> foo{}; - * foo.onResolve([]{ std::cout << "resolved" << std::endl; }); - * </pre> * - * - * @tparam <T> The value type associated with this Promise. + * @tparam T The value type associated with this Promise. * @ThreadSafe */ template<typename T> @@ -57,13 +51,8 @@ namespace celix { explicit Promise(std::shared_ptr<celix::impl::SharedPromiseState<T>> s); -// ~Promise() { -// //TODO maybe make a special detach call to state if the count is 1 -// //state->detachIfNeeded(state); //create a callback with ref to self if share_ptr count is 1 -// } - /** - * Returns whether this Promise has been resolved. + * @brief Returns whether this Promise has been resolved. * * <p> * This Promise may be successfully resolved or resolved with a failure. @@ -73,11 +62,10 @@ namespace celix { */ [[nodiscard]] bool isDone() const; - // /** - * Returns whether this Promise has been resolved and whether it resolved successfully. - * NOTE although not part of the OSGi spec, IMO this is clearer than (isDone() && !getFailure()) + * @brief Returns whether this Promise has been resolved and whether it resolved successfully. * + * NOTE this function is not part of the OSGi spec. * * @return {@code true} if this Promise was resolved successfully. * {@code false} if this Promise is unresolved or resolved with a failure. @@ -85,7 +73,7 @@ namespace celix { [[nodiscard]] bool isSuccessfullyResolved() const; /** - * Returns the failure of this Promise. + * @brief Returns the failure of this Promise. 
* * <p> * If this Promise is not {@link #isDone() resolved}, this method must block @@ -104,7 +92,7 @@ namespace celix { [[nodiscard]] std::exception_ptr getFailure() const; /** - * Returns the value of this Promise. + * @brief Returns the reference of the value of this Promise. * * <p> * If this Promise is not {@link #isDone() resolved}, this method must block @@ -124,12 +112,57 @@ namespace celix { * waiting. */ T& getValue(); + + /** + * @brief Returns the const reference of the value of this Promise. + * + * <p> + * If this Promise is not {@link #isDone() resolved}, this method must block + * and wait for this Promise to be resolved before completing. + * + * <p> + * If this Promise was successfully resolved, this method returns with the + * value of this Promise. If this Promise was resolved with a failure, this + * method must throw an {@code InvocationTargetException} with the + * {@link #getFailure() failure exception} as the cause. + * + * @return The value of this resolved Promise. + * @throws InvocationTargetException If this Promise was resolved with a + * failure. The cause of the {@code InvocationTargetException} is + * the failure exception. + * @throws InterruptedException If the current thread was interrupted while + * waiting. + */ const T& getValue() const; + /** + * @brief Returns the (moved) value of this Promise. + * + * NOTE this function is not part of the OSGi spec. + * + * If T has a move constructor, the value is moved from the Promise. Otherwise a copy of the value is returned. + * + * <p> + * If this Promise is not {@link #isDone() resolved}, this method must block + * and wait for this Promise to be resolved before completing. + * + * <p> + * If this Promise was successfully resolved, this method returns with the + * value of this Promise. If this Promise was resolved with a failure, this + * method must throw an {@code InvocationTargetException} with the + * {@link #getFailure() failure exception} as the cause. 
+ * + * @return The value of this resolved Promise. + * @throws InvocationTargetException If this Promise was resolved with a + * failure. The cause of the {@code InvocationTargetException} is + * the failure exception. + * @throws InterruptedException If the current thread was interrupted while + * waiting. + */ [[nodiscard]] T moveOrGetValue(); /** - * Wait till the promise is resolved. + * @brief Wait till the promise is resolved. * * <p> * If this Promise is not {@link #isDone() resolved}, this method must block @@ -139,9 +172,11 @@ namespace celix { void wait() const; //NOTE not part of the OSGI promise, wait till resolved (used in testing) /** - * Register a callback to be called with the result of this Promise when - * this Promise is resolved successfully. The callback will not be called if - * this Promise is resolved with a failure. + * @brief Register a callback to be called with the result of this Promise when + * this Promise is resolved successfully. + * + * The callback will not be called if this Promise is resolved with a failure. + * * <p> * This method may be called at any time including before and after this * Promise has been resolved. @@ -163,9 +198,11 @@ namespace celix { Promise<T>& onSuccess(std::function<void(T)> success); /** - * Register a callback to be called with the failure for this Promise when - * this Promise is resolved with a failure. The callback will not be called - * if this Promise is resolved successfully. + * @brief Register a callback to be called with the failure for this Promise when + * this Promise is resolved with a failure. + * + * The callback will not be called if this Promise is resolved successfully. + * * <p> * This method may be called at any time including before and after this * Promise has been resolved. @@ -187,7 +224,8 @@ namespace celix { Promise<T>& onFailure(std::function<void(const std::exception&)> failure); /** - * Register a callback to be called when this Promise is resolved. 
+ * @brief Register a callback to be called when this Promise is resolved. + * * <p/> * The specified callback is called when this Promise is resolved either successfully or with a failure. * <p/> @@ -207,7 +245,8 @@ namespace celix { /** - * Recover from a failure of this Promise with a recovery value. + * @brief Recover from a failure of this Promise with a recovery value. + * * <p/> * If this Promise is successfully resolved, the returned Promise will be resolved with the value of this Promise. * <p/> @@ -228,15 +267,23 @@ namespace celix { */ [[nodiscard]] Promise<T> recover(std::function<T()> recover); - /** - * Chain a new Promise to this Promise with a consumer callback that receives the value of this Promise when it is successfully resolved. + * @brief Chain a new Promise to this Promise with a consumer callback that receives the value of this + * Promise when it is successfully resolved. + * * The specified Consumer is called when this Promise is resolved successfully. * - * This method returns a new Promise which is chained to this Promise. The returned Promise must be resolved when this Promise is resolved after the specified callback is executed. If the callback throws an exception, the returned Promise is failed with that exception. Otherwise the returned Promise is resolved with the success value from this Promise. + * This method returns a new Promise which is chained to this Promise. The returned Promise must be resolved + * when this Promise is resolved after the specified callback is executed. If the callback throws an exception, + * the returned Promise is failed with that exception. Otherwise the returned Promise is resolved with the + * success value from this Promise. + * * This method may be called at any time including before and after this Promise has been resolved. - * Resolving this Promise happens-before any registered callback is called. 
That is, in a registered callback, isDone() must return true and getValue() and getFailure() must not block. - * A callback may be called on a different thread than the thread which registered the callback. So the callback must be thread safe but can rely upon that the registration of the callback happens-before the registered callback is called. + * Resolving this Promise happens-before any registered callback is called. That is, in a registered callback, + * isDone() must return true and getValue() and getFailure() must not block. + * A callback may be called on a different thread than the thread which registered the callback. + * So the callback must be thread safe but can rely upon that the registration of the callback + * happens-before the registered callback is called. * * @param the consumer callback * @returns A new Promise which is chained to this Promise. The returned Promise must be resolved when this Promise is resolved after the specified Consumer is executed. @@ -244,13 +291,16 @@ namespace celix { [[nodiscard]] Promise<T> thenAccept(std::function<void(T)> consumer); /** - * Fall back to the value of the specified Promise if this Promise fails. + * @brief Fall back to the value of the specified Promise if this Promise fails. + * * <p/> - * If this Promise is successfully resolved, the returned Promise will be resolved with the value of this Promise. + * If this Promise is successfully resolved, the returned Promise will be resolved with the value of this + * Promise. * <p/> - * If this Promise is resolved with a failure, the successful result of the specified Promise is used to resolve the - * returned Promise. If the specified Promise is resolved with a failure, the returned Promise will be failed with - * the failure of this Promise rather than the failure of the specified Promise. + * + * If this Promise is resolved with a failure, the successful result of the specified Promise is used to + * resolve the returned Promise. 
If the specified Promise is resolved with a failure, the returned Promise + * will be failed with the failure of this Promise rather than the failure of the specified Promise. * <p/> * This method may be called at any time including before and after this Promise has been resolved. * @@ -261,12 +311,14 @@ namespace celix { [[nodiscard]] Promise<T> fallbackTo(celix::Promise<T> fallback); /** - * Map the value of this Promise. + * @brief Map the value of this Promise. + * * <p/> * If this Promise is successfully resolved, the returned Promise will be resolved with the value of specified * Function as applied to the value of this Promise. If the specified Function throws an exception, the returned * Promise will be failed with the exception. * <p/> + * * If this Promise is resolved with a failure, the returned Promise will be failed with that failure. * <p/> * This method may be called at any time including before and after this Promise has been resolved. @@ -280,15 +332,18 @@ namespace celix { [[nodiscard]] celix::Promise<R> map(std::function<R(T)> mapper); /** - * Filter the value of this Promise. + * @brief Filter the value of this Promise. + * * <p/> - * If this Promise is successfully resolved, the returned Promise will either be resolved with the value of this - * Promise if the specified Predicate accepts that value or failed with a NoSuchElementException if the specified - * Predicate does not accept that value. If the specified Predicate throws an exception, the returned Promise will - * be failed with the exception. + * If this Promise is successfully resolved, the returned Promise will either be resolved with the value + * of this Promise if the specified Predicate accepts that value or failed with a NoSuchElementException + * if the specified Predicate does not accept that value. If the specified Predicate throws an exception, + * the returned Promise will be failed with the exception. 
+ * * <p/> * If this Promise is resolved with a failure, the returned Promise will be failed with that failure. * <p/> + * * This method may be called at any time including before and after this Promise has been resolved. * * @param predicate The Predicate to evaluate the value of this Promise. @@ -297,7 +352,8 @@ namespace celix { [[nodiscard]] Promise<T> filter(std::function<bool(T)> predicate); /** - * Time out the resolution of this Promise. + * @brief Time out the resolution of this Promise. + * * <p> * If this Promise is successfully resolved before the timeout, the returned * Promise is resolved with the value of this Promise. If this Promise is @@ -315,7 +371,25 @@ namespace celix { [[nodiscard]] Promise<T> timeout(std::chrono::duration<Rep, Period> duration); /** - * Delay after the resolution of this Promise. + * @brief set timeout for this promise. + * + * Fails this Promise if it not successfully resolved before the timeout. + * + * @note Note that the Promise::setTimeout is different from Promise::timeout, because + * Promise::setTimeout updates the current promise instead of returning a new promise with a timeout. + * + * @note Promise::setTimeout is not part of the OSGi Promises specification. + * + * @param duration The time to wait. Zero and negative + * time is treated as an immediate timeout. + * @return The current promise. + */ + template<typename Rep, typename Period> + Promise<T>& setTimeout(std::chrono::duration<Rep, Period> duration); + + /** + * @brief Delay after the resolution of this Promise. + * * <p> * Once this Promise is resolved, resolve the returned Promise with this * Promise after the specified delay. @@ -329,13 +403,16 @@ namespace celix { [[nodiscard]] Promise<T> delay(std::chrono::duration<Rep, Period> duration); /** - * FlatMap the value of this Promise. + * @brief FlatMap the value of this Promise. 
+ * * <p/> * If this Promise is successfully resolved, the returned Promise will be resolved with the Promise from the - * specified Function as applied to the value of this Promise. If the specified Function throws an exception, the - * returned Promise will be failed with the exception. + * specified Function as applied to the value of this Promise. If the specified Function throws an exception, + * the returned Promise will be failed with the exception. + * * <p/> * If this Promise is resolved with a failure, the returned Promise will be failed with that failure. + * * <p/> * This method may be called at any time including before and after this Promise has been resolved. * @@ -349,24 +426,30 @@ namespace celix { // [[nodiscard]] celix::Promise<R> flatMap(std::function<celix::Promise<R>(T)> mapper); /** - * Chain a new Promise to this Promise with success and failure callbacks. + * @brief Chain a new Promise to this Promise with success and failure callbacks. + * * <p/> - * The specified success callback is called when this Promise is successfully resolved and the specified failure - * callback is called when this Promise is resolved with a failure. + * The specified success callback is called when this Promise is successfully resolved and the specified + * failure callback is called when this Promise is resolved with a failure. + * * <p/> * This method returns a new Promise which is chained to this Promise. The returned Promise must be resolved when * this Promise is resolved after the specified success or failure callback is executed. The result of the executed * callback must be used to resolve the returned Promise. Multiple calls to this method can be used to create a * chain of promises which are resolved in sequence. + * * <p/> * If this Promise is successfully resolved, the success callback is executed and the result Promise, if any, or * thrown exception is used to resolve the returned Promise from this method. 
If this Promise is resolved with a * failure, the failure callback is executed and the returned Promise from this method is failed. + * * <p/> * This method may be called at any time including before and after this Promise has been resolved. + * * <p/> * Resolving this Promise happens-before any registered callback is called. That is, in a registered callback, * isDone() must return true and getValue() and getFailure() must not block. + * * <p/> * A callback may be called on a different thread than the thread which registered the callback. So the callback * must be thread safe but can rely upon that the registration of the callback happens-before the registered @@ -384,14 +467,14 @@ namespace celix { [[nodiscard]] celix::Promise<U> then(std::function<celix::Promise<U>(celix::Promise<T>)> success, std::function<void(celix::Promise<T>)> failure = {}); /** - * Convenience operator calling getValue() + * @brief Convenience operator calling getValue() */ constexpr const T& operator*() const { return this->getValue(); } /** - * Convenience operator calling getValue() + * @brief Convenience operator calling getValue() */ constexpr T& operator*() @@ -402,6 +485,27 @@ namespace celix { friend class Promise<void>; }; + + /** + * @brief A Promise specification for void. + * + * <p> + * A Promise represents a future value. It handles the interactions for + * asynchronous processing. A {@link Deferred} object can be used to create a + * Promise and later resolve the Promise. A Promise is used by the caller of an + * asynchronous function to get the result or handle the error. The caller can + * either get a callback when the Promise is resolved with a value or an error, + * or the Promise can be used in chaining. In chaining, callbacks are provided + * that receive the resolved Promise, and a new Promise is generated that + * resolves based upon the result of a callback. 
+ * <p> + * Both {@link #onResolve(Runnable) callbacks} and + * {@link #then(Success, Failure) chaining} can be repeated any number of times, + * even after the Promise has been resolved. + * <p> + * + * @ThreadSafe + */ template<> class Promise<void> { public: @@ -438,6 +542,9 @@ namespace celix { [[nodiscard]] Promise<void> timeout(std::chrono::duration<Rep, Period> duration); template<typename Rep, typename Period> + Promise<void>& setTimeout(std::chrono::duration<Rep, Period> duration); + + template<typename Rep, typename Period> [[nodiscard]] Promise<void> delay(std::chrono::duration<Rep, Period> duration); template<typename U> @@ -541,12 +648,25 @@ inline celix::Promise<void>& celix::Promise<void>::onResolve(std::function<void( template<typename T> template<typename Rep, typename Period> inline celix::Promise<T> celix::Promise<T>::timeout(std::chrono::duration<Rep, Period> duration) { - return celix::Promise<T>{celix::impl::SharedPromiseState<T>::timeout(state, duration)}; + return celix::Promise<T>{state->timeout(duration)}; } template<typename Rep, typename Period> inline celix::Promise<void> celix::Promise<void>::timeout(std::chrono::duration<Rep, Period> duration) { - return celix::Promise<void>{celix::impl::SharedPromiseState<void>::timeout(state, duration)}; + return celix::Promise<void>{state->timeout(duration)}; +} + +template<typename T> +template<typename Rep, typename Period> +inline celix::Promise<T>& celix::Promise<T>::setTimeout(std::chrono::duration<Rep, Period> duration) { + state->template setTimeout(duration); + return *this; +} + +template<typename Rep, typename Period> +inline celix::Promise<void>& celix::Promise<void>::setTimeout(std::chrono::duration<Rep, Period> duration) { + state->template setTimeout(duration); + return *this; } template<typename T> @@ -623,7 +743,7 @@ inline celix::Promise<U> celix::Promise<T>::then(std::function<celix::Promise<U> if (s->isSuccessfullyResolved()) { try { auto tmpPromise = 
success(celix::Promise<T>{s}); - p->resolveWith(tmpPromise.state); + p->resolveWith(*tmpPromise.state); } catch (...) { //failure(); TODO not sure if this needs to be called p->fail(std::current_exception()); @@ -648,7 +768,7 @@ inline celix::Promise<U> celix::Promise<void>::then(std::function<celix::Promise if (s->isSuccessfullyResolved()) { try { auto tmpPromise = success(celix::Promise<void>{s}); - p->resolveWith(tmpPromise.state); + p->resolveWith(*tmpPromise.state); } catch (...) { //failure(); TODO not sure if this needs to be called p->fail(std::current_exception()); diff --git a/libs/promises/api/celix/PromiseFactory.h b/libs/promises/api/celix/PromiseFactory.h index 97ce134..934db9e 100644 --- a/libs/promises/api/celix/PromiseFactory.h +++ b/libs/promises/api/celix/PromiseFactory.h @@ -30,9 +30,8 @@ namespace celix { //TODO documentation class PromiseFactory { public: - PromiseFactory(); explicit PromiseFactory( - std::shared_ptr<celix::IExecutor> _executor, + std::shared_ptr<celix::IExecutor> _executor = std::make_shared<celix::DefaultExecutor>(), std::shared_ptr<celix::IScheduledExecutor> _scheduledExecutor = std::make_shared<celix::DefaultScheduledExecutor>()); ~PromiseFactory() noexcept; @@ -43,31 +42,30 @@ namespace celix { PromiseFactory& operator=(const PromiseFactory&) = default; template<typename T> - [[nodiscard]] celix::Deferred<T> deferred(int priority = 0) const; + [[nodiscard]] celix::Deferred<T> deferred(int priority = 0); template<typename T> - [[nodiscard]] celix::Promise<T> deferredTask(std::function<void(celix::Deferred<T>)> task, int priority = 0) const; + [[nodiscard]] celix::Promise<T> deferredTask(std::function<void(celix::Deferred<T>)> task, int priority = 0); template<typename T> - [[nodiscard]] celix::Promise<T> failed(const std::exception& e, int priority = 0) const; + [[nodiscard]] celix::Promise<T> failed(const std::exception& e, int priority = 0); template<typename T> - [[nodiscard]] celix::Promise<T> failed(std::exception_ptr 
ptr, int priority = 0) const; + [[nodiscard]] celix::Promise<T> failed(std::exception_ptr ptr, int priority = 0); template<typename T> - [[nodiscard]] celix::Promise<T> resolved(T&& value) const; + [[nodiscard]] celix::Promise<T> resolved(T&& value); template<typename T> - [[nodiscard]] celix::Promise<T> resolvedWithPrio(T&& value, int priority) const; + [[nodiscard]] celix::Promise<T> resolvedWithPrio(T&& value, int priority); - [[nodiscard]] celix::Promise<void> resolved() const; + [[nodiscard]] celix::Promise<void> resolved(); - [[nodiscard]] celix::Promise<void> resolvedWithPrio(int priority) const; + [[nodiscard]] celix::Promise<void> resolvedWithPrio(int priority); [[nodiscard]] std::shared_ptr<celix::IExecutor> getExecutor() const; - //TODO - //[[nodiscard]] std::shared_ptr<celix::IScheduledExecutor> getScheduledExecutor() const; + [[nodiscard]] std::shared_ptr<celix::IScheduledExecutor> getScheduledExecutor() const; /** * @brief Wait (block) until all tasks for the executor and scheduled executor are completed @@ -84,11 +82,6 @@ namespace celix { Implementation *********************************************************************************/ -inline celix::PromiseFactory::PromiseFactory() : - executor{std::make_shared<celix::DefaultExecutor>()}, - scheduledExecutor{std::make_shared<celix::DefaultScheduledExecutor>()} {} - - inline celix::PromiseFactory::PromiseFactory( std::shared_ptr<celix::IExecutor> _executor, std::shared_ptr<celix::IScheduledExecutor> _scheduledExecutor) : @@ -97,16 +90,17 @@ inline celix::PromiseFactory::PromiseFactory( inline celix::PromiseFactory::~PromiseFactory() noexcept { //ensure that the executors tasks are empty before allowing the to be deallocated. - wait(); + wait(); //TODO wait or directly fail promises? 
} template<typename T> -celix::Deferred<T> celix::PromiseFactory::deferred(int priority) const { - return celix::Deferred<T>{celix::impl::SharedPromiseState<T>::create(executor, scheduledExecutor, priority)}; +celix::Deferred<T> celix::PromiseFactory::deferred(int priority) { + auto state = celix::impl::SharedPromiseState<T>::create(executor, scheduledExecutor, priority); + return celix::Deferred<T>{state}; } template<typename T> -[[nodiscard]] celix::Promise<T> celix::PromiseFactory::deferredTask(std::function<void(celix::Deferred<T>)> task, int priority) const { +[[nodiscard]] celix::Promise<T> celix::PromiseFactory::deferredTask(std::function<void(celix::Deferred<T>)> task, int priority) { auto def = deferred<T>(priority); executor->execute(priority, [def, task=std::move(task)]{ task(def); @@ -115,45 +109,49 @@ template<typename T> } template<typename T> -celix::Promise<T> celix::PromiseFactory::failed(const std::exception &e, int priority) const { - auto p = celix::impl::SharedPromiseState<T>::create(executor, scheduledExecutor, priority); - p->fail(e); - return celix::Promise<T>{p}; +celix::Promise<T> celix::PromiseFactory::failed(const std::exception &e, int priority) { + auto def = deferred<T>(priority); + def.fail(e); + return def.getPromise(); } template<typename T> -celix::Promise<T> celix::PromiseFactory::failed(std::exception_ptr ptr, int priority) const { - auto p = celix::impl::SharedPromiseState<T>::create(executor, priority); - p->fail(ptr); - return celix::Promise<T>{p}; +celix::Promise<T> celix::PromiseFactory::failed(std::exception_ptr ptr, int priority) { + auto def = deferred<T>(priority); + def.fail(ptr); + return def.getPromise(); } template<typename T> -celix::Promise<T> celix::PromiseFactory::resolved(T &&value) const { +celix::Promise<T> celix::PromiseFactory::resolved(T &&value) { return resolvedWithPrio<T>(std::forward<T>(value), 0); } template<typename T> -celix::Promise<T> celix::PromiseFactory::resolvedWithPrio(T &&value, int 
priority) const { - auto p = celix::impl::SharedPromiseState<T>::create(executor, scheduledExecutor, priority); - p->resolve(std::forward<T>(value)); - return celix::Promise<T>{p}; +celix::Promise<T> celix::PromiseFactory::resolvedWithPrio(T &&value, int priority) { + auto def = deferred<T>(priority); + def.resolve(std::forward<T>(value)); + return def.getPromise(); } -inline celix::Promise<void> celix::PromiseFactory::resolved() const { +inline celix::Promise<void> celix::PromiseFactory::resolved() { return resolvedWithPrio(0); } -inline celix::Promise<void> celix::PromiseFactory::resolvedWithPrio(int priority) const { - auto p = celix::impl::SharedPromiseState<void>::create(executor, scheduledExecutor, priority); - p->resolve(); - return celix::Promise<void>{p}; +inline celix::Promise<void> celix::PromiseFactory::resolvedWithPrio(int priority) { + auto def = deferred<void>(priority); + def.resolve(); + return def.getPromise(); } inline std::shared_ptr<celix::IExecutor> celix::PromiseFactory::getExecutor() const { return executor; } +inline std::shared_ptr<celix::IScheduledExecutor> celix::PromiseFactory::getScheduledExecutor() const { + return scheduledExecutor; +} + inline void celix::PromiseFactory::wait() { scheduledExecutor->wait(); executor->wait(); diff --git a/libs/promises/api/celix/impl/SharedPromiseState.h b/libs/promises/api/celix/impl/SharedPromiseState.h index 370cdee..b9edd3c 100644 --- a/libs/promises/api/celix/impl/SharedPromiseState.h +++ b/libs/promises/api/celix/impl/SharedPromiseState.h @@ -51,7 +51,7 @@ namespace celix::impl { void resolve(const T& value); template<typename U> - void resolveWith(std::shared_ptr<SharedPromiseState<U>> with); + void resolveWith(SharedPromiseState<U>& with); void fail(std::exception_ptr e); @@ -93,16 +93,16 @@ namespace celix::impl { [[nodiscard]] std::shared_ptr<SharedPromiseState<T>> fallbackTo(std::shared_ptr<SharedPromiseState<T>> fallbackTo); - void resolveWith(std::shared_ptr<SharedPromiseState<T>> with); 
- template<typename R> [[nodiscard]] std::shared_ptr<SharedPromiseState<R>> map(std::function<R(T)> mapper); [[nodiscard]] std::shared_ptr<SharedPromiseState<T>> thenAccept(std::function<void(T)> consumer); template<typename Rep, typename Period> - [[nodiscard]] static std::shared_ptr<SharedPromiseState<T>> - timeout(std::shared_ptr<SharedPromiseState<T>> state, std::chrono::duration<Rep, Period> duration); + [[nodiscard]] std::shared_ptr<SharedPromiseState<T>> timeout(std::chrono::duration<Rep, Period> duration); + + template<typename Rep, typename Period> + std::shared_ptr<SharedPromiseState<T>> setTimeout(std::chrono::duration<Rep, Period> duration); void addChain(std::function<void()> chainFunction); @@ -111,6 +111,8 @@ namespace celix::impl { [[nodiscard]] std::shared_ptr<celix::IScheduledExecutor> getScheduledExecutor() const; int getPriority() const; + + [[nodiscard]] std::weak_ptr<SharedPromiseState<T>> getSelf() const; private: explicit SharedPromiseState(std::shared_ptr<celix::IExecutor> _executor, std::shared_ptr<celix::IScheduledExecutor> _scheduledExecutor, int _priority); @@ -146,7 +148,7 @@ namespace celix::impl { public: static std::shared_ptr<SharedPromiseState<void>> create(std::shared_ptr<celix::IExecutor> _executor, std::shared_ptr<celix::IScheduledExecutor> _scheduledExecutor, int priority); - ~SharedPromiseState() noexcept = default; + virtual ~SharedPromiseState() noexcept = default; void resolve(); @@ -163,7 +165,7 @@ namespace celix::impl { void wait() const; - bool isDone() const; + [[nodiscard]] bool isDone() const; bool isSuccessfullyResolved() const; @@ -180,7 +182,8 @@ namespace celix::impl { std::shared_ptr<SharedPromiseState<void>> fallbackTo(std::shared_ptr<SharedPromiseState<void>> fallbackTo); - void resolveWith(std::shared_ptr<SharedPromiseState<void>> with); + template<typename U> + void resolveWith(SharedPromiseState<U>& with); template<typename R> std::shared_ptr<SharedPromiseState<R>> map(std::function<R(void)> mapper); @@ 
-188,8 +191,10 @@ namespace celix::impl { std::shared_ptr<SharedPromiseState<void>> thenAccept(std::function<void()> consumer); template<typename Rep, typename Period> - static std::shared_ptr<SharedPromiseState<void>> - timeout(std::shared_ptr<SharedPromiseState<void>> state, std::chrono::duration<Rep, Period> duration); + [[nodiscard]] std::shared_ptr<SharedPromiseState<void>> timeout(std::chrono::duration<Rep, Period> duration); + + template<typename Rep, typename Period> + std::shared_ptr<SharedPromiseState<void>> setTimeout(std::chrono::duration<Rep, Period> duration); void addChain(std::function<void()> chainFunction); @@ -198,6 +203,8 @@ namespace celix::impl { [[nodiscard]] std::shared_ptr<celix::IScheduledExecutor> getScheduledExecutor() const; int getPriority() const; + + [[nodiscard]] std::weak_ptr<SharedPromiseState<void>> getSelf() const; private: explicit SharedPromiseState(std::shared_ptr<celix::IExecutor> _executor, std::shared_ptr<celix::IScheduledExecutor> _scheduledExecutor, int _priority); @@ -260,6 +267,15 @@ inline void celix::impl::SharedPromiseState<void>::setSelf(std::weak_ptr<SharedP } template<typename T> +std::weak_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::getSelf() const { + return self; +} + +inline std::weak_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::getSelf() const { + return self; +} + +template<typename T> void celix::impl::SharedPromiseState<T>::resolve(T&& value) { std::unique_lock<std::mutex> lck{mutex}; if (done) { @@ -525,8 +541,9 @@ inline std::exception_ptr celix::impl::SharedPromiseState<void>::getFailure() co } template<typename T> -void celix::impl::SharedPromiseState<T>::resolveWith(std::shared_ptr<SharedPromiseState<T>> with) { - with->addOnResolve([s = self.lock()](std::optional<T> v, std::exception_ptr e) { +template<typename U> +void celix::impl::SharedPromiseState<T>::resolveWith(SharedPromiseState<U>& with) { + with.addOnResolve([s = 
self.lock()](std::optional<U> v, std::exception_ptr e) { if (v) { s->tryResolve(std::move(*v)); } else { @@ -535,11 +552,12 @@ void celix::impl::SharedPromiseState<T>::resolveWith(std::shared_ptr<SharedPromi }); } -inline void celix::impl::SharedPromiseState<void>::resolveWith(std::shared_ptr<SharedPromiseState<void>> with) { - with->addOnResolve([s = self.lock()](std::optional<std::exception_ptr> e) { +template<typename U> +inline void celix::impl::SharedPromiseState<void>::resolveWith(SharedPromiseState<U>& with) { + with.addOnResolve([s = self.lock()](std::optional<std::exception_ptr> e) { if (!e) { s->tryResolve(); - } else { + } else if (s) { s->tryFail(std::move(*e)); } }); @@ -547,29 +565,42 @@ inline void celix::impl::SharedPromiseState<void>::resolveWith(std::shared_ptr<S template<typename T> template<typename Rep, typename Period> -std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::timeout(std::shared_ptr<SharedPromiseState<T>> state, std::chrono::duration<Rep, Period> duration) { - auto p = celix::impl::SharedPromiseState<T>::create(state->executor, state->scheduledExecutor, state->priority); - p->resolveWith(state); - auto schedFuture = p->scheduledExecutor->schedule(p->priority, duration, [p]{ - p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::timeout(std::chrono::duration<Rep, Period> duration) { + auto promise = celix::impl::SharedPromiseState<T>::create(executor, scheduledExecutor, priority); + promise->resolveWith(*this); + promise->setTimeout(duration); + return promise; +} + +template<typename Rep, typename Period> +std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::timeout(std::chrono::duration<Rep, Period> duration) { + auto promise = celix::impl::SharedPromiseState<void>::create(executor, scheduledExecutor, priority); + promise->resolveWith(*this); + 
promise->setTimeout(duration); + return promise; +} + +template<typename T> +template<typename Rep, typename Period> +std::shared_ptr<celix::impl::SharedPromiseState<T>> celix::impl::SharedPromiseState<T>::setTimeout(std::chrono::duration<Rep, Period> duration) { + auto schedFuture = scheduledExecutor->schedule(priority, duration, [s = self.lock()]{ + s->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); }); - p->addOnSuccessConsumeCallback([schedFuture](T /*val*/){ - schedFuture->cancel(); + addChain([sf = std::move(schedFuture)] { + sf->cancel(); }); - return p; + return self.lock(); } template<typename Rep, typename Period> -std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::timeout(std::shared_ptr<SharedPromiseState<void>> state, std::chrono::duration<Rep, Period> duration) { - auto p = celix::impl::SharedPromiseState<void>::create(state->executor, state->scheduledExecutor, state->priority); - p->resolveWith(state); - auto schedFuture = p->scheduledExecutor->schedule(p->priority, duration, [p]{ - p->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); +std::shared_ptr<celix::impl::SharedPromiseState<void>> celix::impl::SharedPromiseState<void>::setTimeout(std::chrono::duration<Rep, Period> duration) { + auto schedFuture = scheduledExecutor->schedule(priority, duration, [s = self.lock()]{ + s->tryFail(std::make_exception_ptr(celix::PromiseTimeoutException{})); }); - p->addOnSuccessConsumeCallback([schedFuture]{ - schedFuture->cancel(); + addChain([sf = std::move(schedFuture)] { + sf->cancel(); }); - return p; + return self.lock(); } template<typename T> diff --git a/libs/promises/cmake/CelixPromisesConfigVersion.cmake b/libs/promises/cmake/CelixPromisesConfigVersion.cmake index bd21a83..0fd639c 100644 --- a/libs/promises/cmake/CelixPromisesConfigVersion.cmake +++ b/libs/promises/cmake/CelixPromisesConfigVersion.cmake @@ -15,7 +15,7 @@ # specific language governing permissions and 
limitations # under the License. -set(PACKAGE_VERSION "0.0.1") +set(PACKAGE_VERSION "0.1.0") # Check whether the requested PACKAGE_FIND_VERSION is compatible if("${PACKAGE_VERSION}" VERSION_LESS "${PACKAGE_FIND_VERSION}") diff --git a/libs/promises/gtest/src/PromisesTestSuite.cc b/libs/promises/gtest/src/PromisesTestSuite.cc index fee3a86..ced4d90 100644 --- a/libs/promises/gtest/src/PromisesTestSuite.cc +++ b/libs/promises/gtest/src/PromisesTestSuite.cc @@ -268,7 +268,7 @@ TEST_F(PromiseTestSuite, resolveWithTimeout) { EXPECT_EQ(42, value); firstSuccessCalled = true; }) - .timeout(std::chrono::milliseconds{10}) + .timeout(std::chrono::milliseconds{5}) .onSuccess([&](long value) { EXPECT_EQ(42, value); secondSuccessCalled = true; @@ -311,6 +311,13 @@ TEST_F(PromiseTestSuite, resolveWithTimeout) { } #endif +TEST_F(PromiseTestSuite, resolveWithSetTimeout) { + auto promise = factory->deferred<int>().getPromise().setTimeout(std::chrono::milliseconds{5}); + factory->wait(); + EXPECT_TRUE(promise.isDone()); + EXPECT_FALSE(promise.isSuccessfullyResolved()); +} + TEST_F(PromiseTestSuite, resolveWithDelay) { auto deferred1 = factory->deferred<long>(); std::atomic<bool> successCalled = false; @@ -332,7 +339,6 @@ TEST_F(PromiseTestSuite, resolveWithDelay) { EXPECT_GE(durationInMs, std::chrono::milliseconds{50}); } - TEST_F(PromiseTestSuite, resolveWithRecover) { auto deferred1 = factory->deferred<long>(); std::atomic<bool> successCalled = false; diff --git a/libs/promises/gtest/src/VoidPromisesTestSuite.cc b/libs/promises/gtest/src/VoidPromisesTestSuite.cc index f4ab814..be016b4 100644 --- a/libs/promises/gtest/src/VoidPromisesTestSuite.cc +++ b/libs/promises/gtest/src/VoidPromisesTestSuite.cc @@ -128,7 +128,7 @@ TEST_F(VoidPromiseTestSuite, onFailureHandling) { TEST_F(VoidPromiseTestSuite, resolveSuccessWith) { auto deferred1 = factory->deferred<void>(); - auto deferred2 = factory->deferred<void>(); + auto deferred2 = factory->deferred<long>(); bool called = false; 
deferred1.getPromise() @@ -138,9 +138,8 @@ TEST_F(VoidPromiseTestSuite, resolveSuccessWith) { //currently deferred1 will be resolved in thread, and onSuccess is trigger on the promise of deferred2 //now resolving deferred2 with the promise of deferred1 - deferred2.resolveWith(deferred1.getPromise()); - auto p = deferred2.getPromise(); - deferred1.resolve(); + deferred1.resolveWith(deferred2.getPromise()); + deferred2.resolve(1); factory->wait(); EXPECT_EQ(true, called); } @@ -233,6 +232,13 @@ TEST_F(VoidPromiseTestSuite, resolveWithTimeout) { } #endif +TEST_F(VoidPromiseTestSuite, resolveWithSetTimeout) { + auto promise = factory->deferred<void>().getPromise().setTimeout(std::chrono::milliseconds{5}); + factory->wait(); + EXPECT_TRUE(promise.isDone()); + EXPECT_FALSE(promise.isSuccessfullyResolved()); +} + TEST_F(VoidPromiseTestSuite, resolveWithDelay) { auto deferred1 = factory->deferred<void>(); @@ -258,7 +264,6 @@ TEST_F(VoidPromiseTestSuite, resolveWithDelay) { EXPECT_GE(durationInMs, std::chrono::milliseconds{10}); } - TEST_F(VoidPromiseTestSuite, resolveWithRecover) { auto deferred1 = factory->deferred<void>(); std::atomic<bool> successCalled = false; diff --git a/libs/promises/src/PromiseExamples.cc b/libs/promises/src/PromiseExamples.cc index ee40a17..0208e5e 100644 --- a/libs/promises/src/PromiseExamples.cc +++ b/libs/promises/src/PromiseExamples.cc @@ -42,7 +42,7 @@ int main() { celix::PromiseFactory factory{}; fib(factory, 1000000000) - .timeout(std::chrono::milliseconds {100}) + .setTimeout(std::chrono::milliseconds {100}) .onSuccess([](long val) { std::cout << "Success promise 1 : " << std::to_string(val) << std::endl; }) @@ -51,7 +51,7 @@ int main() { }); fib(factory, 39) - .timeout(std::chrono::milliseconds{100}) + .setTimeout(std::chrono::milliseconds{100}) .onSuccess([](long val) { std::cout << "Success promise 2 : " << std::to_string(val) << std::endl; })
