This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5cc4c5d2912 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
5cc4c5d2912 is described below
commit 5cc4c5d2912a999e247f6c6f92c8ba858946594a
Author: Rob Reeves <[email protected]>
AuthorDate: Thu May 18 14:24:05 2023 +0800
[SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
### What changes were proposed in this pull request?
This is the narrowest fix for the issue observed in SPARK-43157. It does not attempt to identify or solve all potential correctness and concurrency issues arising from TreeNode.tags being modified in multiple places. It solves the issue described in SPARK-43157 by cloning the cached plan when populating `InMemoryRelation.innerChildren`. I chose to do the clone at this point to limit the scope to the tree traversal used for building up the string representation of the plan, which is where we se [...]
Another solution I tried was to modify `InMemoryRelation.clone` to create a new `CachedRDDBuilder` and pass in a cloned `cachedPlan`. I opted not to go with this approach because `CachedRDDBuilder` has mutable state that would need to be moved to the new object, and I didn't want to add that complexity if it wasn't needed.
### Why are the changes needed?
When caching is used, the cached part of the SparkPlan leaks into new clones of the plan. This leakage is a problem because if the TreeNode.tags are modified in one plan, the change affects the other plan. This is a correctness issue, and a concurrency issue if the TreeNode.tags are set from different threads on the cloned plans.
See the description of [SPARK-43157](https://issues.apache.org/jira/browse/SPARK-43157) for an example of the concurrency issue.
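The tag-sharing hazard can be modeled without Spark. In this minimal sketch (`Node`, `shallowClone`, and `deepClone` are illustrative names, not Spark APIs), a shallow clone that reuses the same mutable tags map makes a mutation through one "clone" visible through the other, while copying the map, analogous to what this patch does via `cachedPlan.clone()`, isolates them:

```scala
import scala.collection.mutable

// Minimal stand-in for a plan node carrying mutable per-node tags.
final case class Node(tags: mutable.Map[String, String]) {
  // Shallow clone: reuses the tags map, mirroring how the cached plan
  // leaked into clones before this fix.
  def shallowClone: Node = Node(tags)
  // Deep clone: copies the map, mirroring cachedPlan.clone().
  def deepClone: Node = Node(tags.clone())
}

val original = Node(mutable.Map.empty)

val leaky = original.shallowClone
leaky.tags("explain-id") = "1"            // mutating the "clone"...
println(original.tags.get("explain-id"))  // ...is visible in the original: Some(1)

val safe = original.deepClone
safe.tags("explain-id") = "2"
println(original.tags("explain-id"))      // original is untouched: still 1
```

With two threads each calling explain on plans that share the shallow-cloned node, the concurrent map writes become the race described in the JIRA; the deep copy removes the shared mutable state.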
### Does this PR introduce _any_ user-facing change?
Yes. It fixes a driver hanging issue the user can observe.
### How was this patch tested?
Unit test added and I manually verified `Dataset.explain("formatted")`
still had the expected output.
```scala
spark.range(10).cache.filter($"id" > 5).explain("formatted")
== Physical Plan ==
* Filter (4)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- * Range (3)

(1) InMemoryTableScan
Output [1]: [id#0L]
Arguments: [id#0L], [(id#0L > 5)]

(2) InMemoryRelation
Arguments: [id#0L], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@418b946b,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Range (0, 10, step=1, splits=16)
,None), [id#0L ASC NULLS FIRST]

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (0, 10, step=1, splits=Some(16))

(4) Filter [codegen id : 1]
Input [1]: [id#0L]
Condition : (id#0L > 5)
```
I also verified that the `InMemoryRelation.innerChildren` is cloned when the entire plan is cloned.
```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._
def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
  if (plan.isInstanceOf[InMemoryTableScanExec]) {
    Some(plan.asInstanceOf[InMemoryTableScanExec])
  } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
    None
  } else {
    (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
      plan.children.flatMap(findCacheOperator)).headOption
  }
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if the InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2))
// returns false

// Check if the InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation))
// returns false

// Check if the cached SparkPlan references point to the same object
println(plan1.relation.innerChildren.head.eq(plan2.relation.innerChildren.head))
// returns false
// This shows the issue is fixed
```
Closes #40812 from robreeves/roreeves/explain_util.
Authored-by: Rob Reeves <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 5e5999538899732bf3cdd04b974f1abeb949ccd0)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/columnar/InMemoryRelation.scala | 12 ++++++-
.../execution/columnar/InMemoryRelationSuite.scala | 37 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 98f4a164a22..4df9915dc96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -378,7 +378,17 @@ case class InMemoryRelation(
 
   @volatile var statsOfPlanToCache: Statistics = null
 
-  override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
+
+  override lazy val innerChildren: Seq[SparkPlan] = {
+    // The cachedPlan needs to be cloned here because it does not get cloned when SparkPlan.clone is
+    // called. This is a problem because when the explain output is generated for
+    // a plan it traverses the innerChildren and modifies their TreeNode.tags. If the plan is not
+    // cloned, there is a thread safety issue in the case that two plans with a shared cache
+    // operator have explain called at the same time. The cachedPlan cannot be cloned because
+    // it contains stateful information so we only clone it for the purpose of generating the
+    // explain output.
+    Seq(cachedPlan.clone())
+  }
 
   override def doCanonicalize(): logical.LogicalPlan =
     copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
new file mode 100644
index 00000000000..72b3a4bc109
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSparkSessionBase
+import org.apache.spark.storage.StorageLevel
+
+class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase {
+ test("SPARK-43157: Clone innerChildren cached plan") {
+ val d = spark.range(1)
+    val relation = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
+ val cloned = relation.clone().asInstanceOf[InMemoryRelation]
+
+ val relationCachedPlan = relation.innerChildren.head
+ val clonedCachedPlan = cloned.innerChildren.head
+
+ // verify the plans are not the same object but are logically equivalent
+ assert(!relationCachedPlan.eq(clonedCachedPlan))
+ assert(relationCachedPlan === clonedCachedPlan)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]