BewareMyPower commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741562539
##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
namespace pulsar {
template <typename Result, typename Type>
-struct InternalState {
- std::mutex mutex;
- std::condition_variable condition;
- Result result;
- Type value;
- bool complete;
-
- std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
public:
- typedef std::function<void(Result, const Type&)> ListenerCallback;
+ using ListenerCallback = std::function<void(Result, const Type&)>;
- Future& addListener(ListenerCallback callback) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ // There's a bug about the defaulted default constructor for GCC < 4.9.1,
so we cannot use
+ // `InternalState() = default` here.
+ InternalState() {}
+ InternalState(const InternalState&) = delete;
+ InternalState& operator=(const InternalState&) = delete;
- if (state->complete) {
+ static Result defaultResult() {
+ static Result result;
+ return result;
+ }
+
+ static Type defaultValue() {
+ static Type value;
+ return value;
+ }
+
+ bool completed() const noexcept { return completed_; }
+
+ void addListener(const ListenerCallback& callback) {
+ Lock lock(mutex_);
+ if (completed_) {
+ const auto result = result_;
+ const auto value = value_;
lock.unlock();
- callback(state->result, state->value);
+ callback(result, value);
} else {
- state->listeners.push_back(callback);
+ listeners_.emplace_back(callback);
}
+ }
- return *this;
+ Result wait(Type& value) {
+ Lock lock(mutex_);
+ while (!completed_) {
+ condition_.wait(lock);
+ }
+ value = value_;
+ return result_;
}
- Result get(Type& result) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ bool complete(const Type& value) {
+ if (completed_) {
Review comment:
Yes, that's right.
##########
File path: pulsar-client-cpp/lib/ClientConnection.cc
##########
@@ -1620,7 +1620,12 @@ Future<Result, MessageId>
ClientConnection::newGetLastMessageId(uint64_t consume
pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId,
promise));
lock.unlock();
- sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId),
requestId);
+ sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId),
requestId)
Review comment:
I agree. I'll remove the unnecessary refactoring of this class but still
add some changes.
##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
namespace pulsar {
template <typename Result, typename Type>
-struct InternalState {
- std::mutex mutex;
- std::condition_variable condition;
- Result result;
- Type value;
- bool complete;
-
- std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
public:
- typedef std::function<void(Result, const Type&)> ListenerCallback;
+ using ListenerCallback = std::function<void(Result, const Type&)>;
- Future& addListener(ListenerCallback callback) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ // There's a bug about the defaulted default constructor for GCC < 4.9.1,
so we cannot use
+ // `InternalState() = default` here.
+ InternalState() {}
+ InternalState(const InternalState&) = delete;
+ InternalState& operator=(const InternalState&) = delete;
- if (state->complete) {
+ static Result defaultResult() {
+ static Result result;
+ return result;
+ }
+
+ static Type defaultValue() {
+ static Type value;
+ return value;
+ }
+
+ bool completed() const noexcept { return completed_; }
+
+ void addListener(const ListenerCallback& callback) {
+ Lock lock(mutex_);
+ if (completed_) {
+ const auto result = result_;
+ const auto value = value_;
lock.unlock();
- callback(state->result, state->value);
+ callback(result, value);
} else {
- state->listeners.push_back(callback);
+ listeners_.emplace_back(callback);
}
+ }
- return *this;
+ Result wait(Type& value) {
+ Lock lock(mutex_);
+ while (!completed_) {
+ condition_.wait(lock);
+ }
+ value = value_;
+ return result_;
}
- Result get(Type& result) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ bool complete(const Type& value) {
+ if (completed_) {
+ return false;
+ }
+
+ Lock lock(mutex_);
+ value_ = value;
+ completed_ = true;
+ auto listeners = std::move(listeners_);
+ lock.unlock();
- if (!state->complete) {
- // Wait for result
- while (!state->complete) {
- state->condition.wait(lock);
- }
+ for (auto& callback : listeners) {
Review comment:
`listeners` is a local variable that is moved from the `listeners_`
field, and the move operation is protected by `mutex_`.
##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
namespace pulsar {
template <typename Result, typename Type>
-struct InternalState {
- std::mutex mutex;
- std::condition_variable condition;
- Result result;
- Type value;
- bool complete;
-
- std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
public:
- typedef std::function<void(Result, const Type&)> ListenerCallback;
+ using ListenerCallback = std::function<void(Result, const Type&)>;
- Future& addListener(ListenerCallback callback) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ // There's a bug about the defaulted default constructor for GCC < 4.9.1,
so we cannot use
+ // `InternalState() = default` here.
+ InternalState() {}
+ InternalState(const InternalState&) = delete;
+ InternalState& operator=(const InternalState&) = delete;
- if (state->complete) {
+ static Result defaultResult() {
+ static Result result;
+ return result;
+ }
+
+ static Type defaultValue() {
+ static Type value;
+ return value;
+ }
+
+ bool completed() const noexcept { return completed_; }
+
+ void addListener(const ListenerCallback& callback) {
+ Lock lock(mutex_);
+ if (completed_) {
+ const auto result = result_;
+ const auto value = value_;
lock.unlock();
- callback(state->result, state->value);
+ callback(result, value);
} else {
- state->listeners.push_back(callback);
+ listeners_.emplace_back(callback);
}
+ }
- return *this;
+ Result wait(Type& value) {
+ Lock lock(mutex_);
+ while (!completed_) {
+ condition_.wait(lock);
+ }
+ value = value_;
+ return result_;
}
- Result get(Type& result) {
- InternalState<Result, Type>* state = state_.get();
- Lock lock(state->mutex);
+ bool complete(const Type& value) {
+ if (completed_) {
+ return false;
+ }
+
+ Lock lock(mutex_);
+ value_ = value;
+ completed_ = true;
+ auto listeners = std::move(listeners_);
+ lock.unlock();
- if (!state->complete) {
- // Wait for result
- while (!state->complete) {
- state->condition.wait(lock);
- }
+ for (auto& callback : listeners) {
Review comment:
Yes. So I'm just reverting the unnecessary changes and only moving the
iteration over the listeners outside the locked block.
##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
}
state->value = value;
- state->result = Result();
+ state->result = DEFAULT_RESULT;
state->complete = true;
- typename std::list<ListenerCallback>::iterator it;
- for (it = state->listeners.begin(); it != state->listeners.end();
++it) {
- ListenerCallback& callback = *it;
- callback(state->result, state->value);
+ const auto listeners = std::move(state->listeners);
+
+ lock.unlock();
+
+ for (auto& callback : listeners) {
+ callback(DEFAULT_RESULT, value);
}
state->listeners.clear();
Review comment:
That's right. I just forgot to remove it. (I have already removed it from
`setFailed`.)
##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
}
state->value = value;
- state->result = Result();
+ state->result = DEFAULT_RESULT;
state->complete = true;
- typename std::list<ListenerCallback>::iterator it;
- for (it = state->listeners.begin(); it != state->listeners.end();
++it) {
- ListenerCallback& callback = *it;
- callback(state->result, state->value);
+ const auto listeners = std::move(state->listeners);
+
+ lock.unlock();
+
+ for (auto& callback : listeners) {
+ callback(DEFAULT_RESULT, value);
}
state->listeners.clear();
Review comment:
Sounds reasonable. Ideally we should not use an object that has been
moved from. I'll adopt the swap approach instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]