This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c2bf8f04b1 [VL] Delta support / Hudi support as Gluten components 
(#8282)
c2bf8f04b1 is described below

commit c2bf8f04b1e990e4873e2b4e9a283c1379582a78
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Dec 20 16:47:51 2024 +0800

    [VL] Delta support / Hudi support as Gluten components (#8282)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  | 23 +++++++
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  1 -
 ...org.apache.gluten.component.VeloxDeltaComponent |  0
 .../gluten/component/VeloxDeltaComponent.scala     | 56 +++++++++++++++
 .../org.apache.gluten.component.VeloxHudiComponent |  0
 .../gluten/component/VeloxHudiComponent.scala      | 50 ++++++++++++++
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 24 ++++++-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  2 -
 .../memtarget/spark/RegularMemoryConsumer.java     |  2 +-
 ...ten.execution.DataSourceScanTransformerRegister |  1 -
 ...uten.extension.columnar.RewriteTransformerRules |  1 -
 ...formerProvider.scala => OffloadDeltaScan.scala} | 15 ++--
 ...erRules.scala => DeltaPostTransformRules.scala} | 13 ++--
 ...ten.execution.DataSourceScanTransformerRegister |  1 -
 ...sformerProvider.scala => OffloadHudiScan.scala} | 20 ++++--
 .../gluten/backendsapi/BackendSettingsApi.scala    |  6 ++
 .../execution/BatchScanExecTransformer.scala       |  9 +--
 .../DataSourceScanTransformerRegister.scala        | 55 ---------------
 .../execution/FileSourceScanExecTransformer.scala  | 10 +--
 .../gluten/execution/ScanTransformerFactory.scala  | 79 +++++-----------------
 .../extension/columnar/RewriteTransformer.scala    | 53 ---------------
 gluten-ut/common/pom.xml                           |  1 -
 gluten-ut/spark32/pom.xml                          |  1 -
 pom.xml                                            |  9 +--
 24 files changed, 206 insertions(+), 226 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 1aaf801860..5c5032981d 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
 
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.FileFormat
@@ -198,6 +200,27 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
     }
   }
 
+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): 
LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 32961c21a2..05a50396fc 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -93,7 +93,6 @@ object CHRuleApi {
 
     // Legacy: Post-transform rules.
     injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
-    injector.injectPostTransform(c => 
intercept(RewriteTransformer.apply(c.session)))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
diff --git 
a/backends-velox/src-delta/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDeltaComponent
 
b/backends-velox/src-delta/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDeltaComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
 
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
new file mode 100644
index 0000000000..3557c3dfaa
--- /dev/null
+++ 
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.execution.OffloadDeltaScan
+import org.apache.gluten.extension.DeltaPostTransformRules
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class VeloxDeltaComponent extends Component {
+  override def name(): String = "velox-delta"
+  override def buildInfo(): Component.BuildInfo =
+    Component.BuildInfo("VeloxDelta", "N/A", "N/A", "N/A")
+  override def dependencies(): Seq[Class[_ <: Component]] = 
classOf[VeloxBackend] :: Nil
+  override def injectRules(injector: Injector): Unit = {
+    val legacy = injector.gluten.legacy
+    val ras = injector.gluten.ras
+    legacy.injectTransform {
+      c =>
+        val offload = Seq(OffloadDeltaScan())
+        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, 
offload), offload)
+    }
+    ras.injectRasRule {
+      c =>
+        RasOffload.Rule(
+          RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
+          Validators.newValidator(c.glutenConf),
+          Nil)
+    }
+    DeltaPostTransformRules.rules.foreach {
+      r =>
+        legacy.injectPostTransform(_ => r)
+        ras.injectPostTransform(_ => r)
+    }
+  }
+}
diff --git 
a/backends-velox/src-hudi/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxHudiComponent
 
b/backends-velox/src-hudi/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxHudiComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
 
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
new file mode 100644
index 0000000000..c9eeabcdfe
--- /dev/null
+++ 
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.execution.OffloadHudiScan
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class VeloxHudiComponent extends Component {
+  override def name(): String = "velox-hudi"
+  override def buildInfo(): Component.BuildInfo =
+    Component.BuildInfo("VeloxHudi", "N/A", "N/A", "N/A")
+  override def dependencies(): Seq[Class[_ <: Component]] = 
classOf[VeloxBackend] :: Nil
+  override def injectRules(injector: Injector): Unit = {
+    val legacy = injector.gluten.legacy
+    val ras = injector.gluten.ras
+    legacy.injectTransform {
+      c =>
+        val offload = Seq(OffloadHudiScan())
+        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, 
offload), offload)
+    }
+    ras.injectRasRule {
+      c =>
+        RasOffload.Rule(
+          RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
+          Validators.newValidator(c.glutenConf),
+          Nil)
+    }
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index b485763e25..bc335792d1 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, 
OrcReadFormat, ParquetReadFormat}
 import org.apache.gluten.utils._
