[ 
https://issues.apache.org/jira/browse/ARROW-18431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645665#comment-17645665
 ] 

Pau Garcia Rodriguez commented on ARROW-18431:
----------------------------------------------

Hello, thank you for your quick answer.

 

The explicit plan we use is as follows:
{code:java}
ExecPlan with 3 nodes:
:SinkNode{}
  :GroupByNode{keys=["bearer.cell"], aggregates=[
        hash_sum(interference-events-samples),
  ]}
    :SourceNode{} {code}
The SourceNode is using a reader generator built from a RecordBatchReader. That 
reader in turn comes from a dataset scanner (the dataset contains just a single 
Parquet file).

We are using the dataset scanner instead of the scan node because in earlier 
versions we found it to be faster than the scan node. I believe this has 
changed and there have been improvements in recent versions, so perhaps we 
should just use the scan node again.
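For reference, a scan-node version of the plan might look roughly like the 
following. This is only a sketch based on my reading of the Arrow 10 C++ 
examples, not something I have compiled: the `cp`/`ds` namespace aliases, the 
`exec_context`, and the way `dataset` is obtained are assumptions carried over 
from our existing code, and the exact optional type used by the sink generator 
may differ between versions.
{code:java}
// Sketch only. Assumes:
//   namespace cp = arrow::compute;  namespace ds = arrow::dataset;
//   `dataset` built from the single parquet file as before, and an
//   `exec_context` set up the usual way.
arrow::dataset::internal::Initialize();  // registers the "scan" node factory

ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make(&exec_context));

auto scan_options = std::make_shared<ds::ScanOptions>();
ARROW_ASSIGN_OR_RAISE(
    cp::ExecNode * scan,
    cp::MakeExecNode("scan", plan.get(), {},
                     ds::ScanNodeOptions{dataset, scan_options}));

ARROW_ASSIGN_OR_RAISE(
    cp::ExecNode * aggregate,
    cp::MakeExecNode("aggregate", plan.get(), {scan},
                     cp::AggregateNodeOptions{
                         {{"hash_sum", /*options=*/nullptr,
                           "interference-events-samples",
                           "hash_sum(interference-events-samples)"}},
                         /*keys=*/{"bearer.cell"}}));

// The optional type here may be arrow::util::optional on older versions.
arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
ARROW_RETURN_NOT_OK(cp::MakeExecNode("sink", plan.get(), {aggregate},
                                     cp::SinkNodeOptions{&sink_gen}));
{code}
This would replace both the dataset scanner and the RecordBatchReader-backed 
SourceNode with a single plan.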

In fact, while analysing the OpenTelemetry traces I noticed another plan being 
printed:
{code:java}
        plan: ExecPlan with 4 nodes:
:SinkNode{}
  :ProjectNode{projection=[bearer.cell, interference-events-samples, 
__fragment_index, __batch_index, __last_in_fragment, __filename]}
    :FilterNode{filter=((timestamp >= 2017-05-05 06:00:00.000000000) and 
(timestamp <= 2017-05-05 08:59:49.999999000))}
      :SourceNode{} {code}
Could it be that the dataset scanner implementation is in fact another Acero 
plan? (I have no idea about the implementation details, sorry.) If so, we could 
just use a single plan with the scan node and drop the dataset scanner.

As for the parquet file we are using, the metadata header is as follows:
{code:java}
File Name: data_1661435498522945115.29477.parquet
Version: 2.6
Created By: parquet-cpp-arrow version 9.0.0
Total rows: 470
Number of RowGroups: 1
Number of Real Columns: 12
Number of Columns: 12
Number of Selected Columns: 12
Column 0: whateverthisis (BYTE_ARRAY / String / UTF8)
Column 1: bearer.cell (BYTE_ARRAY / String / UTF8)
Column 2: timestamp (INT64 / Timestamp(isAdjustedToUTC=true, 
timeUnit=nanoseconds, is_from_converted_type=false, 
force_set_converted_type=false))
Column 3: bearer.rat (BYTE_ARRAY / String / UTF8)
Column 4: cqi (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 5: cqi-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 6: ta (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 7: ta-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
Column 8: interference-events-samples (INT64 / Int(bitWidth=64, isSigned=true) 
/ INT_64)
Column 9: poor_signal-events-samples (INT64 / Int(bitWidth=64, isSigned=true) / 
INT_64)
Column 10: ping_pong-events-samples (INT64 / Int(bitWidth=64, isSigned=true) / 
INT_64)
Column 11: events-samples (INT64 / Int(bitWidth=64, isSigned=true) / INT_64)
--- Row Group: 0 ---
--- Total Bytes: 44362 ---
--- Total Compressed Bytes: 20399 ---
--- Rows: 470 --- {code}
As you can see it's only 470 rows, so the execution plan finishes very quickly.

 

I hope this information helps. On our side we'll replace the dataset scanner 
and RecordBatchReader shenanigans with the scan node, to see whether the 
deadlock still occurs, and at the very least end up with cleaner code.

 

Best regards,

Pau.

> Acero's Execution Plan never finishes.
> --------------------------------------
>
>                 Key: ARROW-18431
>                 URL: https://issues.apache.org/jira/browse/ARROW-18431
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++
>    Affects Versions: 10.0.0
>            Reporter: Pau Garcia Rodriguez
>            Assignee: Weston Pace
>            Priority: Major
>
> We have observed that sometimes an execution plan with a small input never 
> finishes (the future returned by the ExecPlan::finished() method is never 
> marked as finished), even though the generator in the sink node is exhausted 
> and has returned nullopt.
> This issue seems to happen at random: the same plan with the same input 
> sometimes works (the plan is marked finished) and sometimes it doesn't. Since 
> the ExecPlanImpl destructor forces the executing thread to wait for the plan 
> to finish (when the plan has not yet finished), we end up in a deadlock 
> waiting for a plan that never finishes.
> Since this has only happened with small inputs, and not in a deterministic 
> way, we believe the issue might be in the ExecPlan::StartProducing method.
> Our hypothesis is that after the plan starts producing on each node, each 
> node schedules its tasks and they finish immediately (due to the small 
> input), and somehow the callback that marks the future finished_ as finished 
> is never executed.
>  
> {code:java}
> Status StartProducing() {
>   ...
>   Future<> scheduler_finished =
>       util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* async_scheduler) {
>         ...
>       });
>   ...
>   scheduler_finished.AddCallback(
>       [this](const Status& st) { finished_.MarkFinished(st); });
>   ...
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
