[ 
https://issues.apache.org/jira/browse/SPARK-36087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-36087:
------------------------------------

    Assignee: Apache Spark

> An Impl of skew key detection and data inflation optimization
> -------------------------------------------------------------
>
>                 Key: SPARK-36087
>                 URL: https://issues.apache.org/jira/browse/SPARK-36087
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: zhengruifeng
>            Assignee: Apache Spark
>            Priority: Major
>
> design doc: 
> [https://docs.google.com/document/d/1jYGlipQdirhuRR3_x1PflAYdMow7StOHOLUjSw6Vk9s/edit?usp=sharing]
> 1, Introduce {{ShuffleExecAccumulator}} in ShuffleExchangeExec to support 
> arbitrary statistics;
> 2, Implement a key-sampling {{ShuffleExecAccumulator}} to detect skew keys and 
> show debug info on the Spark UI;
> 3, In {{OptimizeSkewedJoin}}, estimate the joined size of each partition 
> based on the sampled keys, and split a partition if it has not been split yet 
> and its estimated joined size is too large.
>  
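To make step 3 concrete, here is a minimal sketch in plain Scala of how a joined size could be estimated from sampled keys. It is not taken from the patch or the design doc: the object name `JoinSizeSketch`, its methods, the uniform-sampling scaling, and the split rule (estimated joined rows greater than a factor times the input rows) are all assumptions made for illustration.

```scala
// Hypothetical sketch, NOT the Spark implementation.
// Assumes keys were sampled uniformly from each side of one shuffle partition.
object JoinSizeSketch {

  /** Count how often each key occurs in a sample. */
  def keyCounts[K](sample: Seq[K]): Map[K, Long] =
    sample.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  /**
   * Estimate the number of rows an equi-join produces for one partition.
   *
   * @param leftSample  keys sampled from the left side
   * @param leftRows    total number of rows on the left side
   * @param rightSample keys sampled from the right side
   * @param rightRows   total number of rows on the right side
   */
  def estimateJoinedRows[K](
      leftSample: Seq[K],
      leftRows: Long,
      rightSample: Seq[K],
      rightRows: Long): Double = {
    if (leftSample.isEmpty || rightSample.isEmpty) {
      0.0
    } else {
      // Each sampled row stands for (total rows / sample size) real rows.
      val leftScale = leftRows.toDouble / leftSample.size
      val rightScale = rightRows.toDouble / rightSample.size
      val right = keyCounts(rightSample)
      // A key contributes (estimated left count) * (estimated right count) rows.
      keyCounts(leftSample).iterator.map { case (key, leftCount) =>
        right.get(key) match {
          case Some(rightCount) => leftCount * leftScale * rightCount * rightScale
          case None => 0.0
        }
      }.sum
    }
  }

  /** Assumed rule: split when the output is much larger than the input. */
  def shouldSplit(estimatedJoinedRows: Double, inputRows: Long, factor: Double): Boolean =
    estimatedJoinedRows > factor * inputRows
}
```

For example, `JoinSizeSketch.estimateJoinedRows(Seq(0, 0, 0, 1), 400L, Seq(0, 0, 2, 3), 40L)` scales the three left and two right occurrences of key 0 to 300 and 20 rows and returns 6000.0. Keys that are rare in the data can be missed by a small sample, so an estimate like this is mainly useful for spotting heavy keys.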
> data inflation case:
> {code:java}
> import org.apache.spark.sql.functions.{col, hash, lit, when}
>
> // Baseline: AQE skew-join handling with tiny thresholds and no broadcast join.
> spark.conf.set("spark.sql.adaptive.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "80")
> spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "80")
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
> spark.conf.set("spark.sql.shuffle.partitions", 10)
> sc.setLogLevel("INFO")
>
> // Rows whose hash is a multiple of 100 are remapped to key 0, so key 0 is skewed.
> val df1 = spark.range(0, 1000000, 1, 9)
>   .select(col("id").as("key1"), col("id").as("value1"), hash(col("id")).mod(100).as("hash1"))
>   .withColumn("key1", when(col("hash1") === lit(0), lit(0)).otherwise(col("key1")))
> val df2 = spark.range(0, 500000, 1, 7)
>   .select(col("id").as("key2"), col("id").as("value2"), hash(col("id")).mod(100).as("hash2"))
>   .withColumn("key2", when(col("hash2") === lit(0), lit(0)).otherwise(col("key2")))
> df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_0")
>
> // Enable the proposed skew detection and inflation handling, then rerun the join.
> spark.conf.set("spark.sql.adaptive.skewJoin.inflation.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.inflation.factor", 10)
> spark.conf.set("spark.sql.adaptive.shuffle.sampleSizePerPartition", 100)
> spark.conf.set("spark.sql.adaptive.shuffle.detectSkewness", true)
> df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_1")
> {code}
>  
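A rough back-of-the-envelope calculation shows why the case above inflates. The sketch below is plain Scala arithmetic and does not run Spark; it assumes that about 1 in 100 ids has a hash divisible by 100 (the exact counts depend on the hash function), and the object name `InflationArithmetic` is made up for illustration.

```scala
// Hypothetical arithmetic sketch; all numbers are approximations.
object InflationArithmetic {
  val leftRows = 1000000L  // rows in df1
  val rightRows = 500000L  // rows in df2

  // Assumption: roughly 1% of ids hash to a multiple of 100 and are remapped to key 0.
  val leftKey0 = leftRows / 100
  val rightKey0 = rightRows / 100

  // Every left row with key 0 matches every right row with key 0.
  val joinedKey0 = leftKey0 * rightKey0

  // The remaining df2 ids keep their own key and match exactly one df1 row each.
  val joinedOther = rightRows - rightKey0
}
```

Under these assumptions key 0 holds only about 10,000 and 5,000 input rows but yields about 50,000,000 joined rows, while all other keys together yield about 495,000. The partition containing key 0 is therefore modest in input size yet dominates the join output.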
> It also partially resolves https://issues.apache.org/jira/browse/SPARK-35596
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