@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
CumeDist, DenseRank, De
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
ApproximatePercentile, Percentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, 
SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -173,7 +175,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
             }
             validateTypes(typeValidator)
           }
-        case _ => Some(s"Unsupported file format for $format.")
+        case _ => Some(s"Unsupported file format $format.")
       }
     }
 
@@ -194,6 +196,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
       .toSeq
   }
 
+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): 
LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "DwrfScan" => ReadFileFormat.DwrfReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 22919538ff..460547bc71 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -85,7 +85,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -163,7 +162,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
index 054b9ef106..c16981430e 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
 /**
  * A trivial memory consumer implementation used by Gluten.
  *
- * @deprecated Use {@link TreeMemoryConsumers#shared()} instead.
+ * @deprecated Use {@link TreeMemoryConsumers} instead.
  */
 @Deprecated
 public class RegularMemoryConsumer extends MemoryConsumer
diff --git 
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
 
b/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
deleted file mode 100644
index 9613b83ffe..0000000000
--- 
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.gluten.execution.DeltaScanTransformerProvider
\ No newline at end of file
diff --git 
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules
 
b/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules
deleted file mode 100644
index a598d1208a..0000000000
--- 
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.gluten.extension.DeltaRewriteTransformerRules
\ No newline at end of file
diff --git 
a/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
 
b/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala
similarity index 64%
rename from 
gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
rename to 
gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala
index e482150b8e..59c637c4a6 100644
--- 
a/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
+++ 
b/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala
@@ -16,14 +16,15 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
-class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 
-  override val scanClassName: String = 
"org.apache.spark.sql.delta.DeltaParquetFileFormat"
-
-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    DeltaScanTransformer(batchScan)
+case class OffloadDeltaScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case scan: FileSourceScanExec
+        if scan.relation.fileFormat.getClass.getName == 
"org.apache.spark.sql.delta.DeltaParquetFileFormat" =>
+      DeltaScanTransformer(scan)
+    case other => other
   }
 }
diff --git 
a/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
 
