This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 09950de2d [GLUTEN-5414] [VL] Move ArrowFileScanExec class to module backends-velox
09950de2d is described below

commit 09950de2dd80090a0bc0fea0631749916614cec1
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon May 13 09:37:36 2024 +0800

    [GLUTEN-5414] [VL] Move ArrowFileScanExec class to module backends-velox
---
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 10 +++++--
 .../gluten/extension/ArrowScanReplaceRule.scala    | 34 ++++++++++++++++++++++
 .../sql/execution/ArrowFileSourceScanExec.scala    |  0
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  2 ++
 .../extension/columnar/MiscColumnarRules.scala     |  4 ++-
 .../extension/columnar/OffloadSingleNode.scala     | 10 +------
 .../scala/org/apache/gluten/utils/PlanUtil.scala   |  6 +++-
 7 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 772f1cfb2..8d01ab96b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.execution._
 import org.apache.gluten.expression._
 import org.apache.gluten.expression.ConverterUtils.FunctionConfig
 import org.apache.gluten.expression.aggregate.{HLLAdapter, 
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
-import org.apache.gluten.extension.{BloomFilterMightContainJointRewriteRule, 
CollectRewriteRule, FlushableHashAggregateRule, HLLRewriteRule}
+import org.apache.gluten.extension.{ArrowScanReplaceRule, 
BloomFilterMightContainJointRewriteRule, CollectRewriteRule, 
FlushableHashAggregateRule, HLLRewriteRule}
 import org.apache.gluten.extension.columnar.TransformHints
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, IfThenNode}
@@ -744,7 +744,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
    * @return
    */
   override def genExtendedColumnarValidationRules(): List[SparkSession => 
Rule[SparkPlan]] = List(
-    BloomFilterMightContainJointRewriteRule.apply
+    BloomFilterMightContainJointRewriteRule.apply,
+    ArrowScanReplaceRule.apply
   )
 
   /**
@@ -849,4 +850,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       case other => other
     }
   }
+
+  override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): 
Boolean = plan match {
+    case _: ArrowFileSourceScanExec => true
+    case _ => false
+  }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
new file mode 100644
index 000000000..2b7c4b1da
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowScanReplaceRule.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.datasource.ArrowCSVFileFormat
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ArrowFileSourceScanExec, 
FileSourceScanExec, SparkPlan}
+
+case class ArrowScanReplaceRule(spark: SparkSession) extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformUp {
+      case plan: FileSourceScanExec if 
plan.relation.fileFormat.isInstanceOf[ArrowCSVFileFormat] =>
+        ArrowFileSourceScanExec(plan)
+      case p => p
+    }
+
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
similarity index 100%
rename from gluten-core/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
rename to backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index fb2fd961b..8f2ef19f1 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -739,4 +739,6 @@ trait SparkPlanExecApi {
   def genPostProjectForGenerate(generate: GenerateExec): SparkPlan
 
   def maybeCollapseTakeOrderedAndProject(plan: SparkPlan): SparkPlan = plan
+
+  def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = false
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 068f62e49..08c63000e 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -68,7 +68,9 @@ object MiscColumnarRules {
       case RowToColumnarExec(child) =>
         logDebug(s"ColumnarPostOverrides RowToColumnarExec(${child.getClass})")
         
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
-      case c2r @ ColumnarToRowExec(child) if 
PlanUtil.outputNativeColumnarData(child) =>
+      case c2r @ ColumnarToRowExec(child)
+          if PlanUtil.outputNativeColumnarData(child) &&
+            !PlanUtil.outputNativeColumnarSparkCompatibleData(child) =>
         logDebug(s"ColumnarPostOverrides ColumnarToRowExec(${child.getClass})")
         val nativeC2r = 
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
         if (nativeC2r.doValidate().isValid) {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 067aad32c..84a2ec5c6 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -297,15 +297,7 @@ object OffloadOthers {
   class ReplaceSingleNode() extends LogLevelUtil with Logging {
 
     def doReplace(p: SparkPlan): SparkPlan = {
-      val plan = p match {
-        case plan: FileSourceScanExec
-            if plan.relation.fileFormat.getClass.getSimpleName == 
"ArrowCSVFileFormat" =>
-          val arrowScan = ArrowFileSourceScanExec(plan)
-          TransformHints.tagNotTransformable(arrowScan, "Arrow scan cannot 
transform")
-          return arrowScan
-        case p => p
-      }
-
+      val plan = p
       if (TransformHints.isNotTransformable(plan)) {
         logDebug(s"Columnar Processing for ${plan.getClass} is under row 
guard.")
         plan match {
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index 610f14c86..4c02687a6 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.utils
 
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.sql.execution._
@@ -50,12 +51,15 @@ object PlanUtil {
       case s: WholeStageCodegenExec => outputNativeColumnarData(s.child)
       case s: AdaptiveSparkPlanExec => outputNativeColumnarData(s.executedPlan)
       case i: InMemoryTableScanExec => PlanUtil.isGlutenTableCache(i)
-      case _: ArrowFileSourceScanExec => false
       case _: GlutenPlan => true
       case _ => false
     }
   }
 
+  def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = {
+    
BackendsApiManager.getSparkPlanExecApiInstance.outputNativeColumnarSparkCompatibleData(plan)
+  }
+
   def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
     plan match {
       case i: InMemoryTableScanExec =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to