bkietz commented on a change in pull request #10205:
URL: https://github.com/apache/arrow/pull/10205#discussion_r625876665



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -78,28 +102,31 @@ struct ContinueFuture {
   template <typename ContinueFunc, typename... Args,
             typename ContinueResult = result_of_t<ContinueFunc && (Args && 
...)>,
             typename NextFuture = ForReturn<ContinueResult>>
-  typename std::enable_if<!std::is_void<ContinueResult>::value &&
-                          !is_future<ContinueResult>::value>::type
+  typename std::enable_if<
+      !std::is_void<ContinueResult>::value && 
!is_future<ContinueResult>::value &&
+      (!NextFuture::is_empty || std::is_same<ContinueResult, 
Status>::value)>::type
   operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
     next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...));
   }
 
+  template <typename ContinueFunc, typename... Args,
+            typename ContinueResult = result_of_t<ContinueFunc && (Args && 
...)>,
+            typename NextFuture = ForReturn<ContinueResult>>
+  typename std::enable_if<!std::is_void<ContinueResult>::value &&
+                          !is_future<ContinueResult>::value && 
NextFuture::is_empty &&
+                          !std::is_same<ContinueResult, Status>::value>::type
+  operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
+    
next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status());
+  }
+
   template <typename ContinueFunc, typename... Args,
             typename ContinueResult = result_of_t<ContinueFunc && (Args && 
...)>,
             typename NextFuture = ForReturn<ContinueResult>>
   typename std::enable_if<is_future<ContinueResult>::value>::type operator()(
       NextFuture next, ContinueFunc&& f, Args&&... a) const {
     ContinueResult signal_to_complete_next =
         std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
-
-    struct MarkNextFinished {
-      void operator()(const Result<typename ContinueResult::ValueType>& 
result) && {
-        next.MarkFinished(result);
-      }
-      NextFuture next;
-    };
-
-    signal_to_complete_next.AddCallback(MarkNextFinished{std::move(next)});
+    Propagate<ContinueResult, NextFuture>(signal_to_complete_next, 
std::move(next));

Review comment:
       It might be more clear to only extract a callback factory, rather than 
calling AddCallback in Propagate:
   ```suggestion
       auto callback = MakeMarkNextFinished<ContinueResult>(std::move(next));
       signal_to_complete_next.AddCallback(std::move(callback));
   ```

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -56,6 +47,39 @@ struct is_future<Future<T>> : std::true_type {};
 template <typename Signature>
 using result_of_t = typename std::result_of<Signature>::type;
 
+template <typename Source, typename Dest>
+typename std::enable_if<Source::is_empty>::type Propagate(Source& source, Dest 
dest) {

Review comment:
       Style nit: mutable references should be avoided
   ```suggestion
   typename std::enable_if<Source::is_empty>::type Propagate(Source* source, 
Dest dest) {
   ```

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -227,14 +228,14 @@ class ARROW_EXPORT SerialExecutor : public Executor {
   std::shared_ptr<State> state_;
 
   template <typename T>
-  Result<T> Run(TopLevelTask<T> initial_task) {
+  Future<T> Run(TopLevelTask<T> initial_task) {
     auto final_fut = std::move(initial_task)(this);
     if (final_fut.is_finished()) {
-      return final_fut.result();
+      return final_fut;
     }
-    final_fut.AddCallback([this](const Result<T>&) { MarkFinished(); });
+    final_fut.AddCallback([this](...) { MarkFinished(); });

Review comment:
       `...` can only be used to ignore trivially copyable/destructible 
arguments, so I'd revert this
   ```suggestion
       final_fut.AddCallback([this](const Result<T>&) { MarkFinished(); });
   ```

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -691,20 +862,29 @@ struct Continue {
   }
 };
 
-template <typename T = detail::Empty>
+template <typename T = internal::Empty>
 util::optional<T> Break(T break_value = {}) {
   return util::optional<T>{std::move(break_value)};
 }
 
-template <typename T = detail::Empty>
+template <typename T = internal::Empty>
 using ControlFlow = util::optional<T>;
 
+template <typename T>
+void ForwardControlResult(const Result<ControlFlow<T>>& result, Future<T>& 
sink) {

Review comment:
       ```suggestion
   void ForwardControlResult(const Result<ControlFlow<T>>& result, Future<T>* 
sink) {
   ```

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -189,11 +189,7 @@ TEST_P(TestRunSynchronously, SpawnMoreNested) {
   auto top_level_task = [&](Executor* executor) -> Future<> {
     auto fut_a = DeferNotOk(executor->Submit([&] { nested_ran++; }));
     auto fut_b = DeferNotOk(executor->Submit([&] { nested_ran++; }));
-    return AllComplete({fut_a, fut_b})
-        .Then([&](const Result<arrow::detail::Empty>& result) {
-          nested_ran++;
-          return result;
-        });
+    return AllComplete({fut_a, fut_b}).Then([&]() { nested_ran++; });

Review comment:
       you could add the following assertion to the body of Then:
   ```c++
       using OnSuccessArg =
           typename std::decay<internal::call_traits::argument_type<0, 
OnSuccess>>::type;
       static_assert(
           !std::is_same<OnSuccessArg, typename 
EnsureResult<OnSuccessArg>::type>::value,
           "OnSuccess' argument should not be a Result");
   ```
   
   Currently `call_traits::argument_type` breaks when trying to examine an 
ellipsis, so the following wouldn't compile anymore:
   ```c++
   int_future.Then([](...) {});
   ```
   But that'd probably be an acceptable loss




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to