pitrou commented on a change in pull request #10205:
URL: https://github.com/apache/arrow/pull/10205#discussion_r629340021



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -78,28 +110,32 @@ struct ContinueFuture {
   template <typename ContinueFunc, typename... Args,
             typename ContinueResult = result_of_t<ContinueFunc && (Args && 
...)>,
             typename NextFuture = ForReturn<ContinueResult>>
-  typename std::enable_if<!std::is_void<ContinueResult>::value &&
-                          !is_future<ContinueResult>::value>::type
+  typename std::enable_if<
+      !std::is_void<ContinueResult>::value && 
!is_future<ContinueResult>::value &&
+      (!NextFuture::is_empty || std::is_same<ContinueResult, 
Status>::value)>::type

Review comment:
       You should add comments before each of these overloads because it's 
getting very hard to guess what their role is.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -56,6 +47,47 @@ struct is_future<Future<T>> : std::true_type {};
 template <typename Signature>
 using result_of_t = typename std::result_of<Signature>::type;
 
+template <typename Source, typename Dest>
+typename std::enable_if<Source::is_empty>::type Propagate(Source* source, Dest 
dest) {

Review comment:
       Is this used somewhere?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -322,56 +341,98 @@ class ARROW_MUST_USE_TYPE Future {
     return impl_->Wait(seconds);
   }
 
-  // Producer API
+ protected:
+  void InitializeFromResult(Result<ValueType> res) {
+    if (ARROW_PREDICT_TRUE(res.ok())) {
+      impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
+    } else {
+      impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
+    }
+    SetResult(std::move(res));
+  }
 
-  /// \brief Producer API: mark Future finished
-  ///
-  /// The Future's result is set to `res`.
-  void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); }
+  void Initialize() { impl_ = FutureImpl::Make(); }
 
