[ https://issues.apache.org/jira/browse/SPARK-36087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-36087:
------------------------------------
Assignee: (was: Apache Spark)
> An Impl of skew key detection and data inflation optimization
> -------------------------------------------------------------
>
> Key: SPARK-36087
> URL: https://issues.apache.org/jira/browse/SPARK-36087
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: zhengruifeng
> Priority: Major
>
> design doc:
> [https://docs.google.com/document/d/1jYGlipQdirhuRR3_x1PflAYdMow7StOHOLUjSw6Vk9s/edit?usp=sharing]
> 1. Introduce {{ShuffleExecAccumulator}} in {{ShuffleExchangeExec}} to support
> arbitrary statistics;
> 2. Implement a key-sampling {{ShuffleExecAccumulator}} to detect skewed keys
> and show debug info in the Spark UI;
> 3. In {{OptimizeSkewedJoin}}, estimate the joined size of each partition
> based on the sampled keys, and split a partition if it has not already been
> split and its estimated joined size is too large.
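>
> A minimal sketch of steps 2 and 3 in plain Scala (no Spark), for illustration
> only. All names are hypothetical, and so are these choices: reservoir sampling
> for the per-partition key sample, sizes counted in rows rather than bytes, and
> a split rule of "estimated output above {{factor}} times the larger input
> side", where {{factor}} stands in for
> {{spark.sql.adaptive.skewJoin.inflation.factor}}.
> {code:scala}
> import scala.collection.mutable.ArrayBuffer
> import scala.util.Random
>
> // Hypothetical sketch: sample shuffle keys per partition, estimate each
> // partition's joined size from the samples, and decide whether to split it.
> object SkewSketch {
>
>   // Reservoir sampling (Algorithm R): one pass over a partition's keys keeps a
>   // uniform sample of at most `sampleSize` keys. Also returns the row count.
>   def sampleKeys(keys: Iterator[Long], sampleSize: Int, rnd: Random): (Vector[Long], Long) = {
>     val reservoir = ArrayBuffer.empty[Long]
>     var seen = 0L
>     keys.foreach { k =>
>       seen += 1
>       if (reservoir.size < sampleSize) reservoir += k
>       else {
>         val j = (rnd.nextDouble() * seen).toLong // uniform in [0, seen)
>         if (j < sampleSize) reservoir(j.toInt) = k
>       }
>     }
>     (reservoir.toVector, seen)
>   }
>
>   // Scale sampled key counts up to the partition's row count, assuming the
>   // sample is uniform over the partition.
>   def scaledKeyCounts(sample: Seq[Long], partitionRows: Long): Map[Long, Double] =
>     if (sample.isEmpty) Map.empty
>     else {
>       val scale = partitionRows.toDouble / sample.size
>       sample.groupBy(identity).map { case (k, hits) => k -> hits.size * scale }
>     }
>
>   // Inner equi-join: a key seen a times on the left and b times on the right
>   // produces a * b output rows. Keys missing from either sample contribute 0.
>   def estimateJoinedRows(left: Map[Long, Double], right: Map[Long, Double]): Double =
>     left.iterator.collect { case (k, a) if right.contains(k) => a * right(k) }.sum
>
>   // Split a partition that is not already split when its estimated output
>   // exceeds `factor` times its larger input side (hypothetical rule).
>   def shouldSplit(alreadySplit: Boolean, estimatedRows: Double,
>                   leftRows: Long, rightRows: Long, factor: Double): Boolean =
>     !alreadySplit && estimatedRows > factor * math.max(leftRows, rightRows)
> }
> {code}
> Because output grows with the product of per-side key counts, a handful of hot
> keys can dominate the estimate even when each input partition is small.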
>
> data inflation case:
> {code:scala}
> // In spark-shell these functions are already imported.
> import org.apache.spark.sql.functions._
>
> // Enable AQE skew-join handling. The tiny 80-byte thresholds let this small
> // dataset trigger skew handling; broadcast joins are disabled so that a
> // shuffled join is planned.
> spark.conf.set("spark.sql.adaptive.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "80")
> spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "80")
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
> spark.conf.set("spark.sql.shuffle.partitions", 10)
> sc.setLogLevel("INFO")
>
> // Rows whose hash(id) % 100 == 0 get key 0, so key 0 is hot on both sides
> // and its join output is far larger than its input.
> val df1 = spark.range(0, 1000000, 1, 9).select(
>   col("id").as("key1"),
>   col("id").as("value1"),
>   hash(col("id")).mod(100).as("hash1")
> ).withColumn("key1", when(col("hash1") === lit(0), lit(0)).otherwise(col("key1")))
> val df2 = spark.range(0, 500000, 1, 7).select(
>   col("id").as("key2"),
>   col("id").as("value2"),
>   hash(col("id")).mod(100).as("hash2")
> ).withColumn("key2", when(col("hash2") === lit(0), lit(0)).otherwise(col("key2")))
>
> // Baseline: existing skew-join handling only.
> df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_0")
>
> // Enable the sampling-based skew detection and inflation handling proposed
> // in this issue.
> spark.conf.set("spark.sql.adaptive.skewJoin.inflation.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.inflation.factor", 10)
> spark.conf.set("spark.sql.adaptive.shuffle.sampleSizePerPartition", 100)
> spark.conf.set("spark.sql.adaptive.shuffle.detectSkewness", true)
> df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_1")
> {code}
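>
> A back-of-the-envelope sketch of why this repro inflates. It assumes Spark's
> {{hash}} spreads ids roughly uniformly, so that about 1% of rows on each side
> are rewritten to key 0; the exact counts depend on the hash function. Both
> sides hash the same {{id}} column, so the remaining ids below 500,000 match 1:1.
> {code:scala}
> // Rough output-size estimate for the repro above.
> // Assumption: ~1% of ids on each side satisfy hash(id) % 100 == 0.
> object InflationEstimate {
>   val leftRows = 1000000L        // spark.range(0, 1000000, ...)
>   val rightRows = 500000L        // spark.range(0, 500000, ...)
>   val leftHot = leftRows / 100   // ~10,000 rows with key1 = 0
>   val rightHot = rightRows / 100 // ~5,000 rows with key2 = 0
>   // Every hot left row matches every hot right row.
>   val hotOutput = leftHot * rightHot
>   // Non-hot ids below 500,000 appear once per side and match 1:1.
>   val otherOutput = rightRows - rightHot
>   val totalOutput = hotOutput + otherOutput
>   val totalInput = leftRows + rightRows
> }
> {code}
> Under this assumption, roughly 1.5 million input rows yield on the order of 50
> million output rows, almost all of them from key 0.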
>
> It also partially resolves https://issues.apache.org/jira/browse/SPARK-35596
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)