[
https://issues.apache.org/jira/browse/SPARK-43157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-43157:
-----------------------------------
Labels: pull-request-available (was: )
> TreeNode tags can become corrupted and hang driver when the dataset is cached
> -----------------------------------------------------------------------------
>
> Key: SPARK-43157
> URL: https://issues.apache.org/jira/browse/SPARK-43157
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0, 3.5.0
> Reporter: Rob Reeves
> Assignee: Rob Reeves
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.1, 3.5.0
>
>
> If a cached dataset is used by multiple other datasets that are materialized
> in separate threads, it can corrupt the TreeNode.tags map in any of the cached
> plan nodes. This hangs the driver forever because TreeNode.tags is not
> thread-safe. How this happens:
> # Multiple datasets that reference the same cached dataset are materialized at
> the same time in different threads.
> # AdaptiveSparkPlanExec.onUpdatePlan calls ExplainMode.fromString and
> regenerates the plan description via QueryExecution.explainString.
> # ExplainUtils uses the TreeNode.tags map to store the operator Id for every
> node in the plan. This is usually okay because the plan is cloned. When there
> is an InMemoryTableScanExec, the InMemoryRelation.cachedPlan is not cloned, so
> multiple threads can set the operator Id on the same shared nodes concurrently
> (see the sketch after this list).
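> A minimal sketch of the trigger, assuming a local spark-shell session with
> adaptive query execution enabled (spark.sql.adaptive.enabled=true); the race
> is timing dependent, so a single run may or may not reproduce the hang:
> {code:scala}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.duration.Duration
> import scala.concurrent.ExecutionContext.Implicits.global
> import spark.implicits._
>
> // Both dependent datasets below share the same InMemoryRelation.cachedPlan object.
> val cached = spark.range(10).filter($"id" < 100).cache()
>
> // Materializing them in separate threads lets ExplainUtils write operator Ids
> // into the shared cachedPlan's TreeNode.tags map from both threads at once.
> val futures = Seq(cached.limit(1), cached.limit(2)).map { df =>
>   Future { df.collect() }
> }
> futures.foreach(f => Await.result(f, Duration.Inf))
> {code}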
> Making the TreeNode.tags field thread-safe does not solve this problem,
> because there is still a correctness issue: the threads may overwrite each
> other's operator Ids, which can differ between plans.
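> A minimal sketch of why a thread-safe map alone is not enough, using the
> public TreeNode tag API; the tag name is an illustrative stand-in for the
> operator-Id tag that ExplainUtils maintains internally, and the executedPlan
> node stands in for the shared cachedPlan:
> {code:scala}
> import org.apache.spark.sql.catalyst.trees.TreeNodeTag
>
> val opId = TreeNodeTag[Int]("operatorId")
> val shared = spark.range(10).queryExecution.executedPlan
>
> shared.setTagValue(opId, 3)        // operator Id written by one explain pass
> shared.setTagValue(opId, 7)        // Id from a concurrent pass overwrites it
> println(shared.getTagValue(opId))  // Some(7): the first operator Id is lost
> {code}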
> Example stack trace of the infinite loop:
> {code:scala}
> scala.collection.mutable.HashTable.resize(HashTable.scala:265)
> scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
> scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
> scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
> scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
> scala.collection.mutable.HashMap.put(HashMap.scala:126)
> scala.collection.mutable.HashMap.update(HashMap.scala:131)
> org.apache.spark.sql.catalyst.trees.TreeNode.setTagValue(TreeNode.scala:108)
> org.apache.spark.sql.execution.ExplainUtils$.setOpId$1(ExplainUtils.scala:134)
> …
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:662){code}
> Example to show the cachedPlan object is not cloned:
> {code:scala}
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
> import spark.implicits._
>
> // Recursively search a physical plan (including its subqueries) for the
> // first InMemoryTableScanExec node.
> def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
>   if (plan.isInstanceOf[InMemoryTableScanExec]) {
>     Some(plan.asInstanceOf[InMemoryTableScanExec])
>   } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
>     None
>   } else {
>     (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
>       plan.children.flatMap(findCacheOperator)).headOption
>   }
> }
>
> val df = spark.range(10).filter($"id" < 100).cache()
> val df1 = df.limit(1)
> val df2 = df.limit(1)
>
> // Get the cache operator (InMemoryTableScanExec) in each plan
> val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
> val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get
>
> // Check if the InMemoryTableScanExec references point to the same object
> println(plan1.eq(plan2))
> // returns false
>
> // Check if the InMemoryRelation references point to the same object
> println(plan1.relation.eq(plan2.relation))
> // returns false
>
> // Check if the cached SparkPlan references point to the same object
> println(plan1.relation.cachedPlan.eq(plan2.relation.cachedPlan))
> // returns true
>
> // This shows that the cloned plan2 still has references to the original plan1
> {code}