pitrou commented on code in PR #13651:
URL: https://github.com/apache/arrow/pull/13651#discussion_r925300980


##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -69,17 +69,16 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
   /// e.g. make an array of thread-locals off this.
   size_t max_concurrency() const;
 
-  /// \brief Add a future to the plan's task group.
+  /// \brief Starts an external task

Review Comment:
   ```suggestion
     /// \brief Start an external task
   ```



##########
cpp/src/arrow/util/async_util.h:
##########
@@ -120,8 +120,12 @@ class ARROW_EXPORT AsyncTaskGroup {
   Status AddTaskIfNotEnded(std::function<Result<Future<>>()> task);
   /// Add a task that has already been started
   Status AddTask(const Future<>& task);
-  /// Same as AddTask but doesn't add the task if End() has been called.
-  Status AddTaskIfNotEnded(const Future<>& task);
+  /// \brief Attempts to add a task that has already been started to this group's tracking

Review Comment:
   ```suggestion
     /// \brief Attempt to add a task that has already been started to this group's tracking
   ```



##########
cpp/src/arrow/util/future.cc:
##########
@@ -339,8 +339,8 @@ class ConcreteFutureImpl : public FutureImpl {
       if (waiter_ != nullptr) {
         waiter_->MarkFutureFinishedUnlocked(waiter_arg_, state);
       }
+      cv_.notify_all();

Review Comment:
   Can you add a comment so we remember why we chose to notify while locked?



##########
cpp/src/arrow/util/future_test.cc:
##########
@@ -526,6 +527,23 @@ TEST(FutureStressTest, TryAddCallback) {
   }
 }
 
+TEST(FutureStressTest, DeleteAfterWait) {
+  constexpr int kNumTasks = 100;
+  for (int i = 0; i < kNumTasks; i++) {
+    {
+      std::unique_ptr<Future<>> future =
+          internal::make_unique<Future<>>(Future<>::Make());
+      std::thread t([&]() {
+        SleepABit();
+        future->MarkFinished();
+      });
+      ASSERT_TRUE(future->Wait(100));

Review Comment:
   A 100-second timeout sounds a bit long for this test?



##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -139,13 +147,18 @@ struct SourceNode : ExecNode {
                      options);
                })
                    .Then(
-                       [=](int total_batches) {
+                       [this, scan_task](int total_batches) mutable {
+                         std::unique_lock<std::mutex> lock(mutex_);
+                         bool should_mark_finished = !finished_.is_finished();
+                         lock.unlock();

Review Comment:
   I'm curious, why is the locking necessary? Inspecting a Future should be 
thread-safe.



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -69,17 +69,16 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
   /// e.g. make an array of thread-locals off this.
   size_t max_concurrency() const;
 
-  /// \brief Add a future to the plan's task group.
+  /// \brief Starts an external task
   ///
-  /// \param fut The future to add
+  /// This should be avoided if possible.  It is kept in for now for legacy
+  /// purposes.  This should be called before the external task is started.  If
+  /// a future is returned then it should be marked complete when the external
+  /// task has finished.
   ///
-  /// Use this when interfacing with anything that returns a future (such as IO), but
-  /// prefer ScheduleTask/StartTaskGroup inside of ExecNodes.
-  /// The below API interfaces with the scheduler to add tasks to the task group. Tasks
-  /// should be added sparingly! Prefer just doing the work immediately rather than adding
-  /// a task for it. Tasks are used in pipeline breakers that may output many more rows
-  Status AddFuture(Future<> fut);
+  /// \return nullopt if the plan has already ended, otherwise this returns
+  ///         a future that must be completed when the external task finishes
+  Result<util::optional<Future<>>> BeginExternalTask();

Review Comment:
   I'm not sure the `optional` is necessary since you could also return an 
invalid (default-initialized) Future.



##########
cpp/src/arrow/util/future_test.cc:
##########
@@ -526,6 +527,23 @@ TEST(FutureStressTest, TryAddCallback) {
   }
 }
 
+TEST(FutureStressTest, DeleteAfterWait) {

Review Comment:
   There are a couple of timeouts on CI in `threading-utility-test`; I'm assuming
this test is the cause? Can you try to find out what the issue is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to