[ 
https://issues.apache.org/jira/browse/ARROW-18431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645836#comment-17645836
 ] 

Pau Garcia Rodriguez commented on ARROW-18431:
----------------------------------------------

I managed to reproduce it with this simple program:
{code:cpp}
#include <iostream>
#include <optional>
#include <vector>

#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/vector.h>

namespace cp = ::arrow::compute;

/// Builds an Arrow array of type TYPE that holds a copy of `values`.
///
/// Only participates in overload resolution (SFINAE) for numeric, boolean
/// and temporal Arrow types. The trait values are combined with logical `||`;
/// the previous bitwise `|` on bools happened to give the same result but
/// hid the intent.
///
/// NOTE(review): parametric temporal types (timestamp, time32/64, duration)
/// pass the constraint, but their builders probably have no default
/// constructor, so `ArrowBuilderType builder;` would not compile for them —
/// confirm before using this helper with those types.
///
/// @param values the C values to copy into the array.
/// @return the finished array, or the first builder error.
template <typename TYPE,
          typename = typename std::enable_if<
              arrow::is_number_type<TYPE>::value ||
              arrow::is_boolean_type<TYPE>::value ||
              arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type> &values)
{
  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;

  ArrowBuilderType builder;

  // Size the builder once up front, then bulk-append the values.
  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(builder.AppendValues(values));

  return builder.Finish();
}

/// RecordBatchReader test double.
///
/// The first ReadNext() yields a single one-row batch whose only column is
/// int32 "a" holding 0. Every later call sets *batch to null, which is the
/// RecordBatchReader way of reporting end of stream.
class TestBatchReader : public arrow::RecordBatchReader
{
  public:
    TestBatchReader()
        : schema_(std::make_shared<arrow::Schema>(
              arrow::FieldVector{arrow::field("a", arrow::int32())}))
    {}

    std::shared_ptr<arrow::Schema> schema() const override { return schema_; }

    arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch> *batch) override
    {
      if (exhausted_)
      {
        // Nothing left: report end of stream.
        *batch = nullptr;
        return arrow::Status::OK();
      }

      // Flip the flag before building the batch, so that even a failed
      // build uses up the single batch this reader hands out.
      exhausted_ = true;
      ARROW_ASSIGN_OR_RAISE(auto column_a,
                            GetArrayDataSample<arrow::Int32Type>({0}));
      *batch = arrow::RecordBatch::Make(schema_, /*num_rows=*/1, {column_a});
      return arrow::Status::OK();
    }

  private:
    bool exhausted_ = false;  // true once the single batch has been handed out
    std::shared_ptr<arrow::Schema> schema_;
};

/// Adapts `stream` into the async batch generator that a "source" node
/// consumes.
///
/// A dedicated one-thread pool is created and passed to MakeReaderGenerator
/// as its io executor. MakeReaderGenerator only receives a raw pointer to that
/// pool, so the returned closure captures the owning shared_ptr. This keeps
/// the pool alive for as long as any copy of the generator exists.
///
/// NOTE(review): if the last copy of this closure is released on one of the
/// pool's own threads, the pool is destroyed from inside itself. This is
/// worth ruling out while investigating the hang.
arrow::Result<std::function<arrow::Future<std::optional<cp::ExecBatch>>()>>
    makeSourceGenerator(std::shared_ptr<arrow::RecordBatchReader> stream)
{
  ARROW_ASSIGN_OR_RAISE(auto reader_pool,
                        arrow::internal::ThreadPool::Make(1));
  ARROW_ASSIGN_OR_RAISE(auto reader_gen,
                        cp::MakeReaderGenerator(stream, reader_pool.get()));

  return [reader_pool, reader_gen]()
  {
    return reader_gen();
  };
}


