zhengruifeng created SPARK-36087:
------------------------------------
Summary: An Impl of skew key detection and data inflation
optimization
Key: SPARK-36087
URL: https://issues.apache.org/jira/browse/SPARK-36087
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 3.3.0
Reporter: zhengruifeng
design doc:
[https://docs.google.com/document/d/1jYGlipQdirhuRR3_x1PflAYdMow7StOHOLUjSw6Vk9s/edit?usp=sharing]
1. Introduce a {{ShuffleExecAccumulator}} in {{ShuffleExchangeExec}} to support
collecting arbitrary statistics during the shuffle;
2. Implement a key-sampling {{ShuffleExecAccumulator}} that detects skewed keys
and shows debug info on the Spark UI;
3. In {{OptimizeSkewedJoin}}, estimate the joined size of each partition based
on the sampled keys, and split a partition if it has not been split yet and its
estimated joined size is too large (a sketch of this logic follows the list).
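To make the proposal concrete, below is a hypothetical, self-contained sketch of
the key sampling and joined-size estimation described above. {{KeySampler}},
{{estimateJoinedRows}} and {{shouldSplit}} are illustrative names only, not real
Spark APIs; in the actual proposal this logic would live in the new
{{ShuffleExecAccumulator}} and in {{OptimizeSkewedJoin}} (see the design doc).
{code:java}
import scala.collection.mutable
import scala.util.Random

// Hypothetical standalone sketch of the two building blocks described above.
// None of these names are real Spark APIs.
object InflationSketch {

  /** Reservoir-samples the join keys seen while writing one shuffle partition. */
  final class KeySampler[K](sampleSize: Int, seed: Long = 42L) {
    private val reservoir = mutable.ArrayBuffer.empty[K]
    private val rng = new Random(seed)
    private var seen = 0L

    def add(key: K): Unit = {
      seen += 1
      if (reservoir.size < sampleSize) {
        reservoir += key
      } else {
        // keep each incoming key with probability sampleSize / seen
        val j = (rng.nextDouble() * seen).toLong
        if (j < sampleSize) reservoir(j.toInt) = key
      }
    }

    /** Per-key counts observed in the sample, scaled up to the full partition. */
    def estimatedKeyCounts: Map[K, Double] = {
      if (reservoir.isEmpty) Map.empty
      else {
        val scale = seen.toDouble / reservoir.size
        reservoir.groupBy(identity).map { case (k, ks) => k -> ks.size * scale }
      }
    }
  }

  /** Estimated joined row count of one partition: sum over keys of left * right counts. */
  def estimateJoinedRows[K](left: Map[K, Double], right: Map[K, Double]): Double =
    left.iterator.map { case (k, n) => n * right.getOrElse(k, 0.0) }.sum

  /** Split when the estimated output exceeds advisorySize * inflationFactor. */
  def shouldSplit(joinedRows: Double, avgJoinedRowSize: Double,
                  advisorySizeInBytes: Long, inflationFactor: Double): Boolean =
    joinedRows * avgJoinedRowSize > advisorySizeInBytes * inflationFactor
}
{code}
Multiplying the estimated row count by an average joined row size keeps the check
in bytes, so it stays comparable with the existing
{{spark.sql.adaptive.advisoryPartitionSizeInBytes}}-based thresholds.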
Data inflation example:
{code:java}
import org.apache.spark.sql.functions._  // already imported when using spark-shell

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "80")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "80")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
spark.conf.set("spark.sql.shuffle.partitions", 10)

sc.setLogLevel("INFO")

// every row whose hash(id) is a multiple of 100 gets key1 = 0, creating a hot key
val df1 = spark.range(0, 1000000, 1, 9)
  .select(col("id").as("key1"), col("id").as("value1"), hash(col("id")).mod(100).as("hash1"))
  .withColumn("key1", when(col("hash1") === lit(0), lit(0)).otherwise(col("key1")))

// the same hot key 0 on the other side of the join
val df2 = spark.range(0, 500000, 1, 7)
  .select(col("id").as("key2"), col("id").as("value2"), hash(col("id")).mod(100).as("hash2"))
  .withColumn("key2", when(col("hash2") === lit(0), lit(0)).otherwise(col("key2")))

// baseline run: existing size-based skew handling only
df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_0")

// enable the proposed inflation detection and rerun the same join
spark.conf.set("spark.sql.adaptive.skewJoin.inflation.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.inflation.factor", 10)
spark.conf.set("spark.sql.adaptive.shuffle.sampleSizePerPartition", 100)
spark.conf.set("spark.sql.adaptive.shuffle.detectSkewness", true)

df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_1")
{code}
It also partially resolves https://issues.apache.org/jira/browse/SPARK-35596.