This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 09950de2d [GLUTEN-5414] [VL] Move ArrowFileScanExec class to module
backends-velox
09950de2d is described below
commit 09950de2dd80090a0bc0fea0631749916614cec1
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon May 13 09:37:36 2024 +0800
[GLUTEN-5414] [VL] Move ArrowFileScanExec class to module backends-velox
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 10 +++++--
.../gluten/extension/ArrowScanReplaceRule.scala | 34 ++++++++++++++++++++++
.../sql/execution/ArrowFileSourceScanExec.scala | 0
.../gluten/backendsapi/SparkPlanExecApi.scala | 2 ++
.../extension/columnar/MiscColumnarRules.scala | 4 ++-
.../extension/columnar/OffloadSingleNode.scala | 10 +------
.../scala/org/apache/gluten/utils/PlanUtil.scala | 6 +++-
7 files changed, 53 insertions(+), 13 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 772f1cfb2..8d01ab96b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.expression.aggregate.{HLLAdapter,
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
-import org.apache.gluten.extension.{BloomFilterMightContainJointRewriteRule,
CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule}
+import org.apache.gluten.extension.{ArrowScanReplaceRule,
BloomFilterMightContainJointRewriteRule, CollectRewriteRule,
FlushableHashAggregateRule, HLLRewriteRule}
import org.apache.gluten.extension.columnar.TransformHints
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, IfThenNode}
@@ -744,7 +744,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
* @return
*/
override def genExtendedColumnarValidationRules(): List[SparkSession =>
Rule[SparkPlan]] = List(
- BloomFilterMightContainJointRewriteRule.apply
+ BloomFilterMightContainJointRewriteRule.apply,
+ ArrowScanReplaceRule.apply
)
/**
@@ -849,4 +850,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
case other => other
}
}
+
+ override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan):
Boolean = plan match { // Classifies the plan node by its runtime type.
+ case _: ArrowFileSourceScanExec => true // Only the Arrow file source scan is reported as true.
+ case _ => false // Any other plan node is reported as false.
+ }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
new file mode 100644
index 000000000..2b7c4b1da
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.datasource.ArrowCSVFileFormat
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ArrowFileSourceScanExec,
FileSourceScanExec, SparkPlan}
+
+case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] { // Physical-plan rule that swaps Arrow CSV file scans for ArrowFileSourceScanExec.
+ override def apply(plan: SparkPlan): SparkPlan = {
+ plan.transformUp { // Rewrites the plan tree through transformUp with the cases below.
+ case plan: FileSourceScanExec if
plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
+ ArrowFileSourceScanExec(plan) // Wraps only scans whose relation file format is an ArrowCSVFileFormat.
+ case p => p // Every other node is returned as-is.
+ }
+
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
rename to
backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index fb2fd961b..8f2ef19f1 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -739,4 +739,6 @@ trait SparkPlanExecApi {
def genPostProjectForGenerate(generate: GenerateExec): SparkPlan
def maybeCollapseTakeOrderedAndProject(plan: SparkPlan): SparkPlan = plan
+
+ def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = false
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 068f62e49..08c63000e 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -68,7 +68,9 @@ object MiscColumnarRules {
case RowToColumnarExec(child) =>
logDebug(s"ColumnarPostOverrides RowToColumnarExec(${child.getClass})")
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
- case c2r @ ColumnarToRowExec(child) if
PlanUtil.outputNativeColumnarData(child) =>
+ case c2r @ ColumnarToRowExec(child)
+ if PlanUtil.outputNativeColumnarData(child) &&
+ !PlanUtil.outputNativeColumnarSparkCompatibleData(child) =>
logDebug(s"ColumnarPostOverrides ColumnarToRowExec(${child.getClass})")
val nativeC2r =
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
if (nativeC2r.doValidate().isValid) {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 067aad32c..84a2ec5c6 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -297,15 +297,7 @@ object OffloadOthers {
class ReplaceSingleNode() extends LogLevelUtil with Logging {
def doReplace(p: SparkPlan): SparkPlan = {
- val plan = p match {
- case plan: FileSourceScanExec
- if plan.relation.fileFormat.getClass.getSimpleName ==
"ArrowCSVFileFormat" =>
- val arrowScan = ArrowFileSourceScanExec(plan)
- TransformHints.tagNotTransformable(arrowScan, "Arrow scan cannot
transform")
- return arrowScan
- case p => p
- }
-
+ val plan = p
if (TransformHints.isNotTransformable(plan)) {
logDebug(s"Columnar Processing for ${plan.getClass} is under row
guard.")
plan match {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index 610f14c86..4c02687a6 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.utils
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.sql.execution._
@@ -50,12 +51,15 @@ object PlanUtil {
case s: WholeStageCodegenExec => outputNativeColumnarData(s.child)
case s: AdaptiveSparkPlanExec => outputNativeColumnarData(s.executedPlan)
case i: InMemoryTableScanExec => PlanUtil.isGlutenTableCache(i)
- case _: ArrowFileSourceScanExec => false
case _: GlutenPlan => true
case _ => false
}
}
+ def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = { // Forwards the check to the SparkPlanExecApi instance obtained from BackendsApiManager.
+
BackendsApiManager.getSparkPlanExecApiInstance.outputNativeColumnarSparkCompatibleData(plan)
+ }
+
def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
plan match {
case i: InMemoryTableScanExec =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]