This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 99c60a8ae3 [GLUTEN-9901][VL] RAS: DistinguishIdenticalScans rule to
distinguish identical scans (#9915)
99c60a8ae3 is described below
commit 99c60a8ae3dabb8b517433afe1fc74960c420f6f
Author: Boxuan Li <[email protected]>
AuthorDate: Sun Jun 29 19:09:47 2025 -0700
[GLUTEN-9901][VL] RAS: DistinguishIdenticalScans rule to distinguish
identical scans (#9915)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 1 +
.../apache/spark/sql/execution/SparkPlanUtil.scala | 39 +++++++++++
.../columnar/DistinguishIdenticalScans.scala | 79 ++++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 4 ++
.../sql/execution/GlutenStreamingQuerySuite.scala | 22 ++++++
5 files changed, 145 insertions(+)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 48e5ef1a43..c750db1b9a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -184,6 +184,7 @@ object VeloxRuleApi {
c => RasOffload.Rule(offload, validatorBuilder(new
GlutenConfig(c.sqlConf)), rewrites)))
// Gluten RAS: Post rules.
+ injector.injectPostTransform(_ => DistinguishIdenticalScans)
injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ => RemoveTransitions)
injector.injectPostTransform(_ => UnionTransformerRule())
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkPlanUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkPlanUtil.scala
new file mode 100644
index 0000000000..11deb4825d
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkPlanUtil.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+import java.lang.reflect.Method
+
/**
 * Reflection-based helper to invoke [[TreeNode.withNewChildrenInternal]], which is declared
 * `protected` in Spark and therefore not directly callable from Gluten rule code.
 */
object SparkPlanUtil {

  // Look up and unlock the protected method exactly once: the reflective lookup and the
  // accessibility check are comparatively expensive, and the resulting Method object is
  // immutable and safe to reuse across calls.
  private lazy val withNewChildrenInternalMethod: Method = {
    val method =
      classOf[TreeNode[_]].getDeclaredMethod("withNewChildrenInternal", classOf[IndexedSeq[_]])
    method.setAccessible(true)
    method
  }

  /**
   * Rebuilds `plan` with the given children by invoking the protected
   * `withNewChildrenInternal` method.
   *
   * @param plan the plan node to rebuild
   * @param children the new children, in order; arity must match what the node expects
   * @return a copy of `plan` whose children are `children`
   */
  def withNewChildrenInternal(plan: SparkPlan, children: IndexedSeq[SparkPlan]): SparkPlan = {
    // Method.invoke uses virtual dispatch for instance methods, so the concrete plan
    // node's override of withNewChildrenInternal is the one that runs.
    withNewChildrenInternalMethod.invoke(plan, children).asInstanceOf[SparkPlan]
  }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/DistinguishIdenticalScans.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/DistinguishIdenticalScans.scala
new file mode 100644
index 0000000000..4ab259c24a
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/DistinguishIdenticalScans.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, RDDScanExec,
SparkPlan, SparkPlanUtil}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2ScanExecBase}
+
+import scala.collection.mutable
+
object DistinguishIdenticalScans extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = {
    // Scans already encountered during this pass. Membership is by logical equality,
    // so a second scan node equal to an earlier one is detected here.
    val seenScans = mutable.Set.empty[SparkPlan]

    // If this scan was seen before, replace it with a clone (an equal but referentially
    // distinct node) and report a change; otherwise remember it and report no change.
    def dedupe[T <: SparkPlan](scan: T): (T, Boolean) =
      if (seenScans.add(scan)) {
        // add returned true: first occurrence, keep the node as-is.
        (scan, false)
      } else {
        (scan.clone().asInstanceOf[T], true)
      }

    // Bottom-up traversal. Returns the (possibly rewritten) node together with a flag
    // saying whether anything at or below it changed, so untouched subtrees are reused.
    def visit(node: SparkPlan): (SparkPlan, Boolean) = {
      val (rebuiltChildren, childFlags) = node.children.map(visit).unzip
      val anyChildChanged = childFlags.contains(true)

      // Rebuild the parent only when at least one child was actually replaced.
      val rebuilt =
        if (anyChildChanged) {
          SparkPlanUtil.withNewChildrenInternal(node, rebuiltChildren.toIndexedSeq)
        } else {
          node
        }

      // BatchScanExec must be matched before its supertype DataSourceV2ScanExecBase.
      rebuilt match {
        case scan: FileSourceScanExec =>
          val (deduped, changed) = dedupe(scan)
          (deduped, anyChildChanged || changed)
        case scan: BatchScanExec =>
          val (deduped, changed) = dedupe(scan)
          (deduped, anyChildChanged || changed)
        case scan: DataSourceV2ScanExecBase =>
          val (deduped, changed) = dedupe(scan)
          (deduped, anyChildChanged || changed)
        case scan: RDDScanExec =>
          val (deduped, changed) = dedupe(scan)
          (deduped, anyChildChanged || changed)
        case other =>
          // Not a scan: nothing changes at this node itself.
          (other, anyChildChanged)
      }
    }

    visit(plan)._1
  }
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 512d4b6bb9..43153a6974 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -957,6 +957,10 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be
combined separately")
.exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined")
.exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V2")
+ enableSuite[GlutenStreamingQuerySuite]
+ // requires test resources that don't exist in Gluten repo
+ .exclude("detect escaped path and report the migration guide")
+ .exclude("ignore the escaped path check when the flag is off")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
new file mode 100644
index 0000000000..d09576908f
--- /dev/null
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.streaming._
+
// Runs Spark's StreamingQuerySuite against the Gluten backend via the shared test trait.
class GlutenStreamingQuerySuite extends StreamingQuerySuite with GlutenSQLTestsTrait
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]