Michael Chen created SPARK-44897:
------------------------------------

             Summary: Local Property Propagation to Subquery Broadcast Exec
                 Key: SPARK-44897
                 URL: https://issues.apache.org/jira/browse/SPARK-44897
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Michael Chen


https://issues.apache.org/jira/browse/SPARK-32748 was opened and then, I believe, 
mistakenly reverted to address this issue. The claim was that local property 
propagation in SubqueryBroadcastExec to the dynamic pruning thread is unnecessary 
because the properties will be propagated by the broadcast threads anyway. 
However, in a scenario where the dynamic pruning thread is the first to initialize 
the broadcast relation future, the local properties will not be propagated 
correctly: by the time the broadcast threads capture the local properties, 
those properties may already have been changed, so the values they propagate 
are incorrect.
I do not have a reliable way of reproducing this, because the 
SubqueryBroadcastExec is generally not the first to initialize the broadcast 
relation future; however, after adding a Thread.sleep(1) to the doPrepare method 
of SubqueryBroadcastExec, the following test always fails.
{code:java}
withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
  withTable("a", "b") {
    val confKey = "spark.sql.y"
    val confValue1 = UUID.randomUUID().toString()
    val confValue2 = UUID.randomUUID().toString()
    Seq((confValue1, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("a")
    val df1 = spark.table("a")

    def generateBroadcastDataFrame(confKey: String, confValue: String): 
Dataset[String] = {
      val df = spark.range(1).mapPartitions { _ =>
        Iterator(TaskContext.get.getLocalProperty(confKey))
      }.filter($"value".contains(confValue)).as("c")
      df.hint("broadcast")
    }

    // set local property and assert
    val df2 = generateBroadcastDataFrame(confKey, confValue1)
    spark.sparkContext.setLocalProperty(confKey, confValue1)
    val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", 
$"c.value")
    val checks = checkDF.collect()
    assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

    // change local property and re-assert
    Seq((confValue2, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("b")
    val df3 = spark.table("b")
    val df4 = generateBroadcastDataFrame(confKey, confValue2)
    spark.sparkContext.setLocalProperty(confKey, confValue2)
    val checks2DF = df3.join(df4).where($"b.key" === 
$"c.value").select($"b.key", $"c.value")
    val checks2 = checks2DF.collect()
    assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
    assert(checks2.nonEmpty)
  }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to