zhengruifeng created SPARK-36087:
------------------------------------

             Summary: An Impl of skew key detection and data inflation 
optimization
                 Key: SPARK-36087
                 URL: https://issues.apache.org/jira/browse/SPARK-36087
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 3.3.0
            Reporter: zhengruifeng


design doc: 
[https://docs.google.com/document/d/1jYGlipQdirhuRR3_x1PflAYdMow7StOHOLUjSw6Vk9s/edit?usp=sharing]

1, introduce {{ShuffleExecAccumulator}} in {{ShuffleExchangeExec}} to support 
arbitrary statistics;

2, implement a key-sampling {{ShuffleExecAccumulator}} to detect skewed keys and 
show debug info on the Spark UI;

3, in {{OptimizeSkewedJoin}}, estimate the joined size of each partition based 
on the sampled keys, and split a partition if it has not been split yet and its 
estimated joined size is too large.
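
The estimate in step 3 can be sketched roughly as follows. This is a minimal illustration in plain Scala and not the code from the design doc: {{KeySample}}, {{estimateJoinedRows}}, {{isInflated}} and the way the factor is applied are all hypothetical names and assumptions. It assumes each side of a shuffle partition reports sampled key counts plus its true row count, and that the samples are roughly uniform.

```scala
object InflationSketch {
  // Hypothetical per-partition summary: how often each key appeared in the
  // sample, plus the real number of rows in the partition.
  final case class KeySample(counts: Map[Long, Long], totalRows: Long) {
    val sampledRows: Long = counts.values.sum
    // Factor that scales sampled counts back up to the full partition.
    def scale: Double =
      if (sampledRows == 0) 0.0 else totalRows.toDouble / sampledRows
  }

  // Estimated number of rows an equi-join of the two sides would produce:
  // sum over shared keys of (left count * right count), scaled by both
  // sampling ratios.
  def estimateJoinedRows(left: KeySample, right: KeySample): Double = {
    val sampledProduct = left.counts.iterator.collect {
      case (key, l) if right.counts.contains(key) => l.toDouble * right.counts(key)
    }.sum
    sampledProduct * left.scale * right.scale
  }

  // Assumed rule: a partition is "inflated" when the estimated output is more
  // than `factor` times larger than its bigger input side.
  def isInflated(left: KeySample, right: KeySample, factor: Double): Boolean =
    estimateJoinedRows(left, right) >
      factor * math.max(left.totalRows, right.totalRows)
}
```

For example, a left sample of {{Map(0L -> 8L, 1L -> 1L, 2L -> 1L)}} drawn from 100 rows and a right sample of {{Map(0L -> 4L, 3L -> 1L)}} drawn from 50 rows share only key 0, so the estimate is driven entirely by that one hot key.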

 

data inflation case:
{code:java}
// Needed outside spark-shell for col, hash, when and lit.
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "80")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "80")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
spark.conf.set("spark.sql.shuffle.partitions", 10)
sc.setLogLevel("INFO")

// Rows whose hash is a multiple of 100 are remapped to key 0,
// so key 0 is heavily duplicated on both sides of the join.
val df1 = spark.range(0, 1000000, 1, 9).
  select(col("id").as("key1"), col("id").as("value1"), hash(col("id")).mod(100).as("hash1")).
  withColumn("key1", when(col("hash1") === lit(0), lit(0)).otherwise(col("key1")))
val df2 = spark.range(0, 500000, 1, 7).
  select(col("id").as("key2"), col("id").as("value2"), hash(col("id")).mod(100).as("hash2")).
  withColumn("key2", when(col("hash2") === lit(0), lit(0)).otherwise(col("key2")))

// First run: skew detection and the inflation optimization are not enabled.
df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_0")

// Second run: enable skew detection and the inflation optimization, then rerun the same join.
spark.conf.set("spark.sql.adaptive.skewJoin.inflation.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.inflation.factor", 10)
spark.conf.set("spark.sql.adaptive.shuffle.sampleSizePerPartition", 100)
spark.conf.set("spark.sql.adaptive.shuffle.detectSkewness", true)
df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_1")
{code}
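
To see why the case above inflates, a back-of-the-envelope calculation helps. The sketch below is only an approximation: it assumes {{hash(id)}} spreads evenly, so about 1 in 100 ids on each side is remapped to key 0. The object and value names are illustrative.

```scala
object InflationArithmetic {
  val leftRows = 1000000L   // rows in df1
  val rightRows = 500000L   // rows in df2

  // Assumption: roughly 1 in 100 ids has hash % 100 == 0 and is remapped to key 0.
  val leftSkewed = leftRows / 100     // about 10,000 rows with key1 = 0
  val rightSkewed = rightRows / 100   // about 5,000 rows with key2 = 0

  // Every key-0 row on the left matches every key-0 row on the right.
  val skewedJoinRows = leftSkewed * rightSkewed

  // Input rows carrying key 0 versus output rows produced for key 0.
  val inputSkewedRows = leftSkewed + rightSkewed
}
```

Under these assumptions about 15,000 input rows with key 0 expand into about 50 million joined rows, all landing in a single shuffle partition, even though that partition's input size is small.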
 

It also partially resolves https://issues.apache.org/jira/browse/SPARK-35596

 


