[ 
https://issues.apache.org/jira/browse/IMPALA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zoltán Borók-Nagy updated IMPALA-13404:
---------------------------------------
    Description: 
IcebergDeleteBuilder uses a single thread to build the data structures for 
IcebergDeleteNode.

In most cases this is OK, as the build can run in parallel with the SCAN 
operators of the data files that don't have associated delete files:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
 RUNS   NOT STARTED    RUNS{noformat}
But if that SCAN operator finishes (when the IO is cached, SCANs can finish 
quite quickly) while the BUILD is not yet ready, we are potentially blocking 
#MT_DOP threads:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
FINISHED   BLOCKED     RUNS
{noformat}
Or, for plain count(*) queries the situation is even worse, as the UNION ALL 
and its left side are optimized out:
{noformat}
      ArithmeticExpr(ADD)
      /             \
     /               \
    /                 \
record_count       AGGREGATE
of all             COUNT(*)
datafiles              |
without            ANTI JOIN
deletes           /        BUILD
   ^             /           \
   |            SCAN        SCAN
   |            datafiles   deletes
   |            with deletes    ^
compile-time       ^            |
 constant          |            |
                BLOCKED        RUNS
           (MT_DOP threads)  (single thread){noformat}
The IcebergDeleteBuilder could keep track of whether LHS SCAN threads are being 
blocked on it, and dynamically increase its thread count. Some enhancements may 
be required in Impala's ThreadPool to allow increasing the number of worker 
threads at runtime.
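As a rough illustration of such a growable pool, here is a minimal, 
self-contained sketch. This is not Impala's actual ThreadPool class; the names 
(GrowableThreadPool, AddWorkers) and the design are hypothetical, meant only to 
show that adding workers at runtime needs little more than spawning extra 
threads on the shared task queue:

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical sketch of a pool whose worker count can grow at runtime,
// e.g. when the builder detects that scanner threads are blocked on it.
class GrowableThreadPool {
 public:
  explicit GrowableThreadPool(int num_workers) { AddWorkers(num_workers); }

  // Drains the queue, then joins all workers.
  ~GrowableThreadPool() {
    {
      std::lock_guard<std::mutex> l(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
    for (std::thread& t : workers_) t.join();
  }

  // Spawns 'n' additional worker threads; safe to call while tasks are
  // already being processed.
  void AddWorkers(int n) {
    std::lock_guard<std::mutex> l(mu_);
    for (int i = 0; i < n; ++i) {
      workers_.emplace_back(&GrowableThreadPool::WorkerLoop, this);
    }
  }

  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> l(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return shutdown_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // shutdown_ is set and queue drained
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  std::vector<std::thread> workers_;
  bool shutdown_ = false;
};
```

A real implementation would also need a policy for when to grow (e.g. a 
counter of blocked scanner threads) and probably an upper bound tied to the 
query's admission-controlled thread budget.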

IcebergDeleteBuilder's ProcessBuildBatch() could be executed in parallel. It 
cannot work directly on the RowBatch object it receives from 
FragmentInstanceState's ExecInternal() function, as that batch is released 
immediately after IcebergDeleteBuilder's Send() returns. Unfortunately we 
cannot use RowBatch's AcquireState() either, due to a bug; see:

[https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205]

Also, row batches can reference memory owned by other row batches, which means 
we cannot process and free them in parallel without deep copying them. Since 
deep copying is expensive and has to happen on the "main" thread of the 
builder, we would not gain anything from that approach.

We need to find a way to quickly extract and copy the relevant information from 
the incoming row batches, so that we can then insert into the different roaring 
bitmaps in parallel.
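A minimal sketch of that idea, under stated assumptions: DeleteRecord and 
BuildDeleteBitmaps are hypothetical names, and std::set<int64_t> stands in for 
a roaring bitmap. The main thread extracts plain (file path, row position) 
values from the batch, so the batch memory can be freed immediately; the 
bitmap inserts are then hash-partitioned by file path across threads, so every 
file's bitmap is built by exactly one thread and no locking is needed:

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for a roaring bitmap of deleted row positions in one data file.
using PositionSet = std::set<int64_t>;

// A position-delete record: (data file path, row position). In the real
// builder these values would be copied out of the incoming RowBatch on the
// main thread; as plain values they outlive the batch.
struct DeleteRecord {
  std::string file_path;
  int64_t position;
};

// Hash-partition the records by file path and build each partition's
// bitmaps on its own thread. Every file maps to exactly one partition,
// so the threads never touch the same PositionSet.
std::map<std::string, PositionSet> BuildDeleteBitmaps(
    const std::vector<DeleteRecord>& records, int num_threads) {
  std::vector<std::map<std::string, PositionSet>> parts(num_threads);
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([&records, &parts, num_threads, t] {
      for (const DeleteRecord& r : records) {
        // Skip files that belong to another thread's partition.
        size_t part = std::hash<std::string>{}(r.file_path) % num_threads;
        if (part != static_cast<size_t>(t)) continue;
        parts[t][r.file_path].insert(r.position);
      }
    });
  }
  for (std::thread& w : workers) w.join();
  // Merge the per-thread maps; their key sets are disjoint, so this is a
  // cheap move of the already-built bitmaps.
  std::map<std::string, PositionSet> result;
  for (auto& p : parts) {
    for (auto& kv : p) result[kv.first] = std::move(kv.second);
  }
  return result;
}
```

In the sketch every thread scans all records, which is fine if extraction is 
cheap relative to bitmap insertion; a real implementation could instead hand 
each worker its own queue of extracted records.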


> Implement adaptive parallelization in IcebergDeleteBuilder
> ----------------------------------------------------------
>
>                 Key: IMPALA-13404
>                 URL: https://issues.apache.org/jira/browse/IMPALA-13404
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Zoltán Borók-Nagy
>            Priority: Major
>              Labels: impala-iceberg, performance
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
