[ 
https://issues.apache.org/jira/browse/IMPALA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zoltán Borók-Nagy updated IMPALA-13404:
---------------------------------------
    Description: 
IcebergDeleteBuilder uses a single thread to build the data structures for 
IcebergDeleteNode.

In most cases it is OK as it can run in parallel with the SCAN operators of the 
data files that don't have associated delete files:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
 RUNS   NOT STARTED    RUNS{noformat}
But if that SCAN operator is finished, and the BUILD is not ready, we are 
potentially blocking #MT_DOP threads:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
FINISHED   BLOCKED     RUNS
{noformat}
Or, for plain count( * ) queries the situation is even worse, as the UNION ALL 
and its left side are optimized out:
{noformat}
      ArithmeticExpr(ADD)
      /             \
     /               \
    /                 \
record_count       AGGREGATE
of all             COUNT(*)
datafiles              |
without            ANTI JOIN
deletes           /        BUILD
   ^             /           \
   |            SCAN        SCAN
   |            datafiles   deletes
   |            with deletes    ^
compile-time       ^            |
 constant          |            |
                BLOCKED        RUNS
           (MT_DOP threads)  (single thread){noformat}
The IcebergDeleteBuilder could keep track LHS SCAN threads are being blocked, 
and dynamically increase its thread count. Possibly some enhancments are 
required on Impala's ThreadPool to allow increasing the number of worker 
threads.

IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. It 
cannot directly work on the RowBatch object that it gets from the 
FragmentInstanceState's ExecInternal() function, as it is being released 
immediately after IcebergDeleteBuilder's Send() returns. Luckily RowBatch 
already has an AcquireState() method that we can use.

IcebergDeleteBuilder could create a fresh new empty RowBatch object that would 
acquire the state of the incoming RowBatch in Send(), hence 
FragmentInstanceState wouldn't release its resources when Send() returns.

Then ProcessBuildBatch could work on its own RowBatch instance, then invoke 
RowBatch:Release() in the end.

*EDIT*

Unfortunately RowBatch's AcquireState() does not work well until we fix some 
resource ownership issue, see:

https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205

 

  was:
IcebergDeleteBuilder uses a single thread to build the data structures for 
IcebergDeleteNode.

In most cases it is OK as it can run in parallel with the SCAN operators of the 
data files that don't have associated delete files:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
 RUNS   NOT STARTED    RUNS{noformat}
But if that SCAN operator is finished, and the BUILD is not ready, we are 
potentially blocking #MT_DOP threads:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
FINISHED   BLOCKED     RUNS
{noformat}
Or, for plain count( * ) queries the situation is even worse, as the UNION ALL 
and its left side are optimized out:
{noformat}
      ArithmeticExpr(ADD)
      /             \
     /               \
    /                 \
record_count       AGGREGATE
of all             COUNT(*)
datafiles              |
without            ANTI JOIN
deletes           /        BUILD
   ^             /           \
   |            SCAN        SCAN
   |            datafiles   deletes
   |            with deletes    ^
compile-time       ^            |
 constant          |            |
                BLOCKED        RUNS
           (MT_DOP threads)  (single thread){noformat}
The IcebergDeleteBuilder could keep track LHS SCAN threads are being blocked, 
and dynamically increase its thread count. Possibly some enhancments are 
required on Impala's ThreadPool to allow increasing the number of worker 
threads.

IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. It 
cannot directly work on the RowBatch object that it gets from the 
FragmentInstanceState's ExecInternal() function, as it is being released 
immediately after IcebergDeleteBuilder's Send() returns. Luckily RowBatch 
already has an AcquireState() method that we can use.

IcebergDeleteBuilder could create a fresh new empty RowBatch object that would 
acquire the state of the incoming RowBatch in Send(), hence 
FragmentInstanceState wouldn't release its resources when Send() returns.

Then ProcessBuildBatch could work on its own RowBatch instance, then invoke 
RowBatch:Release() in the end.


> Implement adaptive parallelization in IcebergDeleteBuilder
> ----------------------------------------------------------
>
>                 Key: IMPALA-13404
>                 URL: https://issues.apache.org/jira/browse/IMPALA-13404
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Zoltán Borók-Nagy
>            Priority: Major
>              Labels: impala-iceberg, performance
>
> IcebergDeleteBuilder uses a single thread to build the data structures for 
> IcebergDeleteNode.
> In most cases it is OK as it can run in parallel with the SCAN operators of 
> the data files that don't have associated delete files:
> {noformat}
>     UNION ALL
>    /         \
>   /           \
>  /             \
> SCAN all    ANTI JOIN
> datafiles  /        BUILD
> without   /           \
> deletes  SCAN         SCAN
>   ^      datafiles    deletes
>   |      with deletes.  ^
>   |          ^          |
>   |          |          |
>  RUNS   NOT STARTED    RUNS{noformat}
> But if that SCAN operator is finished, and the BUILD is not ready, we are 
> potentially blocking #MT_DOP threads:
> {noformat}
>     UNION ALL
>    /         \
>   /           \
>  /             \
> SCAN all    ANTI JOIN
> datafiles  /        BUILD
> without   /           \
> deletes  SCAN         SCAN
>   ^      datafiles    deletes
>   |      with deletes.  ^
>   |          ^          |
>   |          |          |
> FINISHED   BLOCKED     RUNS
> {noformat}
> Or, for plain count( * ) queries the situation is even worse, as the UNION 
> ALL and its left side are optimized out:
> {noformat}
>       ArithmeticExpr(ADD)
>       /             \
>      /               \
>     /                 \
> record_count       AGGREGATE
> of all             COUNT(*)
> datafiles              |
> without            ANTI JOIN
> deletes           /        BUILD
>    ^             /           \
>    |            SCAN        SCAN
>    |            datafiles   deletes
>    |            with deletes    ^
> compile-time       ^            |
>  constant          |            |
>                 BLOCKED        RUNS
>            (MT_DOP threads)  (single thread){noformat}
> The IcebergDeleteBuilder could keep track LHS SCAN threads are being blocked, 
> and dynamically increase its thread count. Possibly some enhancments are 
> required on Impala's ThreadPool to allow increasing the number of worker 
> threads.
> IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. It 
> cannot directly work on the RowBatch object that it gets from the 
> FragmentInstanceState's ExecInternal() function, as it is being released 
> immediately after IcebergDeleteBuilder's Send() returns. Luckily RowBatch 
> already has an AcquireState() method that we can use.
> IcebergDeleteBuilder could create a fresh new empty RowBatch object that 
> would acquire the state of the incoming RowBatch in Send(), hence 
> FragmentInstanceState wouldn't release its resources when Send() returns.
> Then ProcessBuildBatch could work on its own RowBatch instance, then invoke 
> RowBatch:Release() in the end.
> *EDIT*
> Unfortunately RowBatch's AcquireState() does not work well until we fix some 
> resource ownership issue, see:
> https://github.com/apache/impala/blob/8fea75cb5ce206ad071859bb331fa4811573cf4b/be/src/exec/nested-loop-join-builder.cc#L197-L205
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to