Zoltán Borók-Nagy created IMPALA-13404:
------------------------------------------

             Summary: Implement adaptive parallelization in IcebergDeleteBuilder
                 Key: IMPALA-13404
                 URL: https://issues.apache.org/jira/browse/IMPALA-13404
             Project: IMPALA
          Issue Type: Improvement
            Reporter: Zoltán Borók-Nagy


IcebergDeleteBuilder uses a single thread to build the data structures for 
IcebergDeleteNode.

In most cases this is fine, because the build can run in parallel with the SCAN 
operators of the data files that don't have associated delete files:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |                     |
 RUNS                  RUNS{noformat}
But if that SCAN operator finishes before the BUILD is ready, we are 
potentially blocking MT_DOP threads:
{noformat}
    UNION ALL
   /         \
  /           \
 /             \
SCAN all    ANTI JOIN
datafiles  /        BUILD
without   /           \
deletes  SCAN         SCAN
  ^      datafiles    deletes
  |      with deletes.  ^
  |          ^          |
  |          |          |
FINISHED   BLOCKED     RUNS
{noformat}
Or, for plain count( * ) queries the situation is even worse, as the UNION ALL 
and its left side are optimized out:
{noformat}
      ArithmeticExpr(ADD)
      /             \
     /               \
    /                 \
record_count       AGGREGATE
of all             COUNT(*)
datafiles              |
without            ANTI JOIN
deletes           /        BUILD
(compile-time    /           \
 constant)      SCAN        SCAN
                datafiles   deletes
                with deletes    ^
                   ^            |
                   |            |
                BLOCKED        RUNS
           (MT_DOP threads)  (single thread){noformat}
The IcebergDeleteBuilder could keep track of whether LHS SCAN threads are being 
blocked, and dynamically increase its thread count. Some enhancements to 
Impala's ThreadPool may be required to allow increasing the number of worker 
threads.
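
The idea of growing the builder's thread count as probe-side threads become 
blocked could look roughly like the sketch below. It is only an illustration 
under assumptions: GrowablePool, OnProbeThreadBlocked() and the sizing rule 
(initial threads plus one per blocked thread, capped at a maximum) are 
hypothetical names and policies, not Impala's ThreadPool API. The pool never 
shrinks in this sketch.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical growable worker pool; NOT Impala's ThreadPool.
class GrowablePool {
 public:
  GrowablePool(int initial_threads, int max_threads)
    : initial_threads_(initial_threads), max_threads_(max_threads) {
    assert(initial_threads >= 1 && max_threads >= initial_threads);
    std::lock_guard<std::mutex> l(lock_);
    for (int i = 0; i < initial_threads; ++i) AddWorkerLocked();
  }

  ~GrowablePool() { Shutdown(); }

  // Queues one unit of build work (e.g. processing one build batch).
  void Offer(std::function<void()> work) {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.push_back(std::move(work));
    }
    cv_.notify_one();
  }

  // Assumed hook: called when a probe-side (LHS SCAN) thread starts waiting
  // for the build. Each blocked thread lets the pool add one worker, up to
  // max_threads_.
  void OnProbeThreadBlocked() {
    std::lock_guard<std::mutex> l(lock_);
    if (shutdown_) return;
    ++blocked_probe_threads_;
    int target = std::min(max_threads_, initial_threads_ + blocked_probe_threads_);
    while (num_threads_.load() < target) AddWorkerLocked();
  }

  int num_threads() const { return num_threads_.load(); }

  // Lets the workers drain the queue, then joins them. Safe to call twice.
  void Shutdown() {
    std::vector<std::thread> to_join;
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
      to_join.swap(workers_);
    }
    cv_.notify_all();
    for (std::thread& t : to_join) t.join();
  }

 private:
  // Caller must hold lock_.
  void AddWorkerLocked() {
    workers_.emplace_back([this] { WorkerLoop(); });
    ++num_threads_;
  }

  void WorkerLoop() {
    std::unique_lock<std::mutex> l(lock_);
    while (true) {
      cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
      if (queue_.empty()) return;  // Only reachable after shutdown.
      std::function<void()> work = std::move(queue_.front());
      queue_.pop_front();
      l.unlock();
      work();  // Run the work item without holding the lock.
      l.lock();
    }
  }

  const int initial_threads_;
  const int max_threads_;
  std::atomic<int> num_threads_{0};
  std::mutex lock_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  int blocked_probe_threads_ = 0;
  bool shutdown_ = false;
  std::vector<std::thread> workers_;
};
```

In a real implementation the cap would presumably be tied to MT_DOP, since 
that bounds how many probe threads can be blocked on the build.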

IcebergDeleteBuilder's ProcessBuildBatch could be executed in parallel. It 
cannot work directly on the RowBatch object that it gets from the 
FragmentInstanceState's ExecInternal() function, because that batch is released 
immediately after IcebergDeleteBuilder's Send() returns. Luckily RowBatch 
already has an AcquireState() method that we can use.

In Send(), IcebergDeleteBuilder could create a new, empty RowBatch object that 
acquires the state of the incoming RowBatch, so that FragmentInstanceState 
wouldn't release its resources when Send() returns.

ProcessBuildBatch could then work on its own RowBatch instance and invoke 
RowBatch::Release() at the end.
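
The ownership hand-off described above can be sketched with simplified 
stand-ins. MockRowBatch and MockDeleteBuilder are hypothetical classes written 
for this illustration; they only mimic the idea of AcquireState() (the 
receiver takes over the sender's contents) and do not reproduce Impala's 
actual RowBatch interface. Processing is done on the calling thread here to 
keep the example short; in the proposed design each pending batch would be 
handed to a worker thread.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Simplified, hypothetical stand-in for a row batch.
class MockRowBatch {
 public:
  void AddRow(int64_t v) { rows_.push_back(v); }
  int num_rows() const { return static_cast<int>(rows_.size()); }

  // Takes over the contents of 'src' and leaves 'src' empty. This batch must
  // be empty, mirroring the "fresh, empty batch" requirement in the text.
  void AcquireState(MockRowBatch* src) {
    assert(rows_.empty());
    rows_ = std::move(src->rows_);
    src->rows_.clear();
  }

  // Frees the rows held by this batch.
  void Reset() { rows_.clear(); }

 private:
  std::vector<int64_t> rows_;
};

// Hypothetical builder that keeps its own copy of every incoming batch.
class MockDeleteBuilder {
 public:
  // The caller is expected to reset 'batch' right after Send() returns, so
  // the builder moves the contents into a batch that it owns.
  void Send(MockRowBatch* batch) {
    auto owned = std::make_unique<MockRowBatch>();
    owned->AcquireState(batch);
    pending_.push_back(std::move(owned));
  }

  // Processes every owned batch, then frees it. Returns the number of rows
  // processed.
  int64_t ProcessPending() {
    int64_t processed = 0;
    for (std::unique_ptr<MockRowBatch>& b : pending_) {
      processed += b->num_rows();
      b->Reset();  // The builder, not the caller, frees the rows.
    }
    pending_.clear();
    return processed;
  }

  int num_pending() const { return static_cast<int>(pending_.size()); }

 private:
  std::vector<std::unique_ptr<MockRowBatch>> pending_;
};
```

Because the incoming batch is already empty when Send() returns, the caller's 
reset has nothing left to free, and the rows stay valid until the builder is 
done with them.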



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
