[
https://issues.apache.org/jira/browse/ARROW-18431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645836#comment-17645836
]
Pau Garcia Rodriguez commented on ARROW-18431:
----------------------------------------------
I managed to reproduce it with this simple program:
{code:cpp}
#include <iostream>
#include <optional>
#include <vector>
#include <arrow/array.h>
#include <arrow/builder.h>
#include <arrow/compute/api.h>
#include <arrow/compute/api_vector.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/util/vector.h>
namespace cp = ::arrow::compute;
// Builds an Arrow array from a vector of C values.
//
// TYPE is the Arrow data type; the template only participates in overload
// resolution when TYPE is a number, boolean or temporal type. The values are
// appended in order through the builder that arrow::TypeTraits associates with
// TYPE. Returns the finished array, or the first non-OK status reported while
// reserving space or appending.
template <typename TYPE,
          typename = typename std::enable_if<
              arrow::is_number_type<TYPE>::value ||
              arrow::is_boolean_type<TYPE>::value ||
              arrow::is_temporal_type<TYPE>::value>::type>
arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
    const std::vector<typename TYPE::c_type>& values)
{
  typename arrow::TypeTraits<TYPE>::BuilderType array_builder;

  // Reserve once up front so the bulk append does not need to grow the buffer.
  ARROW_RETURN_NOT_OK(array_builder.Reserve(values.size()));
  ARROW_RETURN_NOT_OK(array_builder.AppendValues(values));

  return array_builder.Finish();
}
// Minimal RecordBatchReader used to feed the plan.
//
// The schema has a single int32 field named "a". The first ReadNext() call
// produces one batch with one row holding the value 0; every later call sets
// the output batch to nullptr.
class TestBatchReader : public arrow::RecordBatchReader
{
public:
  TestBatchReader()
      : exhausted_{false},
        schema_(std::make_shared<arrow::Schema>(
            arrow::FieldVector{arrow::field("a", arrow::int32())}))
  {
  }

  // Returns the single-column schema created in the constructor.
  std::shared_ptr<arrow::Schema> schema() const override { return schema_; }

