[ https://issues.apache.org/jira/browse/SPARK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-33933:
------------------------------------
Assignee: Apache Spark
> Broadcast timeout happened unexpectedly in AQE
> -----------------------------------------------
>
> Key: SPARK-33933
> URL: https://issues.apache.org/jira/browse/SPARK-33933
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0, 3.0.1
> Reporter: Yu Zhong
> Assignee: Apache Spark
> Priority: Major
>
> In Spark 3.0, when AQE is enabled, ordinary queries often fail with a
> broadcast timeout like the one below.
>
> {code:java}
> Could not execute broadcast in 300 secs. You can increase the timeout for
> broadcasts via spark.sql.broadcastTimeout or disable broadcast join by
> setting spark.sql.autoBroadcastJoinThreshold to -1
> {code}
>
> This usually happens when a broadcast join (with or without a hint) follows a
> long-running shuffle (more than 5 minutes). Disabling AQE makes the issue
> disappear.
> Increasing spark.sql.broadcastTimeout works as a workaround, but it should
> not be necessary because the data to broadcast is very small.
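For reference, the workaround described above can be sketched as a session setting. This is only an illustration: the value 1200 and the app name are arbitrary assumptions, not recommendations from the report, and the snippet needs Spark on the classpath (for example in spark-shell).

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session; the app name is an assumption for illustration.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("Broadcast timeout workaround")
  .getOrCreate()

// Workaround from the report: raise the broadcast timeout.
// The value is in seconds; 1200 is an arbitrary example (the error above shows 300).
spark.conf.set("spark.sql.broadcastTimeout", "1200")

// Other options mentioned in the report and in the error message, left commented out:
// spark.conf.set("spark.sql.adaptive.enabled", "false")          // disable AQE
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")   // disable automatic broadcast joins
```

As the report notes, raising the timeout only hides the symptom; the broadcast itself is small.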
> After investigation, the root cause appears to be as follows. When AQE is
> enabled, getFinalPhysicalPlan traverses the physical plan bottom-up, creates
> query stages for the materialized parts via createQueryStages, and
> materializes the newly created query stages, which submits map stages or
> broadcasts. When a ShuffleQueryStage is materialized before a
> BroadcastQueryStage, the map job and the broadcast job are submitted at
> almost the same time, but the map job holds all the computing resources. If
> the map job runs slowly (a lot of data to process and limited resources), the
> broadcast job cannot start (and finish) before spark.sql.broadcastTimeout
> expires, which causes the whole job to fail (introduced in SPARK-31475).
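The starvation described above can be illustrated with a toy model in plain Scala. This is not Spark's scheduler or AQE code: a fixed thread pool stands in for the executor slots, sleeping tasks stand in for the map stage, and a trivial future stands in for the broadcast job. All names (StarvationSketch, broadcastTimesOut) and all timings are hypothetical.

```scala
import java.util.concurrent.{Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object StarvationSketch {
  // Returns true if the tiny "broadcast" task does not finish within `timeout`
  // because earlier "map" tasks occupy every slot of the pool.
  def broadcastTimesOut(slots: Int, mapTasks: Int, mapTaskMillis: Long,
                        timeout: FiniteDuration): Boolean = {
    val pool = Executors.newFixedThreadPool(slots)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // "Map stage": submitted first; its tasks fill the slots and the queue.
      (1 to mapTasks).foreach { _ =>
        Future {
          try Thread.sleep(mapTaskMillis)
          catch { case _: InterruptedException => () } // pool is being shut down
        }
      }
      // "Broadcast job": trivial work, but queued behind the map tasks.
      val broadcast = Future { 42 }
      try {
        // The timeout clock starts at submission, not when the task starts running.
        Await.result(broadcast, timeout)
        false
      } catch {
        case _: TimeoutException => true
      }
    } finally {
      pool.shutdownNow()
    }
  }

  def main(args: Array[String]): Unit = {
    // 2 slots, 4 long map tasks: the broadcast waits behind them and times out.
    println(broadcastTimesOut(slots = 2, mapTasks = 4, mapTaskMillis = 500L, timeout = 200.millis))
    // 2 slots, 1 map task: a slot is free, so the broadcast finishes in time.
    println(broadcastTimesOut(slots = 2, mapTasks = 1, mapTaskMillis = 500L, timeout = 200.millis))
  }
}
```

With these timings the first call should report a timeout and the second should not. The point of the sketch is that the timeout is measured from submission, so a small job can time out without ever having run.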
> Code to reproduce:
>
> {code:java}
> import java.util.UUID
> import scala.util.Random
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .master("local[2]")
>   .appName("Test Broadcast").getOrCreate()
> import spark.implicits._
> spark.conf.set("spark.sql.adaptive.enabled", "true")
> val sc = spark.sparkContext
> sc.setLogLevel("INFO")
> val uuid = UUID.randomUUID
> // Large fact table: 10000 partitions, each producing 10000 to 19999 rows,
> // so the map stage of the shuffle runs for a long time on 2 cores.
> val df = sc.parallelize(Range(0, 10000), 10000).flatMap(x => {
>   for (i <- Range(0, 10000 + Random.nextInt(10000)))
>     yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString)
> }).toDF("index", "part", "pv", "uuid")
>   .withColumn("md5", md5($"uuid"))
> // Tiny dimension table (26 rows), small enough to be broadcast.
> val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x))
> val dim = dim_data.toDF("name", "index")
> // Aggregation (shuffle) followed by a join against the small table.
> val result = df.groupBy("index")
>   .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv"))
>   .join(dim, Seq("index"))
>   .collect()
> {code}
>
>