westonpace commented on a change in pull request #10664:
URL: https://github.com/apache/arrow/pull/10664#discussion_r665732675



##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -65,16 +61,24 @@ class ARROW_EXPORT ExecPlan : public 
std::enable_shared_from_this<ExecPlan> {
 
   Status Validate();
 
-  /// Start producing on all nodes
+  /// \brief Start producing on all nodes
   ///
   /// Nodes are started in reverse topological order, such that any node
   /// is started before all of its inputs.
   Status StartProducing();
 
+  /// \brief Stop producing on all nodes
+  ///
+  /// Nodes are stopped topological order, such that any node

Review comment:
       **in** topological order

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -234,7 +241,7 @@ Result<std::vector<ExecBatch>> StartAndCollect(
   auto maybe_collected = CollectAsyncGenerator(gen).result();
   ARROW_ASSIGN_OR_RAISE(auto collected, maybe_collected);
 
-  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       ASSERT_FINISHES_OK

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -144,7 +144,14 @@ TEST(ExecPlan, DummyStartProducing) {
   // Note that any correct reverse topological order may do
   ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", 
"process1",
                                      "source2", "source1"));
-  ASSERT_EQ(t.stopped.size(), 0);
+
+  plan->StopProducing();
+  plan->finished().Wait();
+  ASSERT_THAT(t.stopped, ElementsAre("source1", "source2", "process1", 
"process2",

Review comment:
       Do you need the same comment as above?  Is "source2", "source1", ... 
valid too?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1102,7 +1102,7 @@ static Result<std::vector<compute::ExecBatch>> 
StartAndCollect(
   auto maybe_collected = CollectAsyncGenerator(gen).result();
   ARROW_ASSIGN_OR_RAISE(auto collected, maybe_collected);
 
-  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       ASSERT_FINISHES_OK

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,6 +272,8 @@ class ConcreteFutureImpl : public FutureImpl {
         return true;
       case ShouldSchedule::IfUnfinished:
         return !in_add_callback;
+      case ShouldSchedule::IfDifferentExecutor:
+        return !callback_record.options.executor->OwnsThisThread();

Review comment:
       Is this speculative?  Or is it actually needed?

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be 
called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { 
NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if 
necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed 
to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer 
if not.
-                // This can happen for in-memory scans where batches didn't 
require
-                // any CPU work to decode. Otherwise, parsing etc should have 
already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {
+                  std::unique_lock<std::mutex> lock(mutex_);
+                  int seq = batch_count_++;
+                  if (stop_requested_) {
+                    return Future<ControlFlow<int>>::MakeFinished(Break(seq));
+                  }
                   lock.unlock();
-                  // unless we were already finished, push the error to our 
output
-                  // XXX is this correct? Is it reasonable for a consumer to
-                  // ignore errors from a finished producer?
-                  outputs_[0]->ErrorReceived(this, error);
-                }
-                return Break(seq);
-              });
-        }).Then([&](int seq) {
-          /// XXX this is probably redundant: do we always call InputFinished 
after
-          /// ErrorReceived or will ErrorRecieved be sufficient?
-          outputs_[0]->InputFinished(this, seq);
-        });
+
+                  return generator_().Then(
+                      [=](const util::optional<ExecBatch>& batch) -> 
ControlFlow<int> {
+                        std::unique_lock<std::mutex> lock(mutex_);
+                        if (!batch || stop_requested_) {

Review comment:
       Nit: but `IsIterationEnd(batch)` may capture the intent more.

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -332,6 +339,38 @@ TEST(ExecPlanExecution, StressSourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, StressSourceSinkStopped) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = slow && !parallel ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+      auto random_data = MakeRandomBatches(
+          schema({field("a", int32()), field("b", boolean())}), num_batches);
+
+      ASSERT_OK_AND_ASSIGN(auto source, MakeTestSourceNode(plan.get(), 
"source",
+                                                           random_data, 
parallel, slow));
+
+      auto sink_gen = MakeSinkNode(source, "sink");
+
+      ASSERT_OK(plan->Validate());
+      ASSERT_OK(plan->StartProducing());
+
+      auto maybe_first_batch = sink_gen().result();
+
+      plan->StopProducing();
+      plan->finished().Wait();

Review comment:
       ASSERT_FINISHES_OK

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -485,35 +528,12 @@ class Future {
   ///
   /// In this example `fut` falls out of scope but is not destroyed because it 
holds a
   /// cyclic reference to itself through the callback.
-  template <typename OnComplete>
-  typename 
std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete,
-              CallbackOptions opts = CallbackOptions::Defaults()) const {
+  template <typename OnComplete, typename Callback = 
WrapOnComplete<OnComplete>>
+  void AddCallback(OnComplete on_complete,
+                   CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at 
least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to 
keep a
     // weak reference to impl_ here
-    struct Callback {
-      void operator()(const FutureImpl& impl) && {
-        std::move(on_complete)(*impl.CastResult<ValueType>());
-      }
-      OnComplete on_complete;
-    };
-    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
-  }
-
-  /// Overload for callbacks accepting a Status
-  template <typename OnComplete>
-  typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete,

Review comment:
       Thanks for getting rid of these repetitive overloads

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be 
called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { 
NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if 
necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed 
to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;

Review comment:
       Why wouldn't `ShouldSchedule::Always` work?

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -220,58 +240,61 @@ struct SourceNode : ExecNode {
 
   const char* kind_name() override { return "SourceNode"; }
 
-  static void NoInputs() { DCHECK(false) << "no inputs; this should never be 
called"; }
-  void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); }
-  void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
-  void InputFinished(ExecNode*, int) override { NoInputs(); }
+  [[noreturn]] static void NoInputs() {
+    DCHECK(false) << "no inputs; this should never be called";
+    std::abort();
+  }
+  [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { 
NoInputs(); }
+  [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); }
+  [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); }
 
   Status StartProducing() override {
-    if (finished_) {
-      return Status::Invalid("Restarted SourceNode '", label(), "'");
+    DCHECK(!stop_requested_) << "Restarted SourceNode";
+
+    CallbackOptions options;
+    if (auto executor = plan()->exec_context()->executor()) {
+      // These options will transfer execution to the desired Executor if 
necessary.
+      // This can happen for in-memory scans where batches didn't require
+      // any CPU work to decode. Otherwise, parsing etc should have already
+      // been placed us on the desired Executor and no queues will be pushed 
to.
+      options.executor = executor;
+      options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
 
-    finished_fut_ =
-        Loop([this] {
-          std::unique_lock<std::mutex> lock(mutex_);
-          int seq = next_batch_index_++;
-          if (finished_) {
-            return Future<ControlFlow<int>>::MakeFinished(Break(seq));
-          }
-          lock.unlock();
-
-          return generator_().Then(
-              [=](const util::optional<ExecBatch>& batch) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!batch || finished_) {
-                  finished_ = true;
-                  return Break(seq);
-                }
-                lock.unlock();
-
-                // TODO check if we are on the desired Executor and transfer 
if not.
-                // This can happen for in-memory scans where batches didn't 
require
-                // any CPU work to decode. Otherwise, parsing etc should have 
already
-                // been placed us on the thread pool
-                outputs_[0]->InputReceived(this, seq, *batch);
-                return Continue();
-              },
-              [=](const Status& error) -> ControlFlow<int> {
-                std::unique_lock<std::mutex> lock(mutex_);
-                if (!finished_) {
-                  finished_ = true;
+    finished_ = Loop([this, options] {

Review comment:
       Rather than use `Loop` directly, it seems you could use `MakeTransferred` 
and `VisitAsyncGenerator`.  You might need to add `ErrorVisitor` and 
`stop_callback` support to `VisitAsyncGenerator`, but then it would be more 
universally supported.

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -332,6 +339,38 @@ TEST(ExecPlanExecution, StressSourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, StressSourceSinkStopped) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      int num_batches = slow && !parallel ? 30 : 300;
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+      auto random_data = MakeRandomBatches(
+          schema({field("a", int32()), field("b", boolean())}), num_batches);
+
+      ASSERT_OK_AND_ASSIGN(auto source, MakeTestSourceNode(plan.get(), 
"source",
+                                                           random_data, 
parallel, slow));
+
+      auto sink_gen = MakeSinkNode(source, "sink");
+
+      ASSERT_OK(plan->Validate());
+      ASSERT_OK(plan->StartProducing());
+
+      auto maybe_first_batch = sink_gen().result();

Review comment:
       ASSERT_FINISHES_OK_AND_ASSIGN

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> 
AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& 
options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, 
options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = 
batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 
1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, 
std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 
2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, 
cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, 
compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, 
scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", 
scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", 
std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, 
dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = 
std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{
+      nullptr, [plan, exec_context](...) {
+        bool not_finished_yet = plan->finished().TryAddCallback([&] {

Review comment:
       I don't know that `TryAddCallback` will prevent you from calling 
`StopProducing` twice if that is what you are trying to avoid here.  Can 
`StopProducing` be idempotent?

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -604,11 +585,90 @@ Result<EnumeratedRecordBatchGenerator> 
AsyncScanner::ScanBatchesUnorderedAsync()
   return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
 }
 
+namespace {
+Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
+    const util::optional<compute::ExecBatch>& batch, const ScanOptions& 
options,
+    const FragmentVector& fragments) {
+  int num_fields = options.projected_schema->num_fields();
+
+  ArrayVector columns(num_fields);
+  for (size_t i = 0; i < columns.size(); ++i) {
+    const Datum& value = batch->values[i];
+    if (value.is_array()) {
+      columns[i] = value.make_array();
+      continue;
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, 
options.pool));
+  }
+
+  EnumeratedRecordBatch out;
+  out.fragment.index = 
batch->values[num_fields].scalar_as<Int32Scalar>().value;
+  out.fragment.value = fragments[out.fragment.index];
+  out.fragment.last = false;  // ignored during reordering
+
+  out.record_batch.index = batch->values[num_fields + 
1].scalar_as<Int32Scalar>().value;
+  out.record_batch.value =
+      RecordBatch::Make(options.projected_schema, batch->length, 
std::move(columns));
+  out.record_batch.last = batch->values[num_fields + 
2].scalar_as<BooleanScalar>().value;
+
+  return out;
+}
+}  // namespace
+
 Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
     internal::Executor* cpu_executor) {
-  ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
-  return ScanBatchesUnorderedAsyncImpl(scan_options_, std::move(fragment_gen),
-                                       cpu_executor);
+  if (!scan_options_->use_threads) {
+    cpu_executor = nullptr;
+  }
+
+  auto exec_context =
+      std::make_shared<compute::ExecContext>(scan_options_->pool, 
cpu_executor);
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, 
compute::ExecPlan::Make(exec_context.get()));
+
+  ARROW_ASSIGN_OR_RAISE(auto scan, MakeScanNode(plan.get(), dataset_, 
scan_options_));
+
+  ARROW_ASSIGN_OR_RAISE(auto filter,
+                        compute::MakeFilterNode(scan, "filter", 
scan_options_->filter));
+
+  auto exprs = scan_options_->projection.call()->arguments;
+  exprs.push_back(compute::field_ref("__fragment_index"));
+  exprs.push_back(compute::field_ref("__batch_index"));
+  exprs.push_back(compute::field_ref("__last_in_fragment"));
+  ARROW_ASSIGN_OR_RAISE(auto project,
+                        compute::MakeProjectNode(filter, "project", 
std::move(exprs)));
+
+  AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen =
+      compute::MakeSinkNode(project, "sink");
+
+  RETURN_NOT_OK(plan->StartProducing());
+
+  auto options = scan_options_;
+  ARROW_ASSIGN_OR_RAISE(auto fragments_it, 
dataset_->GetFragments(scan_options_->filter));
+  ARROW_ASSIGN_OR_RAISE(auto fragments, fragments_it.ToVector());
+  auto shared_fragments = 
std::make_shared<FragmentVector>(std::move(fragments));
+
+  // If the generator is destroyed before being completely drained, inform plan
+  std::shared_ptr<void> stop_producing{

Review comment:
       I don't think this is the first time this has come up.  Maybe a helper 
function `AsyncGenerator<T> WithShutdown(AsyncGenerator<T>, Callback)`

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -282,20 +305,21 @@ struct SourceNode : ExecNode {
 
   void StopProducing(ExecNode* output) override {
     DCHECK_EQ(output, outputs_[0]);
-    {
-      std::unique_lock<std::mutex> lock(mutex_);
-      finished_ = true;
-    }
-    finished_fut_.Wait();
+    StopProducing();
   }
 
-  void StopProducing() override { StopProducing(outputs_[0]); }
+  void StopProducing() override {
+    std::unique_lock<std::mutex> lock(mutex_);
+    stop_requested_ = true;
+  }
+
+  Future<> finished() override { return finished_; }
 
  private:
   std::mutex mutex_;
-  bool finished_{false};
-  int next_batch_index_{0};
-  Future<> finished_fut_ = Future<>::MakeFinished();
+  bool stop_requested_{false};

Review comment:
       Instead of exposing a `StopProducing` you might need to take in a stop 
token.  If you don't then how will you handle the following...
   
   A pyarrow user runs a to_table on some dataset.  First it has to do 
inspection, then it has to actually do the scan.  At any point the user might 
press Ctrl-C to cancel the thing.
   
   We can support it with stop token by setting the Ctrl-C stop token handler 
and then passing that stop token down to the inspection call and the scan call.
   
   As a minor benefit you can get rid of all the locks here because they are 
inside the stop token itself.

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -144,7 +144,14 @@ TEST(ExecPlan, DummyStartProducing) {
   // Note that any correct reverse topological order may do
   ASSERT_THAT(t.started, ElementsAre("sink", "process3", "process2", 
"process1",
                                      "source2", "source1"));
-  ASSERT_EQ(t.stopped.size(), 0);
+
+  plan->StopProducing();
+  plan->finished().Wait();

Review comment:
       There are some helpful macros for working with futures in 
`future_util.h`.  You might prefer `ASSERT_FINISHES_OK` so a bug doesn't hang 
indefinitely




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to