Rui Wang created SPARK-41512:
--------------------------------

             Summary: Row count based shuffle read to optimize global limit 
after a single partition shuffle (optionally with input partition sorted)
                 Key: SPARK-41512
                 URL: https://issues.apache.org/jira/browse/SPARK-41512
             Project: Spark
          Issue Type: Task
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Rui Wang
            Assignee: Rui Wang


h3. Problem Statement

In the current Spark optimizer, a single-partition shuffle may be inserted for a 
limit when the limit is not the last non-action operation (e.g. a filter 
follows the limit). The output partitions feeding into this limit may already 
be sorted. The single-partition shuffle approach has a correctness bug in that 
case: shuffle read can return partitions out of partition order, and the limit 
exec simply takes the first limit rows, which can lose the order and produce a 
wrong result. In addition, the shuffle itself is relatively costly. A naive fix 
for the correctness bug is to sort all the data fed into the limit again, which 
adds yet more overhead.

h3. Proposed idea
We propose a row-count-based AQE algorithm that addresses this problem in two 
ways:

1. Avoid the extra sort on the shuffle read side (or in the limit exec) while 
still producing the correct result.

2. Avoid reading all shuffle data from the mappers for this single-partition 
shuffle, reducing shuffle cost.

Note that 1. applies only to the sorted-partition case, while 2. applies to the 
general single-partition shuffle + limit case.

 

The algorithm works as follows:

1. Each mapper records a row count while writing its shuffle data.

2. Since this is the single-shuffle-partition case, there is only one reduce 
partition but N mappers.

3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
mapping between mapper id and the number of rows written by that mapper (the 
row count metric).

4. The AQE framework detects a plan shape of a shuffle followed by a global 
limit.

5. The AQE framework reads only the necessary data from the mappers, based on 
the limit. For example, if mapper 1 writes 200 rows, mapper 2 writes 300 rows, 
and the limit is 500, AQE creates a shuffle read node that reads from mappers 1 
and 2 only, skipping the remaining mappers.

6. This is correct for limits over both sorted and non-sorted partitions.
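The mapper-selection logic in step 5 can be sketched as follows. This is a 
minimal illustration, not Spark's actual implementation; the function and 
variable names are hypothetical:

```python
def select_mappers(row_counts, limit):
    """Given (mapper_id, row_count) tuples in mapper-id order, return the
    prefix of mapper ids whose cumulative row count covers the limit.

    Reading mappers in id order preserves the original partition order,
    which keeps the result correct when the input was sorted.
    """
    selected = []
    remaining = limit
    for mapper_id, count in row_counts:
        if remaining <= 0:
            break
        selected.append(mapper_id)
        remaining -= count
    return selected

# With the row counts from the example in step 5 and limit = 500,
# only mappers 1 and 2 need to be read; mapper 3 is skipped.
print(select_mappers([(1, 200), (2, 300), (3, 400)], 500))  # [1, 2]
```

Because only a prefix of mappers is read, the shuffle read cost scales with the 
limit rather than with the total shuffle output.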