-  /// \brief Mark a Future<> completed with the provided Status.
-  template <typename E = ValueType, typename = typename std::enable_if<
-                                        std::is_same<E, 
detail::Empty>::value>::type>
-  void MarkFinished(Status s = Status::OK()) {
-    return DoMarkFinished(E::ToResult(std::move(s)));
+  Result<ValueType>* GetResult() const {
+    return static_cast<Result<ValueType>*>(impl_->result_.get());
   }
 
-  /// \brief Producer API: instantiate a valid Future
-  ///
-  /// The Future's state is initialized with PENDING.  If you are creating a 
future with
-  /// this method you must ensure that future is eventually completed (with 
success or
-  /// failure).  Creating a future, returning it, and never completing the 
future can lead
-  /// to memory leaks (for example, see Loop).
-  static Future Make() {
-    Future fut;
-    fut.impl_ = FutureImpl::Make();
-    return fut;
+  void SetResult(Result<ValueType> res) {
+    impl_->result_ = {new Result<ValueType>(std::move(res)),
+                      [](void* p) { delete static_cast<Result<ValueType>*>(p); 
}};
   }
 
-  /// \brief Producer API: instantiate a finished Future
-  static Future MakeFinished(Result<ValueType> res) {
-    Future fut;
-    if (ARROW_PREDICT_TRUE(res.ok())) {
-      fut.impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
+  void DoMarkFinished(Result<ValueType> res) {
+    SetResult(std::move(res));
+
+    if (ARROW_PREDICT_TRUE(GetResult()->ok())) {
+      impl_->MarkFinished();
     } else {
-      fut.impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
+      impl_->MarkFailed();
     }
-    fut.SetResult(std::move(res));
-    return fut;
   }
 
-  /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = typename std::enable_if<
-                                        std::is_same<E, 
detail::Empty>::value>::type>
-  static Future<> MakeFinished(Status s = Status::OK()) {
-    return MakeFinished(E::ToResult(std::move(s)));
+  void CheckValid() const {
+#ifndef NDEBUG
+    if (!is_valid()) {
+      Status::Invalid("Invalid Future (default-initialized?)").Abort();
+    }
+#endif
+  }
+
+  explicit FutureBase(std::shared_ptr<FutureImpl> impl) : 
impl_(std::move(impl)) {}
+
+  std::shared_ptr<FutureImpl> impl_;
+
+  friend class FutureWaiter;
+  friend struct detail::ContinueFuture;
+};
+
+template <typename T>
+class ARROW_MUST_USE_TYPE Future : public FutureBase<T> {
+ public:
+  using ValueType = T;
+  using SyncType = Result<T>;
+  static constexpr bool is_empty = false;
+
+  Future() = default;
+
+  /// \brief Returns an rvalue to the result.  This method is potentially 
unsafe
+  ///
+  /// The future is not the unique owner of the result, copies of a future will
+  /// also point to the same result.  You must make sure that no other copies
+  /// of the future exist.  Attempts to add callbacks after you move the result
+  /// will result in undefined behavior.
+  Result<ValueType>&& MoveResult() {
+    FutureBase<T>::Wait();
+    return std::move(*FutureBase<T>::GetResult());
+  }
+
+  /// \brief Wait for the Future to complete and return its Result (or Status 
for an empty
+  /// future)
+  ///
+  /// This method is useful for general purpose code converting from async to 
sync where T
+  /// is a template parameter and may be empty.
+  const SyncType& to_sync() const { return FutureBase<T>::result(); }

Review comment:
       So this is just the `result()` method? Is there an actual need in 
introducing this?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -322,56 +341,98 @@ class ARROW_MUST_USE_TYPE Future {
     return impl_->Wait(seconds);
   }
 
-  // Producer API
+ protected:
+  void InitializeFromResult(Result<ValueType> res) {
+    if (ARROW_PREDICT_TRUE(res.ok())) {
+      impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
+    } else {
+      impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
+    }
+    SetResult(std::move(res));
+  }
 
-  /// \brief Producer API: mark Future finished
-  ///
-  /// The Future's result is set to `res`.
-  void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); }
+  void Initialize() { impl_ = FutureImpl::Make(); }
 
-  /// \brief Mark a Future<> completed with the provided Status.
-  template <typename E = ValueType, typename = typename std::enable_if<
-                                        std::is_same<E, 
detail::Empty>::value>::type>
-  void MarkFinished(Status s = Status::OK()) {
-    return DoMarkFinished(E::ToResult(std::move(s)));
+  Result<ValueType>* GetResult() const {
+    return static_cast<Result<ValueType>*>(impl_->result_.get());
   }
 
-  /// \brief Producer API: instantiate a valid Future
-  ///
-  /// The Future's state is initialized with PENDING.  If you are creating a 
future with
-  /// this method you must ensure that future is eventually completed (with 
success or
-  /// failure).  Creating a future, returning it, and never completing the 
future can lead
-  /// to memory leaks (for example, see Loop).
-  static Future Make() {
-    Future fut;
-    fut.impl_ = FutureImpl::Make();
-    return fut;
+  void SetResult(Result<ValueType> res) {
+    impl_->result_ = {new Result<ValueType>(std::move(res)),
+                      [](void* p) { delete static_cast<Result<ValueType>*>(p); 
}};
   }
 
-  /// \brief Producer API: instantiate a finished Future
-  static Future MakeFinished(Result<ValueType> res) {
-    Future fut;
-    if (ARROW_PREDICT_TRUE(res.ok())) {
-      fut.impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
+  void DoMarkFinished(Result<ValueType> res) {
+    SetResult(std::move(res));
+
+    if (ARROW_PREDICT_TRUE(GetResult()->ok())) {
+      impl_->MarkFinished();
     } else {
-      fut.impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
+      impl_->MarkFailed();
     }
-    fut.SetResult(std::move(res));
-    return fut;
   }
 
-  /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = typename std::enable_if<
-                                        std::is_same<E, 
detail::Empty>::value>::type>
-  static Future<> MakeFinished(Status s = Status::OK()) {
-    return MakeFinished(E::ToResult(std::move(s)));
+  void CheckValid() const {
+#ifndef NDEBUG
+    if (!is_valid()) {
+      Status::Invalid("Invalid Future (default-initialized?)").Abort();
+    }
+#endif
+  }
+
+  explicit FutureBase(std::shared_ptr<FutureImpl> impl) : 
impl_(std::move(impl)) {}
+
+  std::shared_ptr<FutureImpl> impl_;
+
+  friend class FutureWaiter;
+  friend struct detail::ContinueFuture;
+};
+
+template <typename T>
+class ARROW_MUST_USE_TYPE Future : public FutureBase<T> {
+ public:
+  using ValueType = T;
+  using SyncType = Result<T>;

Review comment:
       Call this `ResultType`?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -56,6 +47,47 @@ struct is_future<Future<T>> : std::true_type {};
 template <typename Signature>
 using result_of_t = typename std::result_of<Signature>::type;
 
+template <typename Source, typename Dest>
+typename std::enable_if<Source::is_empty>::type Propagate(Source* source, Dest 
dest) {
+  struct MarkNextFinished {
+    void operator()(const Status& status) && { next.MarkFinished(status); }
+    Dest next;
+  };
+  source->AddCallback(MarkNextFinished{std::move(dest)});
+}
+
+template <typename Source, typename Dest, bool SourceEmpty = Source::is_empty,
+          bool DestEmpty = Dest::is_empty>
+struct MarkNextFinished {};
+
+template <typename Source, typename Dest>
+struct MarkNextFinished<Source, Dest, true, false> {
+  void operator()(const Status& status) && { next.MarkFinished(status); }

Review comment:
       Is this one valid? If the status is ok, then `next` needs an actual 
`Result` object.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -78,28 +110,32 @@ struct ContinueFuture {
   template <typename ContinueFunc, typename... Args,
             typename ContinueResult = result_of_t<ContinueFunc && (Args && 
...)>,
             typename NextFuture = ForReturn<ContinueResult>>
-  typename std::enable_if<!std::is_void<ContinueResult>::value &&
-                          !is_future<ContinueResult>::value>::type
+  typename std::enable_if<
+      !std::is_void<ContinueResult>::value && 
!is_future<ContinueResult>::value &&
+      (!NextFuture::is_empty || std::is_same<ContinueResult, 
Status>::value)>::type
   operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
     next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...));
   }
 
+  template <typename ContinueFunc, typename... Args,
+            typename ContinueResult = result_of_t<ContinueFunc && (Args && 
...)>,
+            typename NextFuture = ForReturn<ContinueResult>>
+  typename std::enable_if<!std::is_void<ContinueResult>::value &&
+                          !is_future<ContinueResult>::value && 
NextFuture::is_empty &&
+                          !std::is_same<ContinueResult, Status>::value>::type
+  operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
+    
next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status());

Review comment:
       Hmm... so why not create a specialized helper such as:
   ```c++
   void ForwardResult(const Result<T>&, Future<U>* fut);
   void ForwardResult(const Result<T>&, Future<>* fut);
   void ForwardResult(const Status&, Future<>* fut);
   ```
   

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -691,34 +876,41 @@ struct Continue {
   }
 };
 
-template <typename T = detail::Empty>
+template <typename T = internal::Empty>
 util::optional<T> Break(T break_value = {}) {
   return util::optional<T>{std::move(break_value)};
 }
 
-template <typename T = detail::Empty>
+template <typename T = internal::Empty>
 using ControlFlow = util::optional<T>;
 
+template <typename T>
+void ForwardControlResult(const Result<ControlFlow<T>>& result, Future<T> 
sink) {
+  sink.MarkFinished(**result);
+}
+template <>
+inline void ForwardControlResult(const Result<ControlFlow<>>& result, Future<> 
sink) {
+  sink.MarkFinished();

Review comment:
       I'm a bit skeptical that we have to introduce this yet another 
result-to-future forwarding primitive.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to