[ https://issues.apache.org/jira/browse/SPARK-43157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Reeves updated SPARK-43157:
-------------------------------
    Description: 
If a cached dataset is used by multiple other datasets that are materialized in separate threads, it can corrupt the TreeNode.tags map in any of the cached plan nodes, which hangs the driver forever. This happens because TreeNode.tags (a scala.collection.mutable.HashMap, per the stack trace below) is not thread-safe. How this happens:
 # Multiple datasets that reference the same cached dataset are materialized at the same time in different threads.
 # AdaptiveSparkPlanExec.onUpdatePlan calls ExplainMode.fromString and QueryExecution.explainString, which runs ExplainUtils on the plan.
 # ExplainUtils uses the TreeNode.tags map to store the operator ID of every node in the plan. This is usually safe because the plan is cloned. However, when the plan contains an InMemoryTableScanExec, InMemoryRelation.cachedPlan is not cloned, so multiple threads can set the operator ID on the same nodes.
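To make step 3 concrete, here is a minimal pure-Scala model with no Spark dependency. Node, Relation, Scan, and copyPlan are hypothetical stand-ins for TreeNode, InMemoryRelation, InMemoryTableScanExec, and plan cloning; they only mimic the shape of the problem as described above: copying a plan produces new outer nodes, but the cached plan is carried over by reference, so its mutable tag map is shared between the copies.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a TreeNode with a mutable, non-thread-safe tag map.
final class Node(val name: String) {
  val tags: mutable.HashMap[String, Int] = mutable.HashMap.empty
}

// Hypothetical stand-ins for InMemoryRelation and InMemoryTableScanExec.
// `cachedPlan` is a plain reference, so copying the relation does not copy it.
final case class Relation(cachedPlan: Node)
final case class Scan(relation: Relation)

object SharedCachedPlanDemo {
  // Shallow "clone": new Scan and Relation objects, same cachedPlan reference.
  def copyPlan(scan: Scan): Scan = scan.copy(relation = scan.relation.copy())

  def main(args: Array[String]): Unit = {
    val cached = new Node("cachedPlan")
    val plan1 = Scan(Relation(cached))
    val plan2 = copyPlan(plan1)

    println(plan1 eq plan2)                                         // false
    println(plan1.relation eq plan2.relation)                       // false
    println(plan1.relation.cachedPlan eq plan2.relation.cachedPlan) // true

    // A tag written while explaining plan1 is visible through plan2.
    plan1.relation.cachedPlan.tags("opId") = 1
    println(plan2.relation.cachedPlan.tags.get("opId"))             // Some(1)
  }
}
```

The three eq checks mirror the ones in the Spark example further down; in this model the shared tag map is what lets two concurrent explains write to the same HashMap.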

Making the TreeNode.tags field thread-safe does not solve this problem, because there is still a correctness issue: the threads may overwrite each other's operator IDs, which could be different.
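The correctness issue can be sketched in the same pure-Scala style. SafeNode and assignCachedOpId are hypothetical, and the numbering rule (operators above the cache get 1..n, the cached node gets n + 1) is made up for illustration; it is not ExplainUtils' real algorithm. The tag map here is a java.util.concurrent.ConcurrentHashMap, so each individual write is thread-safe, yet one possible interleaving of two explains still leaves query A reading query B's operator ID.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical shared node whose tag map IS thread-safe.
final class SafeNode(val name: String) {
  val tags = new ConcurrentHashMap[String, Integer]()
}

object LastWriterWinsDemo {
  // Hypothetical explain step: the operators above the shared cached node are
  // numbered 1..n, so the cached node is tagged with n + 1.
  def assignCachedOpId(operatorsAbove: Int, cached: SafeNode): Int = {
    val id = operatorsAbove + 1
    cached.tags.put("opId", id) // Int is boxed to java.lang.Integer by Predef
    id
  }

  def main(args: Array[String]): Unit = {
    val cached = new SafeNode("cachedPlan")
    // One possible interleaving of two concurrent explains, written sequentially:
    val idFromA = assignCachedOpId(1, cached) // query A tags the node with 2
    val idFromB = assignCachedOpId(3, cached) // query B overwrites it with 4
    // Query A now renders its plan and reads back B's ID instead of its own.
    val seenByA = cached.tags.get("opId").intValue()
    println(s"A assigned $idFromA but reads $seenByA") // A assigned 2 but reads 4
  }
}
```

No write is lost or corrupted at the map level; the bug is that both explains share one slot for what should be two per-query values.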

Example stack trace of the infinite loop:
{code:java}
scala.collection.mutable.HashTable.resize(HashTable.scala:265)
scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
scala.collection.mutable.HashMap.put(HashMap.scala:126)
scala.collection.mutable.HashMap.update(HashMap.scala:131)
org.apache.spark.sql.catalyst.trees.TreeNode.setTagValue(TreeNode.scala:108)
org.apache.spark.sql.execution.ExplainUtils$.setOpId$1(ExplainUtils.scala:134)
…org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:662){code}
Example showing that the cachedPlan object is not cloned:
{code:java}
// Run in spark-shell, where `spark` is predefined.
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._

// Return the first InMemoryTableScanExec in the plan, searching subqueries
// first and then children.
def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = plan match {
  case scan: InMemoryTableScanExec => Some(scan)
  case _ =>
    (plan.subqueries.flatMap(findCacheOperator) ++
      plan.children.flatMap(findCacheOperator)).headOption
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if the InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2))
// returns false

// Check if the InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation))
// returns false

// Check if the cached SparkPlan references point to the same object
println(plan1.relation.cachedPlan.eq(plan2.relation.cachedPlan))
// returns true
// Although plan2's operators are separate copies, they still reference the
// same cached SparkPlan object as plan1.
{code}



> TreeNode tags can become corrupted and hang driver when the dataset is cached
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-43157
>                 URL: https://issues.apache.org/jira/browse/SPARK-43157
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 3.5.0
>            Reporter: Rob Reeves
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
