Taepper opened a new issue, #47642:
URL: https://github.com/apache/arrow/issues/47642

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   This can easily be reproduced by creating an `ExecPlan` with an `ExecNode` that throws an exception during `StartProducing`. The easiest way to trigger it is a `SourceNode` whose generator throws and does not await anything.
   
   I believe that this is the relevant code section:
   
   
   `/Users/ataeppe/.conan2/p/b/arrowacf6fafc6086d/b/src/cpp/src/arrow/util/async_util.cc:462-473`:
   ```
   Future<> AsyncTaskScheduler::Make(FnOnce<Status(AsyncTaskScheduler*)> initial_task,
                                     FnOnce<void(const Status&)> abort_callback,
                                     StopToken stop_token) {
     util::tracing::Span span;
     auto scope = START_SCOPED_SPAN_SV(span, "AsyncTaskScheduler::InitialTask"sv);
     auto scheduler = std::make_unique<AsyncTaskSchedulerImpl>(std::move(stop_token),
                                                               std::move(abort_callback));
     Status initial_task_st = std::move(initial_task)(scheduler.get());
     scheduler->OnTaskFinished(std::move(initial_task_st));
     // Keep scheduler alive until finished
     return scheduler->OnFinished().Then([scheduler = std::move(scheduler)] {});
   }
   ```
   
   In debug mode this assert triggers:
   
   ```
     ~AsyncTaskSchedulerImpl() {
       DCHECK_EQ(running_tasks_, 0) << " scheduler destroyed while tasks still running";
     }
   ```
   
   In release mode the Arrow plan can never be destroyed, because calling `StopProducing` is no longer enough to make the future returned by `finished` complete.
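
The mechanism can be illustrated without Arrow. The following is only a toy sketch, not Arrow code: `ToyScheduler`, `MakeUnguarded`, `MakeGuarded`, `LeftoverTasks`, `ThrowingTask` and `QuietTask` are hypothetical names, and it assumes the task counter starts at 1 to account for the initial task. It shows that an exception escaping the initial task skips the `OnTaskFinished` call, so the counter never reaches 0. The guarded variant is one possible direction, not a proposed patch.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for the scheduler; it only tracks the task counter.
// Assumption: the counter starts at 1 to account for the initial task.
struct ToyScheduler {
  explicit ToyScheduler(int* running_tasks) : running_tasks_(running_tasks) {
    *running_tasks_ = 1;
  }
  void OnTaskFinished() { --*running_tasks_; }
  int* running_tasks_;
};

using InitialTask = std::function<void(ToyScheduler*)>;

// Same shape as the quoted Make(): nothing catches an exception from the task.
void MakeUnguarded(const InitialTask& initial_task, int* running_tasks) {
  ToyScheduler scheduler(running_tasks);
  initial_task(&scheduler);    // a throw here unwinds past the next line
  scheduler.OnTaskFinished();  // skipped when the task throws
}

// Hypothetical variant: the exception is caught, so the task is still
// marked as finished.
void MakeGuarded(const InitialTask& initial_task, int* running_tasks) {
  ToyScheduler scheduler(running_tasks);
  try {
    initial_task(&scheduler);
  } catch (const std::exception&) {
    // A real implementation would convert this into an error status.
  }
  scheduler.OnTaskFinished();
}

// Returns the counter value observed after `make` has returned or unwound.
int LeftoverTasks(void (*make)(const InitialTask&, int*), const InitialTask& task) {
  int running_tasks = 0;
  try {
    make(task, &running_tasks);
  } catch (const std::exception&) {
    // Swallowed so the caller can inspect the counter.
  }
  return running_tasks;
}

// Initial task that throws without awaiting anything, like the generator above.
inline void ThrowingTask(ToyScheduler*) { throw std::runtime_error("some exception"); }

// Initial task that completes normally.
inline void QuietTask(ToyScheduler*) {}
```

In this sketch, `LeftoverTasks(MakeUnguarded, ThrowingTask)` leaves the counter at 1, which corresponds to the state the debug assertion complains about, while `LeftoverTasks(MakeGuarded, ThrowingTask)` leaves it at 0.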
   
   
   This is a minimal reproducible example:
   
   ```
   #include <arrow/acero/exec_plan.h>
   #include <arrow/acero/options.h>

   #include <gtest/gtest.h>

   #include <memory>
   #include <optional>
   #include <stdexcept>

   using arrow::acero::SourceNodeOptions;
   using arrow::acero::AsyncExecBatchGenerator;
   using arrow::acero::Declaration;

   TEST(ReproduceHangingProblem, Test) {
      // Generator that throws synchronously, without awaiting anything.
      AsyncExecBatchGenerator generator = []() -> arrow::Future<std::optional<arrow::ExecBatch>> {
         throw std::runtime_error("some exception");
      };

      // A source node with an empty schema is enough to trigger the problem.
      auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector{});
      SourceNodeOptions source_options{schema, generator};
      Declaration dec("source", source_options);
      auto result = DeclarationToTable(dec);
   }
   ```
   
   ### Component(s)
   
   C++

