RobertIndie commented on code in PR #299:
URL: https://github.com/apache/pulsar-client-cpp/pull/299#discussion_r1251858690


##########
lib/Future.h:
##########
@@ -19,162 +19,124 @@
 #ifndef LIB_FUTURE_H_
 #define LIB_FUTURE_H_
 
-#include <condition_variable>
+#include <atomic>
 #include <functional>
+#include <future>
 #include <list>
 #include <memory>
 #include <mutex>
-
-using Lock = std::unique_lock<std::mutex>;
+#include <utility>
 
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
-
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
-
-        if (state->complete) {
-            lock.unlock();
-            callback(state->result, state->value);
+    using Listener = std::function<void(Result, const Type &)>;
+    using Pair = std::pair<Result, Type>;
+
+    // NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
+    InternalState() {}
+
+    void addListener(Listener listener) {
+        if (completed()) {
+            // Allow get_future() being called multiple times, only the 1st time will wait() be called to wait
+            // until all previous listeners are done.
+            try {
+                listenersPromise_.get_future().wait();
+            } catch (const std::future_error &e) {
+                if (e.code() != std::future_errc::future_already_retrieved) {
+                    throw e;
+                }
+            }
+            listener(future_.get().first, future_.get().second);
         } else {
-            state->listeners.push_back(callback);
+            std::lock_guard<std::mutex> lock{mutex_};
+            listeners_.emplace_back(listener);

Review Comment:
   There seems to be a race condition here:
   * Thread A enters `addListener` and finds that the future is not completed.
   * Before thread A acquires the mutex, thread B completes the future and locks the mutex.
   * Thread A blocks on the mutex until all previously registered listeners have been executed.
   * Thread B executes all the listeners and unlocks the mutex.
   * Thread A acquires the lock and pushes its listener to the list, but this listener will never be executed.
   
   As a result, a listener can be lost when `addListener` races with the completion of the future.
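
   One way to close the window described above might be to check the completion state and append to the list inside the same critical section. The following is only a minimal sketch under that assumption, not the code in this PR: `SketchState`, `completed_`, `result_` and `value_` are hypothetical names, and the sketch does not try to preserve the "wait until all previous listeners are done" ordering that `listenersPromise_` provides.

```cpp
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical stand-in for InternalState; all names here are illustrative only.
template <typename Result, typename Type>
class SketchState {
   public:
    using Listener = std::function<void(Result, const Type &)>;

    void addListener(Listener listener) {
        std::unique_lock<std::mutex> lock{mutex_};
        if (completed_) {
            // Already completed: release the lock, then run the listener inline.
            // result_ and value_ are never modified again after completion.
            lock.unlock();
            listener(result_, value_);
        } else {
            // The check and the push happen under the same lock, so complete()
            // cannot drain the list between them.
            listeners_.emplace_back(std::move(listener));
        }
    }

    // Returns false if the state was already completed.
    bool complete(Result result, const Type &value) {
        std::list<Listener> pending;
        {
            std::lock_guard<std::mutex> lock{mutex_};
            if (completed_) {
                return false;
            }
            result_ = result;
            value_ = value;
            completed_ = true;
            // Take ownership of the registered listeners while holding the lock.
            pending.swap(listeners_);
        }
        // Invoke the listeners outside the lock so they may call addListener().
        for (auto &listener : pending) {
            listener(result, value);
        }
        return true;
    }

   private:
    std::mutex mutex_;
    bool completed_{false};
    Result result_{};
    Type value_{};
    std::list<Listener> listeners_;
};
```

   With this shape, a listener registered before `complete()` runs exactly once during completion, and a listener registered afterwards runs immediately in the caller's thread. The trade-off is that every `addListener` call takes the mutex, even after completion.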


