[ 
https://issues.apache.org/jira/browse/IMPALA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zoltán Borók-Nagy updated IMPALA-13404:
---------------------------------------
    Description: 
IcebergDeleteBuilder uses a single thread to build the data structures for 
IcebergDeleteNode.

In most cases it is OK as it can run in parallel with the SCAN operators of the 
data files that don't have associated delete files:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
 RUNS   NOT STARTED    RUNS{noformat}
But if that SCAN operator has finished while the BUILD is not ready yet, we 
are potentially blocking MT_DOP threads:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
FINISHED   BLOCKED     RUNS
{noformat}
For plain count(*) queries the situation is even worse, as the UNION ALL and 
its left side are optimized out:
{noformat}
      ArithmeticExpr(ADD)
      /             \
     /               \
    /                 \
record_count       AGGREGATE
of all             COUNT(*)
datafiles              |
without            ANTI JOIN
deletes           /        BUILD
   ^             /           \
   |            SCAN        SCAN
   |            datafiles   deletes
   |            with deletes    ^
compile-time       ^            |
 constant          |            |
                BLOCKED        RUNS
           (MT_DOP threads)  (single thread){noformat}
The IcebergDeleteBuilder could keep track of how many LHS SCAN threads are 
blocked, and dynamically increase its own thread count. Some enhancements to 
Impala's ThreadPool may be required to allow increasing the number of worker 
threads at runtime.

IcebergDeleteBuilder's ProcessBuildBatch() could be executed in parallel. It 
cannot work directly on the RowBatch object that it gets from 
FragmentInstanceState's ExecInternal() function, as that batch is released 
immediately after IcebergDeleteBuilder's Send() returns. Unfortunately we 
cannot use RowBatch's AcquireState() either due to a bug, see:

[https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205]

Also, row batches can reference memory of other row batches, which means we 
cannot process and free them in parallel without deep-copying the incoming 
row batches. Since deep copying is expensive and would need to happen on the 
"main" thread of the builder, we would not gain anything from that.

We need to find a way to quickly extract and copy the relevant information 
from the incoming row batches; then we could insert it into the different 
roaring bitmaps in parallel.
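One possible shape of this, sketched with hypothetical types (std::set stands 
in for a roaring bitmap, and the batch is reduced to plain (file id, row 
position) pairs, which is all the delete records carry): the main thread only 
copies out the scalar values, which is cheap compared to a deep copy of the 
whole batch, and worker threads then populate disjoint shards of the per-file 
position sets without locking each other:

```cpp
// Hypothetical sketch, not Impala's actual code: extract scalar delete
// positions on the main thread, insert into per-file sets in parallel.
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <set>
#include <thread>
#include <utility>
#include <vector>

using DeletePos = std::pair<int32_t, int64_t>;  // (data file id, row pos)

// Step 1 (builder's "main" thread): copy the scalar values out of the
// incoming batch. In reality this would read slots from each TupleRow;
// here the batch is already a vector of pairs.
std::vector<DeletePos> ExtractDeletePositions(
    const std::vector<DeletePos>& batch) {
  return batch;
}

// Step 2 (worker threads): shard by file id modulo the worker count, so
// each file's set is built by exactly one worker and inserts need no
// synchronization; only the final merge takes a lock.
std::map<int32_t, std::set<int64_t>> BuildDeleteSets(
    const std::vector<DeletePos>& positions, int num_workers) {
  std::map<int32_t, std::set<int64_t>> result;
  std::mutex result_mu;
  std::vector<std::thread> workers;
  for (int w = 0; w < num_workers; ++w) {
    workers.emplace_back([&, w] {
      std::map<int32_t, std::set<int64_t>> local;
      for (const auto& [file_id, pos] : positions) {
        if (file_id % num_workers == w) local[file_id].insert(pos);
      }
      std::lock_guard<std::mutex> l(result_mu);
      for (auto& [file_id, s] : local) result[file_id] = std::move(s);
    });
  }
  for (auto& t : workers) t.join();
  return result;
}
```

Sharding by file id assumes delete positions are spread over many data files; 
a skewed distribution would need range-based sharding within a file instead.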


> Implement adaptive parallelization in IcebergDeleteBuilder
> ----------------------------------------------------------
>
>                 Key: IMPALA-13404
>                 URL: https://issues.apache.org/jira/browse/IMPALA-13404
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Zoltán Borók-Nagy
>            Priority: Major
>              Labels: impala-iceberg, performance
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
