This is an automated email from the ASF dual-hosted git repository.

yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f54592993 [GLUTEN-5471][VL]feat: Support read Hudi COW table (#6049)
f54592993 is described below

commit f54592993542807c92c3d2b673adffc8bf5f51b9
Author: Yan Ma <[email protected]>
AuthorDate: Wed Aug 28 11:30:04 2024 +0800

    [GLUTEN-5471][VL]feat: Support read Hudi COW table (#6049)
    
    * [GLUTEN-5471][VL]feat: Support read Hudi COW table
    
    * Refine code and fix UTs
    
    ---------
    
    Co-authored-by: Shiyan Xu <[email protected]>
---
 .github/workflows/velox_backend.yml                |  17 ++-
 docs/get-started/Velox.md                          |  16 +-
 docs/get-started/build-guide.md                    |   1 +
 .../DataSourceScanTransformerRegister.scala        |  10 +-
 .../gluten/execution/ScanTransformerFactory.scala  |   2 +-
 gluten-hudi/pom.xml                                | 162 +++++++++++++++++++++
 ...ten.execution.DataSourceScanTransformerRegister |   1 +
 .../gluten/execution/HudiScanTransformer.scala     |  93 ++++++++++++
 .../execution/HudiScanTransformerProvider.scala    |  29 ++++
 .../apache/gluten/execution/VeloxHudiSuite.scala   | 132 +++++++++++++++++
 .../gluten/execution/VeloxTPCHHudiSuite.scala      |  63 ++++++++
 package/pom.xml                                    |  12 +-
 pom.xml                                            |  15 +-
 13 files changed, 536 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/velox_backend.yml 
b/.github/workflows/velox_backend.yml
index fc375c666..de6995673 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -30,6 +30,7 @@ on:
       - 'gluten-data/**'
       - 'gluten-delta/**'
       - 'gluten-iceberg/**'
+      - 'gluten-hudi/**'
       - 'gluten-ut/**'
       - 'shims/**'
       - 'tools/gluten-it/**'
@@ -567,7 +568,7 @@ jobs:
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
           $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox 
-Pceleborn -Piceberg \
-          -Pdelta 
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
+          -Pdelta -Phudi 
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
           
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
         if: failure()
@@ -617,7 +618,7 @@ jobs:
       - name: Build and run unit test for Spark 3.2.2 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox 
-Pceleborn -Piceberg -Pdelta \
+          $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox 
-Pceleborn -Piceberg -Pdelta -Phudi \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" 
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
   run-spark-test-spark33:
@@ -667,7 +668,7 @@ jobs:
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
-          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Phudi -Pspark-ut \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
           
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
@@ -719,7 +720,7 @@ jobs:
       - name: Build and Run unit test for Spark 3.3.1 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Phudi -Pspark-ut \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
           -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
@@ -770,7 +771,7 @@ jobs:
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
-          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Phudi -Pspark-ut \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
           
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
@@ -822,7 +823,7 @@ jobs:
       - name: Build and Run unit test for Spark 3.4.2 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut -Phudi \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
           -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
@@ -873,7 +874,7 @@ jobs:
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
-          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Phudi -Pspark-ut \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
           
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
@@ -975,7 +976,7 @@ jobs:
       - name: Build and Run unit test for Spark 3.5.1 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn 
-Piceberg -Pdelta -Phudi -Pspark-ut \
           
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \
           -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index 776736b2f..2f7dae4fb 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -321,7 +321,7 @@ About column mapping, see more 
[here](https://docs.delta.io/latest/delta-column-
 
 ## Iceberg Support
 
-Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) 
table. Currently, only reading COW (Copy-On-Write) tables is supported.
+Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) 
table. Currently, both reading COW (Copy-On-Write) and MOR (Merge-On-Read) 
tables are supported.
 
 ### How to use
 
@@ -333,6 +333,20 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg 
-DskipTests
 
 Once built successfully, iceberg features will be included in 
gluten-velox-bundle-X jar. Then you can query iceberg table by gluten/velox 
without scan's fallback.
 
+## Hudi Support
+
+Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. 
Currently, only reading COW (Copy-On-Write) tables is supported.
+
+### How to use
+
+First of all, compile gluten-hudi module by a `hudi` profile, as follows:
+
+```
+mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
+```
+
+Once built successfully, hudi features will be included in 
gluten-velox-bundle-X jar. Then you can query hudi **COW** table by 
gluten/velox without scan's fallback.
+
 # Coverage
 
 Spark3.3 has 387 functions in total. ~240 are commonly used. To get the 
support status of all Spark built-in functions, please refer to [Velox 
Backend's Supported Operators & 
Functions](../velox-backend-support-progress.md).
diff --git a/docs/get-started/build-guide.md b/docs/get-started/build-guide.md
index 05e6119ec..d1b1533ad 100644
--- a/docs/get-started/build-guide.md
+++ b/docs/get-started/build-guide.md
@@ -62,6 +62,7 @@ The below parameters can be set via `-P` for mvn.
 | uniffle             | Build Gluten with Uniffle.            | disabled      |
 | delta               | Build Gluten with Delta Lake support. | disabled      |
 | iceberg             | Build Gluten with Iceberg support.    | disabled      |
+| hudi                | Build Gluten with Hudi support.       | disabled      |
 | spark-3.2           | Build Gluten for Spark 3.2.           | enabled       |
 | spark-3.3           | Build Gluten for Spark 3.3.           | disabled      |
 | spark-3.4           | Build Gluten for Spark 3.4.           | disabled      |
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
index 5b46c2385..ac6811f0b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
@@ -29,13 +29,13 @@ trait DataSourceScanTransformerRegister {
   /**
    * The class name that used to identify what kind of datasource this is。
    *
-   * For DataSource V1, it should be the child class name of
-   * [[org.apache.spark.sql.execution.datasources.FileIndex]].
+   * For DataSource V1, it should be relation.fileFormat like
+   * {{{
+   *   override val scanClassName: String = 
"org.apache.spark.sql.delta.DeltaParquetFileFormat"
+   * }}}
    *
    * For DataSource V2, it should be the child class name of
-   * [[org.apache.spark.sql.connector.read.Scan]].
-   *
-   * For example:
+   * [[org.apache.spark.sql.connector.read.Scan]]. For example:
    * {{{
    *   override val scanClassName: String = 
"org.apache.iceberg.spark.source.SparkBatchQueryScan"
    * }}}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index a05a5e72b..7d8c5aab6 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -127,7 +127,7 @@ object ScanTransformerFactory {
           .getOrElse(getClass.getClassLoader)
         val serviceLoader = 
ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
         serviceLoader.asScala
-          .filter(_.scanClassName.equalsIgnoreCase(scanClassName))
+          .filter(service => scanClassName.contains(service.scanClassName))
           .toList match {
           case head :: Nil =>
             // there is exactly one registered alias
diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml
new file mode 100755
index 000000000..2faf53a07
--- /dev/null
+++ b/gluten-hudi/pom.xml
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>gluten-parent</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-hudi</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Hudi</name>
+
+  <properties>
+    <resource.dir>${project.basedir}/src/main/resources</resource.dir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      
<artifactId>hudi-spark${sparkbundle.version}-bundle_${scala.binary.version}</artifactId>
+      <version>${hudi.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- For test -->
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <resources>
+      <resource>
+        <directory>${resource.dir}</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <junitxml>.</junitxml>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>prepare-test-jar</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
 
b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
new file mode 100644
index 000000000..ccfe1ada4
--- /dev/null
+++ 
b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
@@ -0,0 +1 @@
+org.apache.gluten.execution.HudiScanTransformerProvider
\ No newline at end of file
diff --git 
a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
 
b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
new file mode 100644
index 000000000..76a818c96
--- /dev/null
+++ 
b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.collection.BitSet
+
+case class HudiScanTransformer(
+    @transient override val relation: HadoopFsRelation,
+    override val output: Seq[Attribute],
+    override val requiredSchema: StructType,
+    override val partitionFilters: Seq[Expression],
+    override val optionalBucketSet: Option[BitSet],
+    override val optionalNumCoalescedBuckets: Option[Int],
+    override val dataFilters: Seq[Expression],
+    override val tableIdentifier: Option[TableIdentifier],
+    override val disableBucketedScan: Boolean = false)
+  extends FileSourceScanExecTransformerBase(
+    relation,
+    output,
+    requiredSchema,
+    partitionFilters,
+    optionalBucketSet,
+    optionalNumCoalescedBuckets,
+    dataFilters,
+    tableIdentifier,
+    disableBucketedScan
+  ) {
+
+  override lazy val fileFormat: ReadFileFormat = 
ReadFileFormat.ParquetReadFormat
+
+  override protected def doValidateInternal(): ValidationResult = {
+    if (requiredSchema.fields.exists(_.name.startsWith("_hoodie"))) {
+      return ValidationResult.failed(s"Hudi meta field not supported.")
+    }
+    super.doValidateInternal()
+  }
+
+  override def doCanonicalize(): HudiScanTransformer = {
+    HudiScanTransformer(
+      relation,
+      output.map(QueryPlan.normalizeExpressions(_, output)),
+      requiredSchema,
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionFilters),
+        output),
+      optionalBucketSet,
+      optionalNumCoalescedBuckets,
+      QueryPlan.normalizePredicates(dataFilters, output),
+      None,
+      disableBucketedScan
+    )
+  }
+}
+
+object HudiScanTransformer {
+
+  def apply(scanExec: FileSourceScanExec): HudiScanTransformer = {
+    new HudiScanTransformer(
+      scanExec.relation,
+      scanExec.output,
+      scanExec.requiredSchema,
+      scanExec.partitionFilters,
+      scanExec.optionalBucketSet,
+      scanExec.optionalNumCoalescedBuckets,
+      scanExec.dataFilters,
+      scanExec.tableIdentifier,
+      scanExec.disableBucketedScan
+    )
+  }
+}
diff --git 
a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
 
b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
new file mode 100644
index 000000000..6c083107f
--- /dev/null
+++ 
b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+
+  override val scanClassName: String = "HoodieParquetFileFormat"
+
+  override def createDataSourceTransformer(
+      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
+    HudiScanTransformer(batchScan)
+  }
+}
diff --git 
a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala 
b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
new file mode 100644
index 000000000..4f9bd896c
--- /dev/null
+++ 
b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class VeloxHudiSuite extends WholeStageTransformerSuite {
+
+  protected val rootPath: String = getClass.getResource("/").getPath
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .set("spark.sql.sources.useV1SourceList", "avro")
+      .set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+      .set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  }
+
+  testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) {
+    withTable("hudi_tm") {
+      spark.sql(s"""
+                   |create table hudi_tm (id int, name string) using hudi
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_tm values (1, "v1"), (2, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_tm values (3, "v3"), (4, "v4")
+                   |""".stripMargin)
+      val df = spark.sql(" select _hoodie_commit_time from hudi_tm;")
+      val value = df.collectAsList().get(0).getAs[String](0)
+      val df1 = runQueryAndCompare("select id, name from hudi_tm timestamp AS 
OF " + value) {
+        checkGlutenOperatorMatch[HudiScanTransformer]
+      }
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
+      val df2 =
+        runQueryAndCompare("select name from hudi_tm timestamp AS OF " + value 
+ " where id = 2") {
+          checkGlutenOperatorMatch[HudiScanTransformer]
+        }
+      checkLengthAndPlan(df2, 1)
+      checkAnswer(df2, Row("v2") :: Nil)
+    }
+  }
+
+  testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) {
+    withTable("hudi_pf") {
+      spark.sql(s"""
+                   |create table hudi_pf (id int, name string) using hudi
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, 
"v1"), (4, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from hudi_pf where name = "v2"
+                   |""".stripMargin)
+      val df1 = runQueryAndCompare("select id, name from hudi_pf") {
+        checkGlutenOperatorMatch[HudiScanTransformer]
+      }
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+    }
+  }
+
+  // FIXME: flaky leaked file systems issue
+  ignore("hudi: mor", Some("3.2")) {
+    withTable("hudi_mor") {
+      spark.sql(s"""
+                   |create table hudi_mor (id int, name string, ts bigint)
+                   |using hudi
+                   |tblproperties (
+                   |  type = 'mor',
+                   |  primaryKey = 'id',
+                   |  preCombineField = 'ts'
+                   |)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_mor values (1, "v1", 1000), (2, "v2", 
2000),
+                   | (3, "v1", 3000), (4, "v2", 4000)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from hudi_mor where id = 1
+                   |""".stripMargin)
+      val df1 =
+        runQueryAndCompare("select id, name from hudi_mor where name = 'v1'", 
true, false, false) {
+          _ =>
+        }
+      checkAnswer(df1, Row(3, "v1") :: Nil)
+    }
+  }
+
+  testWithSpecifiedSparkVersion("hudi: partition filters", Some("3.2")) {
+    withTable("hudi_pf") {
+      spark.sql(s"""
+                   |create table hudi_pf (id int, name string) using hudi 
partitioned by (name)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, 
"v1"), (4, "v2")
+                   |""".stripMargin)
+      val df1 = runQueryAndCompare("select id, name from hudi_pf where name = 
'v1'") { _ => }
+      val hudiScanTransformer = df1.queryExecution.executedPlan.collect {
+        case f: HudiScanTransformer => f
+      }.head
+      // No data filters as only partition filters exist
+      assert(hudiScanTransformer.filterExprs().size == 0)
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+    }
+  }
+}
diff --git 
a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala
 
b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala
new file mode 100644
index 000000000..ce6ec9bbc
--- /dev/null
+++ 
b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class VeloxTPCHHudiSuite extends VeloxTPCHSuite {
+
+  protected val tpchBasePath: String = new File(
+    "../backends-velox/src/test/resources").getAbsolutePath
+
+  override protected val resourcePath: String =
+    new File(tpchBasePath, "tpch-data-parquet-velox").getCanonicalPath
+
+  override protected val veloxTPCHQueries: String =
+    new File(tpchBasePath, "tpch-queries-velox").getCanonicalPath
+
+  override protected val queriesResults: String =
+    new File(tpchBasePath, "queries-output").getCanonicalPath
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.executor.memory", "4g")
+      .set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+      .set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    TPCHTables
+      .map(_.name)
+      .map {
+        table =>
+          val tablePath = new File(resourcePath, table).getAbsolutePath
+          val tableDF = spark.read.format(fileFormat).load(tablePath)
+          tableDF.write.format("hudi").mode("append").saveAsTable(table)
+          (table, tableDF)
+      }
+      .toMap
+  }
+
+  override protected def afterAll(): Unit = {
+    TPCHTables.map(_.name).foreach(table => spark.sql(s"DROP TABLE IF EXISTS 
$table"))
+    super.afterAll()
+  }
+}
diff --git a/package/pom.xml b/package/pom.xml
index f385a2a5a..ac0754c38 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -98,7 +98,17 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.gluten</groupId>
-          <artifactId>gluten-delta</artifactId>
+         <artifactId>gluten-delta</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hudi</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-hudi</artifactId>
           <version>${project.version}</version>
         </dependency>
       </dependencies>
diff --git a/pom.xml b/pom.xml
index e95300744..991ff835d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -280,6 +280,7 @@
         <delta.version>2.0.1</delta.version>
         <delta.binary.version>20</delta.binary.version>
         <antlr4.version>4.8</antlr4.version>
+        <hudi.version>0.15.0</hudi.version>
       </properties>
     </profile>
     <profile>
@@ -295,6 +296,7 @@
         <delta.version>2.3.0</delta.version>
         <delta.binary.version>23</delta.binary.version>
         <antlr4.version>4.8</antlr4.version>
+       <hudi.version>0.15.0</hudi.version>
       </properties>
     </profile>
     <profile>
@@ -309,6 +311,7 @@
         <delta.version>2.4.0</delta.version>
         <delta.binary.version>24</delta.binary.version>
         <antlr4.version>4.9.3</antlr4.version>
+        <hudi.version>0.15.0</hudi.version>
       </properties>
     </profile>
     <profile>
@@ -321,7 +324,8 @@
         <iceberg.version>1.5.0</iceberg.version>
         <delta.package.name>delta-spark</delta.package.name>
         <delta.version>3.2.0</delta.version>
-        <delta.binary.version>32</delta.binary.version>
+       <delta.binary.version>32</delta.binary.version>
+       <hudi.version>0.15.0</hudi.version>
         <fasterxml.version>2.15.1</fasterxml.version>
         <hadoop.version>3.3.4</hadoop.version>
         <antlr4.version>4.9.3</antlr4.version>
@@ -430,6 +434,15 @@
         <module>gluten-iceberg</module>
       </modules>
     </profile>
+    <profile>
+      <id>hudi</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>gluten-hudi</module>
+      </modules>
+    </profile>
     <profile>
       <id>backends-velox</id>
       <activation>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to