  // Writes the next batch into *batch (nullptr once the single batch has
  // already been handed out). Returns a non-OK status only if building the
  // sample array fails.
  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override
  {
    if (exhausted_)
    {
      *batch = nullptr;
      return arrow::Status::OK();
    }

    // Flip the flag before building the data, so a failed build is not retried.
    exhausted_ = true;
    ARROW_ASSIGN_OR_RAISE(auto column, GetArrayDataSample<arrow::Int32Type>({0}));
    *batch = arrow::RecordBatch::Make(schema_, 1, {column});
    return arrow::Status::OK();
  }

private:
  bool exhausted_;                         // true once the single batch was emitted
  std::shared_ptr<arrow::Schema> schema_;  // schema shared by every emitted batch
};
// Wraps a RecordBatchReader into a callable that yields futures of optional
// ExecBatch, suitable for cp::SourceNodeOptions.
//
// A dedicated one-thread pool is created and passed to
// cp::MakeReaderGenerator as the executor. Returns the callable, or the error
// from creating the pool or the generator.
arrow::Result<std::function<arrow::Future<std::optional<cp::ExecBatch>>()>>
makeSourceGenerator(std::shared_ptr<arrow::RecordBatchReader> stream)
{
  ARROW_ASSIGN_OR_RAISE(auto io_pool, arrow::internal::ThreadPool::Make(1));
  ARROW_ASSIGN_OR_RAISE(auto reader_generator,
                        cp::MakeReaderGenerator(stream, io_pool.get()));

  // The pool handle is captured by copy: the generator was only given a raw
  // pointer to the pool, so the returned callable carries its own handle
  // (presumably to keep the pool alive for as long as the callable exists).
  return [io_pool, reader_generator] { return reader_generator(); };
}
// Builds and runs one "source -> aggregate -> sink" plan over a fresh
// TestBatchReader.
//
// The aggregate node applies "hash_count" (CountMode::ALL) to column "a",
// grouped by "a", and names the output "total". After starting the plan, the
// sink generator is drained until it yields an empty optional; the plan is
// then stopped and the function blocks on plan->finished().
//
// Returns the first non-OK status hit while building, validating, starting or
// draining the plan; otherwise the status of the plan's finished() future.
arrow::Status test()
{
  auto test_reader = std::make_shared<TestBatchReader>();
  ARROW_ASSIGN_OR_RAISE(auto source_generator, makeSourceGenerator(test_reader));

  auto function_option =
      std::make_shared<cp::CountOptions>(cp::CountOptions::CountMode::ALL);
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  auto sequence = cp::Declaration::Sequence(
      {{"source", cp::SourceNodeOptions{test_reader->schema(), source_generator}},
       {"aggregate",
        cp::AggregateNodeOptions{
            {cp::Aggregate{"hash_count", function_option, "a", "total"}}, {"a"}}},
       {"sink", cp::SinkNodeOptions{&sink_gen}}});

  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
  ARROW_RETURN_NOT_OK(sequence.AddToPlan(plan.get()));
  ARROW_RETURN_NOT_OK(plan->Validate());

  std::cout << "Start producing" << std::endl;
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  bool finished = false;
  while (!finished)
  {
    auto future = sink_gen();
    ARROW_ASSIGN_OR_RAISE(auto maybe_batch, future.result());
    // Keep pulling while batches arrive; an empty optional means the sink is
    // exhausted. (The previous condition was inverted and stopped on the
    // first batch that had a value.)
    finished = !maybe_batch.has_value();
  }

  std::cout << "Stop producing" << std::endl;
  plan->StopProducing();

  std::cout << "Waiting to finish..." << std::endl;
  auto future = plan->finished();
  future.Wait();
  std::cout << "Finished" << std::endl;
  return future.status();
}
// Runs test() over and over until one iteration reports a non-OK status,
// prints that status' message and exits. If every iteration succeeds, the
// loop never terminates on its own.
int main()
{
  for (;;)
  {
    const auto outcome = test();
    if (outcome.ok())
    {
      continue;
    }
    std::cout << "Test failed: " << outcome.message() << std::endl;
    break;
  }
}
{code}
The program is just an infinite loop that builds and executes the same simple
plan with a single row of input. Usually, after a few iterations, the
program deadlocks while waiting for the plan to finish.
I hope this helps to find the root cause.
> Acero's Execution Plan never finishes.
> --------------------------------------
>
> Key: ARROW-18431
> URL: https://issues.apache.org/jira/browse/ARROW-18431
> Project: Apache Arrow
> Issue Type: Bug
> Components: C++
> Affects Versions: 10.0.0
> Reporter: Pau Garcia Rodriguez
> Assignee: Weston Pace
> Priority: Major
>
> We have observed that sometimes an execution plan with a small input never
> finishes (the future returned by the ExecPlan::finished() method is never
> marked as finished), even though the generator in the sink node is exhausted
> and has returned nullopt.
> This issue seems to happen at random: the same plan with the same input
> sometimes works (the plan is marked finished) and sometimes it doesn't. Since
> the ExecPlanImpl destructor forces the executing thread to wait for the plan
> to finish (when the plan has not yet finished), we end up in a deadlock,
> waiting for a plan that never finishes.
> Since this has only happened with small inputs and not in a deterministic
> way, we believe the issue might be in the ExecPlan::StartProducing method.
> Our hypothesis is that after the plan starts producing on each node, each
> node schedules their tasks and they are immediately finished (due to the
> small input) and somehow the callback that marks the future finished_
> finished is never executed.
>
> {code:java}
> Status StartProducing() {
> ...
> Future<> scheduler_finished =
> util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler*
> async_scheduler) {
> ...
> scheduler_finished.AddCallback([this](const Status& st) {
> finished_.MarkFinished(st);});
> ...
> }{code}
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)