Zoltán Borók-Nagy created IMPALA-13404:
------------------------------------------
Summary: Implement adaptive parallelization in IcebergDeleteBuilder
Key: IMPALA-13404
URL: https://issues.apache.org/jira/browse/IMPALA-13404
Project: IMPALA
Issue Type: Improvement
Reporter: Zoltán Borók-Nagy
IcebergDeleteBuilder uses a single thread to build the data structures for
IcebergDeleteNode.
In most cases this is fine, as it can run in parallel with the SCAN operator that
reads the data files without associated delete files:
{noformat}
               UNION ALL
               /       \
              /         \
             /           \
      SCAN all         ANTI JOIN
      datafiles        /     BUILD
      without         /          \
      deletes       SCAN         SCAN
         ^        datafiles     deletes
         |      with deletes       ^
         |                         |
        RUNS                      RUNS
{noformat}
But if that SCAN operator finishes while the BUILD is still not ready, we are
potentially blocking MT_DOP threads:
{noformat}
               UNION ALL
               /       \
              /         \
             /           \
      SCAN all         ANTI JOIN
      datafiles        /     BUILD
      without         /          \
      deletes       SCAN         SCAN
         ^        datafiles     deletes
         |      with deletes       ^
         |            ^            |
         |            |            |
     FINISHED      BLOCKED        RUNS
{noformat}
Or, for plain count( * ) queries, the situation is even worse, as the UNION ALL
and its left side are optimized out and replaced by a compile-time constant:
{noformat}
              ArithmeticExpr(ADD)
                /             \
               /               \
              /                 \
      record_count           AGGREGATE
         of all              COUNT(*)
       datafiles                 |
        without              ANTI JOIN
        deletes              /     BUILD
     (compile-time          /          \
       constant)          SCAN         SCAN
                        datafiles     deletes
                      with deletes       ^
                            ^            |
                            |            |
                         BLOCKED        RUNS
                   (MT_DOP threads)  (single thread)
{noformat}
IcebergDeleteBuilder could keep track of how many LHS SCAN threads are being
blocked, and dynamically increase its thread count. Some enhancements to
Impala's ThreadPool might be required to allow increasing the number of worker
threads.
IcebergDeleteBuilder's ProcessBuildBatch() could be executed in parallel.
However, it cannot work directly on the RowBatch object that it receives from
FragmentInstanceState's ExecInternal() function, because that object is released
immediately after IcebergDeleteBuilder's Send() returns. Luckily, RowBatch
already has an AcquireState() method that we can use.
In Send(), IcebergDeleteBuilder could create a fresh, empty RowBatch object that
acquires the state of the incoming RowBatch, so FragmentInstanceState would not
release its resources when Send() returns. ProcessBuildBatch() could then work
on its own RowBatch instance and invoke RowBatch::Release() at the end.