b/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
similarity index 93%
rename from 
gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
rename to 
gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index 011184724e..417f8adb41 100644
--- 
a/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ 
b/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -17,8 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.execution.{DeltaFilterExecTransformer, 
DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
-import 
org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, 
filterRule, projectRule, pushDownInputFileExprRule}
-import org.apache.gluten.extension.columnar.RewriteTransformerRules
+import org.apache.gluten.extension.columnar.transition.RemoveTransitions
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, 
InputFileName}
@@ -30,12 +29,10 @@ import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
-class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] =
-    columnMappingRule :: filterRule :: projectRule :: 
pushDownInputFileExprRule :: Nil
-}
-
-object DeltaRewriteTransformerRules {
+object DeltaPostTransformRules {
+  def rules: Seq[Rule[SparkPlan]] =
+    RemoveTransitions :: columnMappingRule :: filterRule :: projectRule ::
+      pushDownInputFileExprRule :: Nil
 
   private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
     TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
diff --git 
a/gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
 
b/gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
deleted file mode 100644
index ccfe1ada47..0000000000
--- 
a/gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.gluten.execution.HudiScanTransformerProvider
\ No newline at end of file
diff --git 
a/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
 
b/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala
similarity index 56%
rename from 
gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
rename to 
gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala
index 6c083107f7..98a8a154e0 100644
--- 
a/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
+++ 
b/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala
@@ -14,16 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
 
-import org.apache.spark.sql.execution.FileSourceScanExec
+package org.apache.gluten.execution
 
-class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
-  override val scanClassName: String = "HoodieParquetFileFormat"
+import org.apache.spark.sql.execution.SparkPlan
 
-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    HudiScanTransformer(batchScan)
+/** Since https://github.com/apache/incubator-gluten/pull/6049. */
+case class OffloadHudiScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = {
+    plan match {
+      // Hudi has multiple file format definitions whose names end with 
"HoodieParquetFileFormat".
+      case scan: org.apache.spark.sql.execution.FileSourceScanExec
+          if 
scan.relation.fileFormat.getClass.getName.endsWith("HoodieParquetFileFormat") =>
+        HudiScanTransformer(scan)
+      case other => other
+    }
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 0d5b6b36da..48a999c327 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -19,11 +19,13 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
 import 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.types.StructField
@@ -39,6 +41,10 @@ trait BackendSettingsApi {
       rootPaths: Seq[String],
       properties: Map[String, String]): ValidationResult = 
ValidationResult.succeeded
 
+  def getSubstraitReadFileFormatV1(fileFormat: FileFormat): 
LocalFilesNode.ReadFileFormat
+
+  def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat
+
   def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 67710cba07..958f4ee6c8 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -175,13 +175,8 @@ abstract class BatchScanExecTransformerBase(
   @transient protected lazy val filteredFlattenPartitions: Seq[InputPartition] 
=
     filteredPartitions.flatten
 
-  @transient override lazy val fileFormat: ReadFileFormat = 
scan.getClass.getSimpleName match {
-    case "OrcScan" => ReadFileFormat.OrcReadFormat
-    case "ParquetScan" => ReadFileFormat.ParquetReadFormat
-    case "DwrfScan" => ReadFileFormat.DwrfReadFormat
-    case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
-    case _ => ReadFileFormat.UnknownFormat
-  }
+  @transient override lazy val fileFormat: ReadFileFormat =
+    BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
 
   override def simpleString(maxFields: Int): String = {
     val truncatedOutputString = truncatedString(output, "[", ", ", "]", 
maxFields)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
deleted file mode 100644
index ac6811f0ba..0000000000
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-/**
- * Data sources transformer should implement this trait so that they can 
register an alias to their
- * data source transformer. This allows users to give the data source 
transformer alias as the
- * format type over the fully qualified class name.
- */
-trait DataSourceScanTransformerRegister {
-
-  /**
-   * The class name that used to identify what kind of datasource this is。
-   *
-   * For DataSource V1, it should be relation.fileFormat like
-   * {{{
-   *   override val scanClassName: String = 
"org.apache.spark.sql.delta.DeltaParquetFileFormat"
-   * }}}
-   *
-   * For DataSource V2, it should be the child class name of
-   * [[org.apache.spark.sql.connector.read.Scan]]. For example:
-   * {{{
-   *   override val scanClassName: String = 
"org.apache.iceberg.spark.source.SparkBatchQueryScan"
-   * }}}
-   */
-  val scanClassName: String
-
-  def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    throw new UnsupportedOperationException(
-      "This should not be called, please implement this method in child 
class.");
-  }
-
-  def createDataSourceV2Transformer(batchScan: BatchScanExec): 
BatchScanExecTransformerBase = {
-    throw new UnsupportedOperationException(
-      "This should not be called, please implement this method in child 
class.");
-  }
-}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index a113221d2f..2716e4e1d3 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -190,15 +190,7 @@ abstract class FileSourceScanExecTransformerBase(
   }
 
   @transient override lazy val fileFormat: ReadFileFormat =
-    relation.fileFormat.getClass.getSimpleName match {
-      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
-      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
-      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
-      case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
-      case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
-      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
-      case _ => ReadFileFormat.UnknownFormat
-    }
+    
BackendsApiManager.getSettings.getSubstraitReadFileFormatV1(relation.fileFormat)
 
   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 745c895688..2a8cc91382 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -21,76 +21,29 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
-import java.util.ServiceLoader
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-
 object ScanTransformerFactory {
 
-  private val scanTransformerMap = new ConcurrentHashMap[String, Class[_]]()
-
   def createFileSourceScanTransformer(
       scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    val fileFormat = scanExec.relation.fileFormat
-    lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
-      case Some(clz) =>
-        clz
-          .getDeclaredConstructor()
-          .newInstance()
-          .asInstanceOf[DataSourceScanTransformerRegister]
-          .createDataSourceTransformer(scanExec)
-      case _ =>
-        FileSourceScanExecTransformer(
-          scanExec.relation,
-          scanExec.output,
-          scanExec.requiredSchema,
-          scanExec.partitionFilters,
-          scanExec.optionalBucketSet,
-          scanExec.optionalNumCoalescedBuckets,
-          scanExec.dataFilters,
-          scanExec.tableIdentifier,
-          scanExec.disableBucketedScan
-        )
-    }
+    FileSourceScanExecTransformer(
+      scanExec.relation,
+      scanExec.output,
+      scanExec.requiredSchema,
+      scanExec.partitionFilters,
+      scanExec.optionalBucketSet,
+      scanExec.optionalNumCoalescedBuckets,
+      scanExec.dataFilters,
+      scanExec.tableIdentifier,
+      scanExec.disableBucketedScan
+    )
   }
 
   def createBatchScanTransformer(batchScanExec: BatchScanExec): 
BatchScanExecTransformerBase = {
-    val scan = batchScanExec.scan
-    lookupDataSourceScanTransformer(scan.getClass.getName) match {
-      case Some(clz) =>
-        clz
-          .getDeclaredConstructor()
-          .newInstance()
-          .asInstanceOf[DataSourceScanTransformerRegister]
-          .createDataSourceV2Transformer(batchScanExec)
-      case _ =>
-        BatchScanExecTransformer(
-          batchScanExec.output,
-          batchScanExec.scan,
-          batchScanExec.runtimeFilters,
-          table = 
SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
-        )
-    }
-  }
-
-  private def lookupDataSourceScanTransformer(scanClassName: String): 
Option[Class[_]] = {
-    val clz = scanTransformerMap.computeIfAbsent(
-      scanClassName,
-      _ => {
-        val loader = Option(Thread.currentThread().getContextClassLoader)
-          .getOrElse(getClass.getClassLoader)
-        val serviceLoader = 
ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
-        serviceLoader.asScala
-          .filter(service => scanClassName.contains(service.scanClassName))
-          .toList match {
-          case head :: Nil =>
-            // there is exactly one registered alias
-            head.getClass
-          case _ => null
-        }
-      }
+    BatchScanExecTransformer(
+      batchScanExec.output,
+      batchScanExec.scan,
+      batchScanExec.runtimeFilters,
+      table = 
SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
     )
-    Option(clz)
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala
deleted file mode 100644
index bdd7d94724..0000000000
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension.columnar
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-trait RewriteTransformerRules {
-  def rules: Seq[Rule[SparkPlan]]
-}
-
-case class RewriteTransformer(session: SparkSession) extends Rule[SparkPlan] {
-
-  private val rules: Seq[Rule[SparkPlan]] = 
RewriteTransformer.loadRewritePlanRules
-
-  override def apply(plan: SparkPlan): SparkPlan = {
-    rules.foldLeft(plan) {
-      case (plan, rule) =>
-        rule(plan)
-    }
-  }
-
-}
-
-object RewriteTransformer {
-
-  private def loadRewritePlanRules: Seq[Rule[SparkPlan]] = {
-    val loader = Option(Thread.currentThread().getContextClassLoader)
-      .getOrElse(getClass.getClassLoader)
-    val serviceLoader = ServiceLoader.load(classOf[RewriteTransformerRules], 
loader)
-
-    serviceLoader.asScala.flatMap(_.rules).toSeq
-  }
-}
diff --git a/gluten-ut/common/pom.xml b/gluten-ut/common/pom.xml
index 14cb212b64..a61a8c47d1 100644
--- a/gluten-ut/common/pom.xml
+++ b/gluten-ut/common/pom.xml
@@ -30,7 +30,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
-        <version>3.0.1</version>
       </plugin>
       <plugin>
         <groupId>net.alchim31.maven</groupId>
diff --git a/gluten-ut/spark32/pom.xml b/gluten-ut/spark32/pom.xml
index 4de49a5563..1ef95e0ea7 100644
--- a/gluten-ut/spark32/pom.xml
+++ b/gluten-ut/spark32/pom.xml
@@ -71,7 +71,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
-        <version>3.0.1</version>
       </plugin>
       <plugin>
         <groupId>net.alchim31.maven</groupId>
diff --git a/pom.xml b/pom.xml
index 4d704dc9b4..018cd597ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -340,8 +340,8 @@
         <iceberg.version>1.5.0</iceberg.version>
         <delta.package.name>delta-spark</delta.package.name>
         <delta.version>3.2.0</delta.version>
-           <delta.binary.version>32</delta.binary.version>
-           <hudi.version>0.15.0</hudi.version>
+        <delta.binary.version>32</delta.binary.version>
+        <hudi.version>0.15.0</hudi.version>
         <fasterxml.version>2.15.1</fasterxml.version>
         <hadoop.version>3.3.4</hadoop.version>
         <antlr4.version>4.9.3</antlr4.version>
@@ -1200,11 +1200,6 @@
         <version>${fasterxml.version}</version>
         <scope>provided</scope>
       </dependency>
-      <dependency>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>3.2.1</version>
-      </dependency>
     </dependencies>
   </dependencyManagement>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to