RobertIndie commented on code in PR #299:
URL: https://github.com/apache/pulsar-client-cpp/pull/299#discussion_r1252465638
##########
lib/Future.h:
##########
@@ -19,162 +19,133 @@
#ifndef LIB_FUTURE_H_
#define LIB_FUTURE_H_
-#include <condition_variable>
+#include <atomic>
+#include <chrono>
#include <functional>
+#include <future>
#include <list>
#include <memory>
#include <mutex>
-
-using Lock = std::unique_lock<std::mutex>;
+#include <thread>
+#include <utility>
namespace pulsar {
template <typename Result, typename Type>
-struct InternalState {
- std::mutex mutex;
- std::condition_variable condition;
- Result result;
- Type value;
- bool complete;
-
- std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
public:
- typedef std::function<void(Result, const Type&)> ListenerCallback;
-
- Future& addListener(ListenerCallback callback) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (state->complete) {
- lock.unlock();
- callback(state->result, state->value);
- } else {
- state->listeners.push_back(callback);
- }
+ using Listener = std::function<void(Result, const Type &)>;
+ using Pair = std::pair<Result, Type>;
+ using Lock = std::unique_lock<std::mutex>;
- return *this;
- }
+ // NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
+ InternalState() {}
- Result get(Type& result) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ void addListener(Listener listener) {
+ Lock lock{mutex_};
+ listeners_.emplace_back(listener);
+ lock.unlock();
- if (!state->complete) {
- // Wait for result
- while (!state->complete) {
- state->condition.wait(lock);
- }
+ if (completed()) {
+ Type value;
+ Result result = get(value);
+ triggerListeners(result, value);
}
-
- result = state->value;
- return state->result;
}
- template <typename Duration>
- bool get(Result& res, Type& value, Duration d) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
-
- if (!state->complete) {
- // Wait for result
- while (!state->complete) {
- if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
- // Timeout while waiting for the future to complete
- return false;
- }
- }
+ bool complete(Result result, const Type &value) {
+ bool expected = false;
+ if (!completed_.compare_exchange_strong(expected, true)) {
+ return false;
}
-
- value = state->value;
- res = state->result;
+ triggerListeners(result, value);
+ promise_.set_value(std::make_pair(result, value));
return true;
}
- private:
- typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
- Future(InternalStatePtr state) : state_(state) {}
+ bool completed() const noexcept { return completed_; }
- std::shared_ptr<InternalState<Result, Type> > state_;
-
- template <typename U, typename V>
- friend class Promise;
-};
+ Result get(Type &result) {
+ const auto &pair = future_.get();
+ result = pair.second;
+ return pair.first;
+ }
-template <typename Result, typename Type>
-class Promise {
- public:
- Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}
+ // Only for test
Review Comment:
```suggestion
// Only public for test
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]