[https://issues.apache.org/jira/browse/IMPALA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Zoltán Borók-Nagy updated IMPALA-13404:
---------------------------------------
Description:
IcebergDeleteBuilder uses a single thread to build the data structures for
IcebergDeleteNode.
In most cases this is OK, as it can run in parallel with the SCAN operators of
the data files that don't have associated delete files:
{noformat}
                 UNION ALL
                /         \
               /           \
              /             \
        SCAN all         ANTI JOIN
        datafiles        /    BUILD
        without         /          \
        deletes      SCAN         SCAN
           ^      datafiles      deletes
           |     with deletes       ^
           |          ^             |
           |          |             |
          RUNS   NOT STARTED       RUNS
{noformat}
But if that SCAN operator finishes while the BUILD is not ready yet, we are
potentially blocking MT_DOP threads:
{noformat}
                 UNION ALL
                /         \
               /           \
              /             \
        SCAN all         ANTI JOIN
        datafiles        /    BUILD
        without         /          \
        deletes      SCAN         SCAN
           ^      datafiles      deletes
           |     with deletes       ^
           |          ^             |
           |          |             |
        FINISHED   BLOCKED         RUNS
{noformat}
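A minimal C++ simulation of the dependency above, for illustration only: SimulateBlockedProbes() and its parameters are hypothetical, not Impala code. The mt_dop probe threads wait on a std::shared_future that a single builder thread fulfils only after it has processed every build batch, so they hold thread slots while doing no work.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <vector>

// Hypothetical simulation, not Impala code: `mt_dop` probe threads (the SCAN of
// datafiles with deletes) cannot proceed until a single builder thread is done.
// Returns how many probe threads observed the finished build.
int SimulateBlockedProbes(int mt_dop, int num_build_batches) {
  assert(mt_dop > 0 && num_build_batches >= 0);
  std::promise<int> build_done;  // fulfilled once the build side is ready
  std::shared_future<int> build = build_done.get_future().share();

  std::vector<int> seen(mt_dop, -1);
  std::vector<std::thread> probes;
  for (int i = 0; i < mt_dop; ++i) {
    probes.emplace_back([&seen, build, i] {
      // Blocks here: the thread is occupied but does no useful work.
      seen[i] = build.get();
    });
  }

  // The only thread doing build work processes every batch serially.
  std::thread builder([&build_done, num_build_batches] {
    int processed = 0;
    for (int b = 0; b < num_build_batches; ++b) ++processed;
    build_done.set_value(processed);
  });

  builder.join();
  for (std::thread& t : probes) t.join();

  int observed = 0;
  for (int v : seen) observed += (v == num_build_batches) ? 1 : 0;
  return observed;
}
```

It models only the dependency between the two sides, not how Impala actually schedules fragment instances.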
For plain count( * ) queries the situation is even worse: the UNION ALL and its
left side are optimized out (the left side becomes a compile-time constant), so
there is no other work to overlap with the BUILD:
{noformat}
             ArithmeticExpr(ADD)
               /            \
              /              \
             /                \
     record_count             AGGREGATE
        of all                COUNT(*)
      datafiles                   |
       without                ANTI JOIN
       deletes                /    BUILD
          ^                  /          \
          |              SCAN          SCAN
          |           datafiles      deletes
          |          with deletes       ^
    compile-time          ^             |
      constant            |             |
                       BLOCKED         RUNS
                 (MT_DOP threads)  (single thread)
{noformat}
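To spell out the arithmetic of this plan, the sketch below computes the result as the compile-time constant plus the COUNT(*) of the rows that survive the ANTI JOIN. DataFile and CountStar() are hypothetical names, and a std::set of deleted positions is an assumed, simplified stand-in for the delete files.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <vector>

// Hypothetical model of an Iceberg data file, for illustration only.
struct DataFile {
  std::string path;
  int64_t record_count;                 // known from table metadata
  std::set<int64_t> deleted_positions;  // from the associated delete files
};

// count(*) = compile-time constant (files without deletes)
//          + rows of files with deletes that survive the ANTI JOIN (runtime)
int64_t CountStar(const std::vector<DataFile>& files) {
  int64_t constant_part = 0;  // "record_count of all datafiles without deletes"
  int64_t runtime_part = 0;   // AGGREGATE COUNT(*) over the ANTI JOIN
  for (const DataFile& f : files) {
    assert(f.record_count >= 0);
    if (f.deleted_positions.empty()) {
      constant_part += f.record_count;
      continue;
    }
    // Probe side: scan every row, keep those whose position is not deleted.
    for (int64_t pos = 0; pos < f.record_count; ++pos) {
      if (f.deleted_positions.count(pos) == 0) ++runtime_part;
    }
  }
  return constant_part + runtime_part;
}
```

For example, files with record counts 10 (no deletes), 5 (positions 0 and 3 deleted) and 4 (position 1 deleted) give 10 + 3 + 3 = 16. Only the last two terms depend on the single-threaded BUILD, which is why the MT_DOP threads have nothing else to do in the meantime.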
The IcebergDeleteBuilder could keep track of whether LHS SCAN threads are
blocked, and dynamically increase its thread count. This might require some
enhancements to Impala's ThreadPool to allow increasing the number of worker
threads.
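One possible shape for this is sketched below, assuming the builder is notified of how many probe-side threads are currently blocked. GrowableWorkerPool and OnBlockedProbeCount() are hypothetical names, not part of Impala's ThreadPool API, and the growth policy (one extra worker per blocked probe thread, capped, never shrinking) is an assumption chosen only for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch, not Impala's ThreadPool: a task pool whose worker count
// can grow at runtime based on how many probe-side threads are blocked.
class GrowableWorkerPool {
 public:
  explicit GrowableWorkerPool(int max_workers) : max_workers_(max_workers) {
    assert(max_workers > 0);
    OnBlockedProbeCount(0);  // start with one worker, like today's builder
  }

  ~GrowableWorkerPool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
    for (std::thread& t : workers_) t.join();  // workers drain the queue first
  }

  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

  // Assumed policy: one extra worker per blocked probe thread, capped at
  // max_workers_; the pool never shrinks.
  void OnBlockedProbeCount(int blocked_probes) {
    std::lock_guard<std::mutex> lock(mu_);
    const int target = std::min(max_workers_, 1 + std::max(0, blocked_probes));
    while (static_cast<int>(workers_.size()) < target) {
      workers_.emplace_back([this] { WorkerLoop(); });
    }
  }

  int NumWorkers() {
    std::lock_guard<std::mutex> lock(mu_);
    return static_cast<int>(workers_.size());
  }

 private:
  void WorkerLoop() {
    while (true) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return shutdown_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // shutting down and nothing left to run
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();
    }
  }

  const int max_workers_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  std::vector<std::thread> workers_;
  bool shutdown_ = false;
};
```

Deciding the target size and spawning the threads under the same mutex keeps two concurrent notifications from both adding workers for the same blocked threads.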
IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. However,
it cannot work directly on the RowBatch object that it receives from
FragmentInstanceState's ExecInternal() function, as that object is released
immediately after IcebergDeleteBuilder's Send() returns. Unfortunately, we
cannot use RowBatch's AcquireState() either, due to a bug; see:
[https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205]
Also, row batches can reference the memory of other row batches, which means we
cannot process and free them in parallel without deep copying the incoming row
batches. Since deep copying is expensive and needs to happen on the "main"
thread of the builder, we would gain nothing from that.
We need to find a way to quickly extract and copy the relevant information from
the incoming row batches; then we could insert it into the different roaring
bitmaps in parallel.
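A minimal sketch of that idea, under stated assumptions: DeleteRowView, DeleteRecord, ExtractDeletes() and ParallelInsert() are hypothetical names, each delete row is reduced to a (file path, position) pair, and std::set<int64_t> stands in for a roaring bitmap. The main thread copies only those two fields, bucketed by file path, after which the incoming batch (and any batch it references) could be freed; each worker then fills the bitmaps of its own files without locking. Whether this targeted copy is cheap enough compared to a full deep copy is exactly the open question.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <string_view>
#include <thread>
#include <vector>

// Hypothetical view of one delete row; file_path may point into memory owned
// by another row batch that will be freed soon.
struct DeleteRowView {
  std::string_view file_path;
  int64_t pos;
};

// Owned copy of just the fields the builder needs.
struct DeleteRecord {
  std::string file_path;
  int64_t pos;
};

// Per-file deleted positions; std::set stands in for a roaring bitmap here.
using PositionIndex = std::map<std::string, std::set<int64_t>>;

// Step 1, builder's main thread: copy only (file_path, pos), bucketed by file
// path so that each worker later owns a disjoint set of files.
std::vector<std::vector<DeleteRecord>> ExtractDeletes(
    const std::vector<DeleteRowView>& batch, int num_workers) {
  assert(num_workers > 0);
  std::vector<std::vector<DeleteRecord>> buckets(num_workers);
  for (const DeleteRowView& row : batch) {
    size_t b = std::hash<std::string_view>{}(row.file_path) % num_workers;
    buckets[b].push_back({std::string(row.file_path), row.pos});
  }
  return buckets;
}

// Step 2, workers: each one fills the bitmaps of its own files, so no locking
// is needed. The incoming batch is no longer referenced at this point.
std::vector<PositionIndex> ParallelInsert(
    const std::vector<std::vector<DeleteRecord>>& buckets) {
  std::vector<PositionIndex> shards(buckets.size());
  std::vector<std::thread> workers;
  for (size_t w = 0; w < buckets.size(); ++w) {
    workers.emplace_back([&buckets, &shards, w] {
      for (const DeleteRecord& r : buckets[w]) shards[w][r.file_path].insert(r.pos);
    });
  }
  for (std::thread& t : workers) t.join();
  return shards;
}
```

Bucketing on the main thread keeps the per-file inserts lock-free, at the cost of hashing each file path once on that thread.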
was:
IcebergDeleteBuilder uses a single thread to build the data structures for
IcebergDeleteNode.
In most cases this is OK, as it can run in parallel with the SCAN operators of
the data files that don't have associated delete files:
{noformat}
                 UNION ALL
                /         \
               /           \
              /             \
        SCAN all         ANTI JOIN
        datafiles        /    BUILD
        without         /          \
        deletes      SCAN         SCAN
           ^      datafiles      deletes
           |     with deletes       ^
           |          ^             |
           |          |             |
          RUNS   NOT STARTED       RUNS
{noformat}
But if that SCAN operator finishes while the BUILD is not ready yet, we are
potentially blocking MT_DOP threads:
{noformat}
                 UNION ALL
                /         \
               /           \
              /             \
        SCAN all         ANTI JOIN
        datafiles        /    BUILD
        without         /          \
        deletes      SCAN         SCAN
           ^      datafiles      deletes
           |     with deletes       ^
           |          ^             |
           |          |             |
        FINISHED   BLOCKED         RUNS
{noformat}
For plain count( * ) queries the situation is even worse: the UNION ALL and its
left side are optimized out (the left side becomes a compile-time constant), so
there is no other work to overlap with the BUILD:
{noformat}
             ArithmeticExpr(ADD)
               /            \
              /              \
             /                \
     record_count             AGGREGATE
        of all                COUNT(*)
      datafiles                   |
       without                ANTI JOIN
       deletes                /    BUILD
          ^                  /          \
          |              SCAN          SCAN
          |           datafiles      deletes
          |          with deletes       ^
    compile-time          ^             |
      constant            |             |
                       BLOCKED         RUNS
                 (MT_DOP threads)  (single thread)
{noformat}
The IcebergDeleteBuilder could keep track of whether LHS SCAN threads are
blocked, and dynamically increase its thread count. This might require some
enhancements to Impala's ThreadPool to allow increasing the number of worker
threads.
IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. However,
it cannot work directly on the RowBatch object that it receives from
FragmentInstanceState's ExecInternal() function, as that object is released
immediately after IcebergDeleteBuilder's Send() returns. Luckily, RowBatch
already has an AcquireState() method that we can use.
IcebergDeleteBuilder could create a fresh, empty RowBatch object that acquires
the state of the incoming RowBatch in Send(), so FragmentInstanceState wouldn't
release its resources when Send() returns. ProcessBuildBatch could then work on
its own RowBatch instance and invoke RowBatch::Release() at the end.
*EDIT*
Unfortunately, RowBatch's AcquireState() does not work well until we fix a
resource ownership issue; see:
https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205
> Implement adaptive parallelization in IcebergDeleteBuilder
> ----------------------------------------------------------
>
> Key: IMPALA-13404
> URL: https://issues.apache.org/jira/browse/IMPALA-13404
> Project: IMPALA
> Issue Type: Improvement
> Reporter: Zoltán Borók-Nagy
> Priority: Major
> Labels: impala-iceberg, performance
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)