/// Builds and runs one tiny Acero plan:
///   source (one single-row batch from TestBatchReader)
///     -> aggregate ("hash_count" of "a", grouped by "a", output "total")
///     -> sink.
///
/// Drains the sink until it reports end of stream (an empty optional), then
/// calls StopProducing() and blocks on ExecPlan::finished(). That final wait
/// is where the reported hang occurs.
///
/// @return the plan's final status, or the first error raised while building,
///         validating, starting or reading from the plan.
arrow::Status test()
{
  auto test_reader = std::make_shared<TestBatchReader>();
  ARROW_ASSIGN_OR_RAISE(auto source_generator, makeSourceGenerator(test_reader));

  auto function_option =
      std::make_shared<cp::CountOptions>(cp::CountOptions::CountMode::ALL);
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto sequence = cp::Declaration::Sequence(
    {
      {"source", cp::SourceNodeOptions{test_reader->schema(), source_generator}},
      {"aggregate", cp::AggregateNodeOptions{
                        {cp::Aggregate{"hash_count", function_option, "a", "total"}},
                        {"a"}}},
      {"sink", cp::SinkNodeOptions{&sink_gen}}
    }
  );

  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());

  ARROW_RETURN_NOT_OK(sequence.AddToPlan(plan.get()));
  ARROW_RETURN_NOT_OK(plan->Validate());

  // std::endl rather than '\n' on purpose: flushing makes the last progress
  // message visible if the process hangs.
  std::cout << "Start producing" << std::endl;
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // Drain the sink. An empty optional marks end of stream, so stop on it,
  // not on the first batch that arrives.
  bool finished = false;
  while (!finished)
  {
    auto next_batch = sink_gen();
    ARROW_ASSIGN_OR_RAISE(auto maybe_batch, next_batch.result());

    finished = !maybe_batch.has_value();
  }

  std::cout << "Stop producing" << std::endl;
  plan->StopProducing();

  std::cout << "Waiting to finish..." << std::endl;
  auto plan_finished = plan->finished();
  plan_finished.Wait();

  std::cout << "Finished" << std::endl;
  return plan_finished.status();
}


/// Stress driver: keeps re-running test() until an iteration fails, then
/// prints the failure message and exits. If an iteration hangs instead, the
/// program stalls after printing "Waiting to finish...".
int main()
{
  for (;;)
  {
    const auto status = test();
    if (!status.ok())
    {
      std::cout << "Test failed: " << status.message() << std::endl;
      break;
    }
  }
}
{code}
 

The program is an infinite loop that repeatedly builds and executes the same
simple plan over a single row of input. Usually, after a few iterations, the
program deadlocks while waiting for the plan to finish.

I hope this helps to find the root cause.

> Acero's Execution Plan never finishes.
> --------------------------------------
>
>                 Key: ARROW-18431
>                 URL: https://issues.apache.org/jira/browse/ARROW-18431
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++
>    Affects Versions: 10.0.0
>            Reporter: Pau Garcia Rodriguez
>            Assignee: Weston Pace
>            Priority: Major
>
> We have observed that sometimes an execution plan with a small input never
> finishes (the future returned by the ExecPlan::finished() method is never
> marked as finished), even though the generator in the sink node is exhausted
> and has returned nullopt.
> This issue seems to happen at random: the same plan with the same input
> sometimes works (the plan is marked finished) and sometimes it doesn't. Since
> the ExecPlanImpl destructor forces the executing thread to wait for the plan
> to finish (when the plan has not yet finished), we end up in a deadlock,
> waiting for a plan that never finishes.
> Since this has only happened with small inputs and not in a deterministic 
> way, we believe the issue might be in the ExecPlan::StartProducing method.
> Our hypothesis is that after the plan starts producing on each node, each
> node schedules its tasks and they finish immediately (due to the small
> input), and somehow the callback that marks the future finished_ as
> finished is never executed.
>  
> {code:java}
> Status StartProducing() {
>   ...
> Future<> scheduler_finished =
> util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* 
> async_scheduler) {
>   ...
>   scheduler_finished.AddCallback([this](const Status& st) { 
> finished_.MarkFinished(st);});
> ...
> }{code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to