Michael Chen created SPARK-44897: ------------------------------------ Summary: Local Property Propagation to Subquery Broadcast Exec Key: SPARK-44897 URL: https://issues.apache.org/jira/browse/SPARK-44897 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Michael Chen
https://issues.apache.org/jira/browse/SPARK-32748 was opened to address this issue and then, I believe, mistakenly reverted. The claim was that local property propagation in SubqueryBroadcastExec to the dynamic pruning thread is not necessary because the properties will be propagated by the broadcast threads anyway. However, in a scenario where the dynamic pruning thread is the first to initialize the broadcast relation future, the local properties will not be propagated correctly. This is because the local properties being propagated to the broadcast threads would already be incorrect. I do not have a good way of reproducing this consistently, because the SubqueryBroadcastExec is generally not the first to initialize the broadcast relation future; however, after adding a Thread.sleep(1) to the doPrepare method of SubqueryBroadcastExec, the following test always fails. {code:java} withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") { withTable("a", "b") { val confKey = "spark.sql.y" val confValue1 = UUID.randomUUID().toString() val confValue2 = UUID.randomUUID().toString() Seq((confValue1, "1")).toDF("key", "value") .write .format("parquet") .partitionBy("key") .mode("overwrite") .saveAsTable("a") val df1 = spark.table("a") def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = { val df = spark.range(1).mapPartitions { _ => Iterator(TaskContext.get.getLocalProperty(confKey)) }.filter($"value".contains(confValue)).as("c") df.hint("broadcast") } // set local property and assert val df2 = generateBroadcastDataFrame(confKey, confValue1) spark.sparkContext.setLocalProperty(confKey, confValue1) val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value") val checks = checkDF.collect() assert(checks.forall(_.toSeq == Seq(confValue1, confValue1))) // change local property and re-assert Seq((confValue2, "1")).toDF("key", "value") .write .format("parquet") .partitionBy("key") .mode("overwrite") .saveAsTable("b") val df3 
= spark.table("b") val df4 = generateBroadcastDataFrame(confKey, confValue2) spark.sparkContext.setLocalProperty(confKey, confValue2) val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value") val checks2 = checks2DF.collect() assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2))) assert(checks2.nonEmpty) } } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org