[
https://issues.apache.org/jira/browse/SPARK-44897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan resolved SPARK-44897.
---------------------------------
Fix Version/s: 3.5.0
Resolution: Fixed
Issue resolved by pull request 42587
[https://github.com/apache/spark/pull/42587]
> Local Property Propagation to Subquery Broadcast Exec
> -----------------------------------------------------
>
> Key: SPARK-44897
> URL: https://issues.apache.org/jira/browse/SPARK-44897
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Michael Chen
> Assignee: Michael Chen
> Priority: Major
> Fix For: 3.5.0
>
>
> https://issues.apache.org/jira/browse/SPARK-32748 was opened and then, I
> believe, mistakenly reverted in order to address this issue. The claim was
> that propagating local properties to the dynamic pruning thread in
> SubqueryBroadcastExec is unnecessary because the broadcast threads will
> propagate them anyway. However, when the dynamic pruning thread is the first
> to initialize the broadcast relation future, the local properties are not
> propagated correctly: by the time they are propagated to the broadcast
> threads, they are already stale.
> I do not have a good way of reproducing this consistently, because the
> SubqueryBroadcastExec is generally not the first to initialize the broadcast
> relation future. However, after adding a Thread.sleep(1) to the doPrepare
> method of SubqueryBroadcastExec, the following test always fails.
> {code:scala}
> withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
>   withTable("a", "b") {
>     val confKey = "spark.sql.y"
>     val confValue1 = UUID.randomUUID().toString()
>     val confValue2 = UUID.randomUUID().toString()
>     Seq((confValue1, "1")).toDF("key", "value")
>       .write
>       .format("parquet")
>       .partitionBy("key")
>       .mode("overwrite")
>       .saveAsTable("a")
>     val df1 = spark.table("a")
>
>     def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
>       val df = spark.range(1).mapPartitions { _ =>
>         Iterator(TaskContext.get.getLocalProperty(confKey))
>       }.filter($"value".contains(confValue)).as("c")
>       df.hint("broadcast")
>     }
>
>     // set local property and assert
>     val df2 = generateBroadcastDataFrame(confKey, confValue1)
>     spark.sparkContext.setLocalProperty(confKey, confValue1)
>     val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
>     val checks = checkDF.collect()
>     assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))
>
>     // change local property and re-assert
>     Seq((confValue2, "1")).toDF("key", "value")
>       .write
>       .format("parquet")
>       .partitionBy("key")
>       .mode("overwrite")
>       .saveAsTable("b")
>     val df3 = spark.table("b")
>     val df4 = generateBroadcastDataFrame(confKey, confValue2)
>     spark.sparkContext.setLocalProperty(confKey, confValue2)
>     val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
>     val checks2 = checks2DF.collect()
>     assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
>     assert(checks2.nonEmpty)
>   }
> } {code}
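The race described above can be sketched outside Spark. The following minimal Python sketch (all names hypothetical; `local_props` stands in for SparkContext local properties and `get_broadcast_future` for the memoized broadcast relation future) shows how whichever thread initializes a shared future first pins the property snapshot that every later caller observes:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins: a dict for the driver's local properties and a
# memoized future for the broadcast relation.
local_props = {}
_future = None
_lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=1)

def get_broadcast_future():
    """First caller wins: the snapshot of local_props taken here is what
    every later caller observes, mirroring how the broadcast relation
    future memoizes the initializing thread's local properties."""
    global _future
    with _lock:
        if _future is None:
            snapshot = dict(local_props)  # properties captured at init time
            _future = executor.submit(lambda: snapshot.get("spark.sql.y"))
    return _future

# A "dynamic pruning" thread initializes the future before the driver
# sets the property, so the stale (empty) snapshot gets memoized.
t = threading.Thread(target=get_broadcast_future)
t.start()
t.join()

local_props["spark.sql.y"] = "expected-value"  # set too late
print(get_broadcast_future().result())  # stale: None, not "expected-value"
```

This is why propagating local properties at future-initialization time matters: if the dynamic pruning thread initializes first without them, the broadcast threads can only re-propagate the already-stale values.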
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]