This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c2bf8f04b1 [VL] Delta support / Hudi support as Gluten components (#8282)
c2bf8f04b1 is described below
commit c2bf8f04b1e990e4873e2b4e9a283c1379582a78
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Dec 20 16:47:51 2024 +0800
[VL] Delta support / Hudi support as Gluten components (#8282)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 23 +++++++
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 -
...org.apache.gluten.component.VeloxDeltaComponent | 0
.../gluten/component/VeloxDeltaComponent.scala | 56 +++++++++++++++
.../org.apache.gluten.component.VeloxHudiComponent | 0
.../gluten/component/VeloxHudiComponent.scala | 50 ++++++++++++++
.../gluten/backendsapi/velox/VeloxBackend.scala | 24 ++++++-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 -
.../memtarget/spark/RegularMemoryConsumer.java | 2 +-
...ten.execution.DataSourceScanTransformerRegister | 1 -
...uten.extension.columnar.RewriteTransformerRules | 1 -
...formerProvider.scala => OffloadDeltaScan.scala} | 15 ++--
...erRules.scala => DeltaPostTransformRules.scala} | 13 ++--
...ten.execution.DataSourceScanTransformerRegister | 1 -
...sformerProvider.scala => OffloadHudiScan.scala} | 20 ++++--
.../gluten/backendsapi/BackendSettingsApi.scala | 6 ++
.../execution/BatchScanExecTransformer.scala | 9 +--
.../DataSourceScanTransformerRegister.scala | 55 ---------------
.../execution/FileSourceScanExecTransformer.scala | 10 +--
.../gluten/execution/ScanTransformerFactory.scala | 79 +++++-----------------
.../extension/columnar/RewriteTransformer.scala | 53 ---------------
gluten-ut/common/pom.xml | 1 -
gluten-ut/spark32/pom.xml | 1 -
pom.xml | 9 +--
24 files changed, 206 insertions(+), 226 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 1aaf801860..5c5032981d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -25,6 +25,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
+import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.FileFormat
@@ -198,6 +200,27 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
}
}
+ override def getSubstraitReadFileFormatV1(
+ fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+ fileFormat.getClass.getSimpleName match {
+ case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+ case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+ case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+ case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
+ case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+ case _ => ReadFileFormat.UnknownFormat
+ }
+ }
+
+ override def getSubstraitReadFileFormatV2(scan: Scan):
LocalFilesNode.ReadFileFormat = {
+ scan.getClass.getSimpleName match {
+ case "OrcScan" => ReadFileFormat.OrcReadFormat
+ case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+ case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+ case _ => ReadFileFormat.UnknownFormat
+ }
+ }
+
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 32961c21a2..05a50396fc 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -93,7 +93,6 @@ object CHRuleApi {
// Legacy: Post-transform rules.
injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
- injector.injectPostTransform(c =>
intercept(RewriteTransformer.apply(c.session)))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
diff --git
a/backends-velox/src-delta/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDeltaComponent
b/backends-velox/src-delta/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDeltaComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
new file mode 100644
index 0000000000..3557c3dfaa
--- /dev/null
+++
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.execution.OffloadDeltaScan
+import org.apache.gluten.extension.DeltaPostTransformRules
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class VeloxDeltaComponent extends Component {
+ override def name(): String = "velox-delta"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("VeloxDelta", "N/A", "N/A", "N/A")
+ override def dependencies(): Seq[Class[_ <: Component]] =
classOf[VeloxBackend] :: Nil
+ override def injectRules(injector: Injector): Unit = {
+ val legacy = injector.gluten.legacy
+ val ras = injector.gluten.ras
+ legacy.injectTransform {
+ c =>
+ val offload = Seq(OffloadDeltaScan())
+ HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
+ }
+ ras.injectRasRule {
+ c =>
+ RasOffload.Rule(
+ RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
+ Validators.newValidator(c.glutenConf),
+ Nil)
+ }
+ DeltaPostTransformRules.rules.foreach {
+ r =>
+ legacy.injectPostTransform(_ => r)
+ ras.injectPostTransform(_ => r)
+ }
+ }
+}
diff --git
a/backends-velox/src-hudi/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxHudiComponent
b/backends-velox/src-hudi/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxHudiComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git
a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
new file mode 100644
index 0000000000..c9eeabcdfe
--- /dev/null
+++
b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.execution.OffloadHudiScan
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class VeloxHudiComponent extends Component {
+ override def name(): String = "velox-hudi"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("VeloxHudi", "N/A", "N/A", "N/A")
+ override def dependencies(): Seq[Class[_ <: Component]] =
classOf[VeloxBackend] :: Nil
+ override def injectRules(injector: Injector): Unit = {
+ val legacy = injector.gluten.legacy
+ val ras = injector.gluten.ras
+ legacy.injectTransform {
+ c =>
+ val offload = Seq(OffloadHudiScan())
+ HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
+ }
+ ras.injectRasRule {
+ c =>
+ RasOffload.Rule(
+ RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
+ Validators.newValidator(c.glutenConf),
+ Nil)
+ }
+ }
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index b485763e25..bc335792d1 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat,
OrcReadFormat, ParquetReadFormat}
import org.apache.gluten.utils._
@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
CumeDist, DenseRank, De
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
ApproximatePercentile, Percentile}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils}
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer,
SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -173,7 +175,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
validateTypes(typeValidator)
}
- case _ => Some(s"Unsupported file format for $format.")
+ case _ => Some(s"Unsupported file format $format.")
}
}
@@ -194,6 +196,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
.toSeq
}
+ override def getSubstraitReadFileFormatV1(
+ fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+ fileFormat.getClass.getSimpleName match {
+ case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+ case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+ case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
+ case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+ case _ => ReadFileFormat.UnknownFormat
+ }
+ }
+
+ override def getSubstraitReadFileFormatV2(scan: Scan):
LocalFilesNode.ReadFileFormat = {
+ scan.getClass.getSimpleName match {
+ case "OrcScan" => ReadFileFormat.OrcReadFormat
+ case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+ case "DwrfScan" => ReadFileFormat.DwrfReadFormat
+ case _ => ReadFileFormat.UnknownFormat
+ }
+ }
+
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 22919538ff..460547bc71 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -85,7 +85,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
- injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -163,7 +162,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
- injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
index 054b9ef106..c16981430e 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/RegularMemoryConsumer.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
/**
* A trivial memory consumer implementation used by Gluten.
*
- * @deprecated Use {@link TreeMemoryConsumers#shared()} instead.
+ * @deprecated Use {@link TreeMemoryConsumers} instead.
*/
@Deprecated
public class RegularMemoryConsumer extends MemoryConsumer
diff --git
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
b/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
deleted file mode 100644
index 9613b83ffe..0000000000
---
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.gluten.execution.DeltaScanTransformerProvider
\ No newline at end of file
diff --git
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules
b/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules
deleted file mode 100644
index a598d1208a..0000000000
---
a/gluten-delta/src-delta/main/resources/META-INF/services/org.apache.gluten.extension.columnar.RewriteTransformerRules
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.gluten.extension.DeltaRewriteTransformerRules
\ No newline at end of file
diff --git
a/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
b/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala
similarity index 64%
rename from
gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
rename to
gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala
index e482150b8e..59c637c4a6 100644
---
a/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/DeltaScanTransformerProvider.scala
+++
b/gluten-delta/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala
@@ -16,14 +16,15 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
-class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
- override val scanClassName: String =
"org.apache.spark.sql.delta.DeltaParquetFileFormat"
-
- override def createDataSourceTransformer(
- batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
- DeltaScanTransformer(batchScan)
+case class OffloadDeltaScan() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = plan match {
+ case scan: FileSourceScanExec
+ if scan.relation.fileFormat.getClass.getName ==
"org.apache.spark.sql.delta.DeltaParquetFileFormat" =>
+ DeltaScanTransformer(scan)
+ case other => other
}
}
diff --git
a/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
b/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
similarity index 93%
rename from
gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
rename to
gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index 011184724e..417f8adb41 100644
---
a/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++
b/gluten-delta/src-delta/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -17,8 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.execution.{DeltaFilterExecTransformer,
DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
-import
org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule,
filterRule, projectRule, pushDownInputFileExprRule}
-import org.apache.gluten.extension.columnar.RewriteTransformerRules
+import org.apache.gluten.extension.columnar.transition.RemoveTransitions
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart,
InputFileName}
@@ -30,12 +29,10 @@ import org.apache.spark.sql.execution.datasources.FileFormat
import scala.collection.mutable.ListBuffer
-class DeltaRewriteTransformerRules extends RewriteTransformerRules {
- override def rules: Seq[Rule[SparkPlan]] =
- columnMappingRule :: filterRule :: projectRule ::
pushDownInputFileExprRule :: Nil
-}
-
-object DeltaRewriteTransformerRules {
+object DeltaPostTransformRules {
+ def rules: Seq[Rule[SparkPlan]] =
+ RemoveTransitions :: columnMappingRule :: filterRule :: projectRule ::
+ pushDownInputFileExprRule :: Nil
private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
diff --git
a/gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
b/gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
deleted file mode 100644
index ccfe1ada47..0000000000
---
a/gluten-hudi/src-hudi/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.gluten.execution.HudiScanTransformerProvider
\ No newline at end of file
diff --git
a/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
b/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala
similarity index 56%
rename from
gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
rename to
gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala
index 6c083107f7..98a8a154e0 100644
---
a/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
+++
b/gluten-hudi/src-hudi/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala
@@ -14,16 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
-import org.apache.spark.sql.execution.FileSourceScanExec
+package org.apache.gluten.execution
-class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
- override val scanClassName: String = "HoodieParquetFileFormat"
+import org.apache.spark.sql.execution.SparkPlan
- override def createDataSourceTransformer(
- batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
- HudiScanTransformer(batchScan)
+/** Since https://github.com/apache/incubator-gluten/pull/6049. */
+case class OffloadHudiScan() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = {
+ plan match {
+ // Hudi has multiple file format definitions whose names end with "HoodieParquetFileFormat".
+ case scan: org.apache.spark.sql.execution.FileSourceScanExec
+ if
scan.relation.fileFormat.getClass.getName.endsWith("HoodieParquetFileFormat") =>
+ HudiScanTransformer(scan)
+ case other => other
+ }
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 0d5b6b36da..48a999c327 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -19,11 +19,13 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.types.StructField
@@ -39,6 +41,10 @@ trait BackendSettingsApi {
rootPaths: Seq[String],
properties: Map[String, String]): ValidationResult =
ValidationResult.succeeded
+ def getSubstraitReadFileFormatV1(fileFormat: FileFormat):
LocalFilesNode.ReadFileFormat
+
+ def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat
+
def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 67710cba07..958f4ee6c8 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -175,13 +175,8 @@ abstract class BatchScanExecTransformerBase(
@transient protected lazy val filteredFlattenPartitions: Seq[InputPartition]
=
filteredPartitions.flatten
- @transient override lazy val fileFormat: ReadFileFormat =
scan.getClass.getSimpleName match {
- case "OrcScan" => ReadFileFormat.OrcReadFormat
- case "ParquetScan" => ReadFileFormat.ParquetReadFormat
- case "DwrfScan" => ReadFileFormat.DwrfReadFormat
- case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
- case _ => ReadFileFormat.UnknownFormat
- }
+ @transient override lazy val fileFormat: ReadFileFormat =
+ BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
override def simpleString(maxFields: Int): String = {
val truncatedOutputString = truncatedString(output, "[", ", ", "]",
maxFields)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
deleted file mode 100644
index ac6811f0ba..0000000000
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-/**
- * Data sources transformer should implement this trait so that they can
register an alias to their
- * data source transformer. This allows users to give the data source
transformer alias as the
- * format type over the fully qualified class name.
- */
-trait DataSourceScanTransformerRegister {
-
- /**
- * The class name that used to identify what kind of datasource this is。
- *
- * For DataSource V1, it should be relation.fileFormat like
- * {{{
- * override val scanClassName: String =
"org.apache.spark.sql.delta.DeltaParquetFileFormat"
- * }}}
- *
- * For DataSource V2, it should be the child class name of
- * [[org.apache.spark.sql.connector.read.Scan]]. For example:
- * {{{
- * override val scanClassName: String =
"org.apache.iceberg.spark.source.SparkBatchQueryScan"
- * }}}
- */
- val scanClassName: String
-
- def createDataSourceTransformer(
- batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
- throw new UnsupportedOperationException(
- "This should not be called, please implement this method in child
class.");
- }
-
- def createDataSourceV2Transformer(batchScan: BatchScanExec):
BatchScanExecTransformerBase = {
- throw new UnsupportedOperationException(
- "This should not be called, please implement this method in child
class.");
- }
-}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index a113221d2f..2716e4e1d3 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -190,15 +190,7 @@ abstract class FileSourceScanExecTransformerBase(
}
@transient override lazy val fileFormat: ReadFileFormat =
- relation.fileFormat.getClass.getSimpleName match {
- case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
- case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
- case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
- case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
- case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
- case "CSVFileFormat" => ReadFileFormat.TextReadFormat
- case _ => ReadFileFormat.UnknownFormat
- }
+
BackendsApiManager.getSettings.getSubstraitReadFileFormatV1(relation.fileFormat)
override def simpleString(maxFields: Int): String = {
val metadataEntries = metadata.toSeq.sorted.map {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 745c895688..2a8cc91382 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -21,76 +21,29 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import java.util.ServiceLoader
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-
object ScanTransformerFactory {
- private val scanTransformerMap = new ConcurrentHashMap[String, Class[_]]()
-
def createFileSourceScanTransformer(
scanExec: FileSourceScanExec): FileSourceScanExecTransformerBase = {
- val fileFormat = scanExec.relation.fileFormat
- lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
- case Some(clz) =>
- clz
- .getDeclaredConstructor()
- .newInstance()
- .asInstanceOf[DataSourceScanTransformerRegister]
- .createDataSourceTransformer(scanExec)
- case _ =>
- FileSourceScanExecTransformer(
- scanExec.relation,
- scanExec.output,
- scanExec.requiredSchema,
- scanExec.partitionFilters,
- scanExec.optionalBucketSet,
- scanExec.optionalNumCoalescedBuckets,
- scanExec.dataFilters,
- scanExec.tableIdentifier,
- scanExec.disableBucketedScan
- )
- }
+ FileSourceScanExecTransformer(
+ scanExec.relation,
+ scanExec.output,
+ scanExec.requiredSchema,
+ scanExec.partitionFilters,
+ scanExec.optionalBucketSet,
+ scanExec.optionalNumCoalescedBuckets,
+ scanExec.dataFilters,
+ scanExec.tableIdentifier,
+ scanExec.disableBucketedScan
+ )
}
def createBatchScanTransformer(batchScanExec: BatchScanExec): BatchScanExecTransformerBase = {
- val scan = batchScanExec.scan
- lookupDataSourceScanTransformer(scan.getClass.getName) match {
- case Some(clz) =>
- clz
- .getDeclaredConstructor()
- .newInstance()
- .asInstanceOf[DataSourceScanTransformerRegister]
- .createDataSourceV2Transformer(batchScanExec)
- case _ =>
- BatchScanExecTransformer(
- batchScanExec.output,
- batchScanExec.scan,
- batchScanExec.runtimeFilters,
- table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
- )
- }
- }
-
- private def lookupDataSourceScanTransformer(scanClassName: String): Option[Class[_]] = {
- val clz = scanTransformerMap.computeIfAbsent(
- scanClassName,
- _ => {
- val loader = Option(Thread.currentThread().getContextClassLoader)
- .getOrElse(getClass.getClassLoader)
- val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
- serviceLoader.asScala
- .filter(service => scanClassName.contains(service.scanClassName))
- .toList match {
- case head :: Nil =>
- // there is exactly one registered alias
- head.getClass
- case _ => null
- }
- }
+ BatchScanExecTransformer(
+ batchScanExec.output,
+ batchScanExec.scan,
+ batchScanExec.runtimeFilters,
+ table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
)
- Option(clz)
}
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala
deleted file mode 100644
index bdd7d94724..0000000000
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/RewriteTransformer.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension.columnar
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlan
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-trait RewriteTransformerRules {
- def rules: Seq[Rule[SparkPlan]]
-}
-
-case class RewriteTransformer(session: SparkSession) extends Rule[SparkPlan] {
-
- private val rules: Seq[Rule[SparkPlan]] = RewriteTransformer.loadRewritePlanRules
-
- override def apply(plan: SparkPlan): SparkPlan = {
- rules.foldLeft(plan) {
- case (plan, rule) =>
- rule(plan)
- }
- }
-
-}
-
-object RewriteTransformer {
-
- private def loadRewritePlanRules: Seq[Rule[SparkPlan]] = {
- val loader = Option(Thread.currentThread().getContextClassLoader)
- .getOrElse(getClass.getClassLoader)
- val serviceLoader = ServiceLoader.load(classOf[RewriteTransformerRules], loader)
-
- serviceLoader.asScala.flatMap(_.rules).toSeq
- }
-}
diff --git a/gluten-ut/common/pom.xml b/gluten-ut/common/pom.xml
index 14cb212b64..a61a8c47d1 100644
--- a/gluten-ut/common/pom.xml
+++ b/gluten-ut/common/pom.xml
@@ -30,7 +30,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
- <version>3.0.1</version>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
diff --git a/gluten-ut/spark32/pom.xml b/gluten-ut/spark32/pom.xml
index 4de49a5563..1ef95e0ea7 100644
--- a/gluten-ut/spark32/pom.xml
+++ b/gluten-ut/spark32/pom.xml
@@ -71,7 +71,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
- <version>3.0.1</version>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
diff --git a/pom.xml b/pom.xml
index 4d704dc9b4..018cd597ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -340,8 +340,8 @@
<iceberg.version>1.5.0</iceberg.version>
<delta.package.name>delta-spark</delta.package.name>
<delta.version>3.2.0</delta.version>
- <delta.binary.version>32</delta.binary.version>
- <hudi.version>0.15.0</hudi.version>
+ <delta.binary.version>32</delta.binary.version>
+ <hudi.version>0.15.0</hudi.version>
<fasterxml.version>2.15.1</fasterxml.version>
<hadoop.version>3.3.4</hadoop.version>
<antlr4.version>4.9.3</antlr4.version>
@@ -1200,11 +1200,6 @@
<version>${fasterxml.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>3.2.1</version>
- </dependency>
</dependencies>
</dependencyManagement>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]