[ 
https://issues.apache.org/jira/browse/SPARK-36443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kent Yao updated SPARK-36443:
-----------------------------
    Description: 
 
h2. A test case

Run the case below with bin/spark-sql from Spark 3.1.2 in local mode, leaving 
all other settings at their defaults:
{code:sql}
-- Settings for the reproduction
set spark.sql.shuffle.partitions=20;
set spark.sql.adaptive.enabled=true;
-- Uncomment the next line to keep the bhj (not demote it); the default is 0.2
-- set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0;
set spark.sql.autoBroadcastJoinThreshold=200;
SELECT
  l.id % 12345 k,
  sum(l.id) sum,
  count(l.id) cnt,
  avg(l.id) avg,
  min(l.id) min,
  max(l.id) max
from (select id % 3 id from range(0, 1e8, 1, 100)) l
  left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) group 
by gid) r ON l.id = r.id
GROUP BY 1;
{code}
 
 1. Demote the bhj: leave nonEmptyPartitionRatioForBroadcastJoin commented out (default 0.2)

 
||Job Id||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
|4|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:37|71 ms|1/1 (4 skipped)|3/3 (205 skipped)|
|3|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:18|19 s|1/1 (3 skipped)|4/4 (201 skipped)|
|2|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:18|87 ms|1/1 (1 skipped)|1/1 (100 skipped)|
|1|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:16|2 s|1/1|100/100|
|0|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:15|2 s|1/1|100/100|

 2. Keep the bhj: set nonEmptyPartitionRatioForBroadcastJoin to 0 to tell Spark not to demote it

 
||Job Id (Job Group)||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
|5|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:15|29 ms|1/1 (2 skipped)|3/3 (200 skipped)|
|4|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:13|2 s|1/1 (1 skipped)|100/100 (100 skipped)|
|3 (700fefe1-8446-4761-9be2-b68ed6e84c11)|broadcast exchange (runId 700fefe1-8446-4761-9be2-b68ed6e84c11) ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:266)|2021/08/06 18:25:13|54 ms|1/1 (2 skipped)|1/1 (101 skipped)|
|2|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:13|88 ms|1/1 (1 skipped)|1/1 (100 skipped)|
|1|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:10|2 s|1/1|100/100|
|0|The test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:10|3 s|1/1|100/100|

The subquery `(select id % 3 id from range(0, 1e8, 1, 100)) l` here produces 
highly compressed shuffle map output and leaves 17 of the 20 partitions empty 
on the reduce side, which is also the point where AQE re-optimizes the plan 
with DynamicJoinSelection.
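
The ratio test implied by nonEmptyPartitionRatioForBroadcastJoin can be sketched as follows. This is only an illustrative Python model, not Spark's code: the function name `should_demote_bhj` is hypothetical, the exact condition (non-empty ratio strictly below the configured value) is an assumption based on the config name and the behavior seen in this test case, and the positions of the non-empty partitions are made up.

```python
def should_demote_bhj(partition_bytes, non_empty_ratio_threshold=0.2):
    """Hypothetical model of the demotion check (an assumption, not Spark's code)."""
    total = len(partition_bytes)
    non_empty = sum(1 for size in partition_bytes if size > 0)
    if total == 0 or non_empty == 0:
        return False
    # Demote when too few reduce partitions actually hold data.
    return non_empty / total < non_empty_ratio_threshold


# 20 reduce partitions, 17 of them empty, as in the test case.
sizes = [0] * 20
for idx in (2, 9, 15):  # which partitions are non-empty is invented for illustration
    sizes[idx] = 3_400_000

demote_default = should_demote_bhj(sizes)        # 3/20 = 0.15, below the default 0.2
demote_disabled = should_demote_bhj(sizes, 0.0)  # no ratio is below 0
print(demote_default, demote_disabled)
```

Under these assumptions the default threshold demotes the bhj, while a threshold of 0 can never trigger, which matches the two runs listed above.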
{code:java}
Exchange

shuffle records written: 100,000,000
shuffle write time total (min, med, max )
891 ms (2 ms, 5 ms, 33 ms )
records read: 100,000,000
local bytes read total (min, med, max )
10.0 MiB (3.3 MiB, 3.4 MiB, 3.4 MiB )
fetch wait time total (min, med, max )
0 ms (0 ms, 0 ms, 0 ms )
remote bytes read: 0.0 B
local blocks read: 300
remote blocks read: 0
data size total (min, med, max )
1525.9 MiB (15.3 MiB, 15.3 MiB, 15.3 MiB )
remote bytes read to disk: 0.0 B
shuffle bytes written total (min, med, max )
10.0 MiB (102.3 KiB, 102.3 KiB, 102.3 KiB )
{code}
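
To put a number on "highly compressed", the two totals in the Exchange metrics above can be compared directly. The short calculation below only uses figures copied from that block; treating "data size total" as the uncompressed row size is an assumption about what the metric means.

```python
MIB = 1024 * 1024

data_size_total = 1525.9 * MIB      # "data size total" from the Exchange metrics
shuffle_bytes_written = 10.0 * MIB  # "shuffle bytes written total"

# How much smaller the shuffle files are than the rows they encode.
compression_ratio = data_size_total / shuffle_bytes_written
print(f"compression ratio: about {compression_ratio:.0f}x")
```

Any rule that sizes partitions from shuffle bytes therefore sees only a small fraction of the data a reduce task will later have to hold.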
 

In case 1), the bhj is demoted, and the `coalesce partitions rule` still 
coalesces these 'small' partitions even with 
*spark.sql.adaptive.advisoryPartitionSizeInBytes=1m* set. See:

 

!screenshot-1.png!

Then, as the smj phase shows, the earlier coalescing followed by the later 
expansion of the data causes a performance regression:

 
{code:java}
Sort

sort time total (min, med, max (stageId: taskId))
166 ms (0 ms, 55 ms, 57 ms (stage 7.0: task 203))
peak memory total (min, med, max (stageId: taskId))
315.1 MiB (64.0 KiB, 105.0 MiB, 105.0 MiB (stage 7.0: task 201))
spill size total (min, med, max (stageId: taskId))
1845.0 MiB (0.0 B, 615.0 MiB, 615.0 MiB (stage 7.0: task 201))
{code}
 

 
|1|202|0|SUCCESS| |driver| | |2021-08-06 17:31:18|18 s|4 s|3.0 ms|10.0 ms| | |105.3 MiB|1.0 ms|91 B / 1|3.4 MiB / 33333333|615 MiB|4.5 MiB| |
|2|203|0|SUCCESS| |driver| | |2021-08-06 17:31:18|19 s|4 s|4.0 ms|10.0 ms| | |105.3 MiB|1.0 ms|89 B / 1|3.4 MiB / 33333333|615 MiB|4.5 MiB| |
|0|201|0|SUCCESS| |driver| | |2021-08-06 17:31:18|17 s|4 s|6.0 ms|10.0 ms| | |105.3 MiB|1.0 ms|70 B / 1|3.3 MiB / 33333334|615 MiB|4.4 MiB| |
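
The per-task expansion can be estimated from the three task rows above. The rows carry no header, so reading `3.4 MiB / 33333333` as shuffle read size / records and `615 MiB` as the spill size is an assumption, made because those values line up with the Exchange and Sort metrics. A rough calculation under that reading:

```python
# Per-task figures copied from the three task rows (column meaning is assumed).
shuffle_read_mib = [3.4, 3.4, 3.3]  # compressed shuffle input per task
spill_mib = [615.0, 615.0, 615.0]   # spill reported per task

expansion = [spill / read for spill, read in zip(spill_mib, shuffle_read_mib)]
total_spill = sum(spill_mib)  # should agree with the spill total in the Sort metrics

for read, spill, factor in zip(shuffle_read_mib, spill_mib, expansion):
    print(f"read {read} MiB -> spilled {spill} MiB (about {factor:.0f}x)")
print(f"total spill: {total_spill} MiB")
```

Each of the three tasks takes in about 3.4 MiB and spills roughly 180 times that amount, and the three spills add up to the 1845.0 MiB total reported by the Sort node.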

 

In case 2), keeping the bhj increases the number of tasks, which adds extra 
scheduling overhead and runs unnecessary empty tasks, but it avoids the OOM 
risk and the performance regression described above.

h2. A real-world case, in which the expansion of the data increases the OOM risk to a very high level

 

 

!image-2021-08-06-11-24-34-122.png!


> Demote BroadcastJoin causes performance regression and increases OOM risks
> --------------------------------------------------------------------------
>
>                 Key: SPARK-36443
>                 URL: https://issues.apache.org/jira/browse/SPARK-36443
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kent Yao
>            Priority: Major
>         Attachments: image-2021-08-06-11-24-34-122.png, 
> image-2021-08-06-17-57-15-765.png, screenshot-1.png
>
>


