[
https://issues.apache.org/jira/browse/IMPALA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zoltán Borók-Nagy updated IMPALA-13404:
---------------------------------------
Description:
IcebergDeleteBuilder uses a single thread to build the data structures for
IcebergDeleteNode.
In most cases it is OK as it can run in parallel with the SCAN operators of the
data files that don't have associated delete files:
{noformat}
UNION ALL
/ \
/ \
/ \
SCAN all ANTI JOIN
datafiles / BUILD
without / \
deletes SCAN SCAN
^ datafiles deletes
| with deletes. ^
| ^ |
| | |
RUNS NOT STARTED RUNS{noformat}
But if that SCAN operator has finished (when the IO is cached, SCANs can
finish quite quickly) and the BUILD is not ready, we are potentially blocking
#MT_DOP threads:
{noformat}
UNION ALL
/ \
/ \
/ \
SCAN all ANTI JOIN
datafiles / BUILD
without / \
deletes SCAN SCAN
^ datafiles deletes
| with deletes. ^
| ^ |
| | |
FINISHED BLOCKED RUNS
{noformat}
Or, for plain count(*) queries the situation is even worse, as the UNION ALL
and its left side are optimized out:
{noformat}
ArithmeticExpr(ADD)
/ \
/ \
/ \
record_count AGGREGATE
of all COUNT(*)
datafiles |
without ANTI JOIN
deletes / BUILD
^ / \
| SCAN SCAN
| datafiles deletes
| with deletes ^
compile-time ^ |
constant | |
BLOCKED RUNS
(MT_DOP threads) (single thread){noformat}
The IcebergDeleteBuilder could keep track of whether LHS SCAN threads are
being blocked, and dynamically increase its thread count. Possibly some
enhancements are required in Impala's ThreadPool to allow increasing the
number of worker threads.
IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. It
cannot directly work on the RowBatch object that it gets from the
FragmentInstanceState's ExecInternal() function, as it is being released
immediately after IcebergDeleteBuilder's Send() returns. Unfortunately we
cannot use RowBatch's AcquireState() either due to a bug, see:
[https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205]
Also, row batches can reference memory of other row batches, which means we
cannot process and free them in parallel without deep-copying them. Since deep
copying is expensive and needs to happen on the "main" thread of the builder,
we would gain nothing from that.
We need to find a way to quickly extract and copy information from the
incoming row batches; then we could insert it into the different roaring
bitmaps in parallel.
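The extract-then-parallel-insert idea could be sketched as below: the builder's "main" thread only copies the cheap (data-file, position) pairs out of each incoming batch, so the RowBatch can be freed immediately, and the bitmap insertions happen afterwards in parallel, sharded by file path so no locking is needed. DeletePos, BuildShard, and std::set are hypothetical stand-ins for Impala's actual row representation and roaring bitmaps:

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Stand-in for the (file_path, pos) content of a position-delete row.
struct DeletePos {
  std::string file_path;  // data file the delete row refers to
  int64_t pos;            // position of the deleted row in that file
};

// std::set<int64_t> stands in for a roaring bitmap per data file.
using DeleteBitmaps = std::unordered_map<std::string, std::set<int64_t>>;

// Phase 2 worker: each shard owns a disjoint slice of the hash space, so
// the workers never touch the same bitmap and need no synchronization.
DeleteBitmaps BuildShard(const std::vector<DeletePos>& rows, int shard,
                         int num_shards) {
  std::hash<std::string> h;
  DeleteBitmaps bitmaps;
  for (const auto& r : rows) {
    if (h(r.file_path) % num_shards == static_cast<size_t>(shard)) {
      bitmaps[r.file_path].insert(r.pos);
    }
  }
  return bitmaps;
}

// Phase 1 (copying `rows` out of the row batches) is assumed to have
// happened on the builder's main thread already; this runs the shards.
DeleteBitmaps BuildParallel(const std::vector<DeletePos>& rows,
                            int num_shards) {
  std::vector<DeleteBitmaps> shards(num_shards);
  std::vector<std::thread> workers;
  for (int s = 0; s < num_shards; ++s) {
    workers.emplace_back(
        [&, s] { shards[s] = BuildShard(rows, s, num_shards); });
  }
  for (auto& t : workers) t.join();
  // Shards are disjoint by construction, so merging is a simple move.
  DeleteBitmaps result;
  for (auto& shard : shards) {
    for (auto& kv : shard) result[kv.first] = std::move(kv.second);
  }
  return result;
}
```

Hash-partitioning by file path is just one possible split; any scheme that makes the per-worker bitmap sets disjoint would avoid the deep-copy problem described above.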
> Implement adaptive parallelization in IcebergDeleteBuilder
> ----------------------------------------------------------
>
> Key: IMPALA-13404
> URL: https://issues.apache.org/jira/browse/IMPALA-13404
> Project: IMPALA
> Issue Type: Improvement
> Reporter: Zoltán Borók-Nagy
> Priority: Major
> Labels: impala-iceberg, performance
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)