This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 10cf0fdf [AURON #2030] Add Native Scan Support for Apache Hudi 
Copy-On-Write Tables. (#2031)
10cf0fdf is described below

commit 10cf0fdf2b1d3872b9415112e2d9c105b933d198
Author: slfan1989 <[email protected]>
AuthorDate: Fri Mar 20 10:33:34 2026 +0800

    [AURON #2030] Add Native Scan Support for Apache Hudi Copy-On-Write Tables. 
(#2031)
    
    ### Which issue does this PR close?
    
    Closes #2030
    
    ### Rationale for this change
    
    This PR adds native scan support for Hudi Copy-On-Write (COW) tables,
    enabling Auron to accelerate Hudi table reads by converting
    `FileSourceScanExec` operations to native Parquet/ORC scan
    implementations.
    
    ### What changes are included in this PR?
    
    #### 1. **New Module: `thirdparty/auron-hudi`**
    - **`HudiConvertProvider`**: Implements `AuronConvertProvider` SPI to
    intercept and convert Hudi `FileSourceScanExec` to native scans
      - Detects Hudi file formats (`HoodieParquetFileFormat`,
        `HoodieOrcFileFormat`)
      - Converts to `NativeParquetScanExec` or `NativeOrcScanExec`
      - Handles timestamp fallback logic automatically
    
    - **`HudiScanSupport`**: Core detection and validation logic
      - File format recognition with `NewHoodie*` format rejection
      - Table type resolution via multi-source metadata fallback:
        - Options → Catalog → `.hoodie/hoodie.properties`
      - MOR table detection and rejection
      - Time travel query detection (via `as.of.instant`, `as.of.timestamp`
        options)
      - FileIndex class hierarchy verification
    
    #### 2. **Configuration**
    - Added `spark.auron.enable.hudi.scan` config option (default: `true`)
    - Respects existing Parquet/ORC timestamp scanning configurations
    - Runtime Spark version validation (3.0–3.5 only)
    
    #### 3. **Build & Integration**
    - **Maven**: New profile `hudi-0.15` with enforcer rules
      - Validates `hudiEnabled=true` property
      - Restricts Spark to 3.0–3.5
      - Pins Hudi version to 0.15.0
    
    - **Build Script**: Enhanced `auron-build.sh`
      - Added `--hudi <VERSION>` parameter
      - Version compatibility validation
      - Auto-enables `hudiEnabled` property
    
    - **CI/CD**: New workflow `.github/workflows/hudi.yml`
      - Matrix testing: Spark 3.0–3.5 × JDK 8/17/21 × Scala 2.12
      - Independent Hudi test pipeline
    
    ### Are there any user-facing changes?
    
    #### New Configuration Option
    
    ```scala
    // Enable Hudi native scan (enabled by default)
    spark.conf.set("spark.auron.enable.hudi.scan", "true")
    ```
    
    ### How was this patch tested?
    
    Added JUnit tests.
    
    Signed-off-by: slfan1989 <[email protected]>
---
 .github/workflows/hudi.yml                         | 108 +++++++
 auron-build.sh                                     |  31 +-
 dev/reformat                                       |   2 +-
 pom.xml                                            |  10 +
 .../configuration/SparkAuronConfiguration.java     |   6 +
 .../apache/spark/sql/auron/AuronConverters.scala   |   5 +-
 thirdparty/auron-hudi/pom.xml                      | 116 +++++++
 ...org.apache.spark.sql.auron.AuronConvertProvider |  18 ++
 .../spark/sql/auron/hudi/HudiConvertProvider.scala |  87 ++++++
 .../spark/sql/auron/hudi/HudiScanSupport.scala     | 257 ++++++++++++++++
 .../src/test/resources/log4j2.properties           |  35 +++
 .../sql/auron/hudi/HudiScanSupportSuite.scala      | 337 +++++++++++++++++++++
 12 files changed, 1009 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/hudi.yml b/.github/workflows/hudi.yml
new file mode 100644
index 00000000..48f3831d
--- /dev/null
+++ b/.github/workflows/hudi.yml
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: Hudi
+
+on:
+  workflow_dispatch:
+  push:
+    branches:
+      - master
+      - branch-*
+  pull_request:
+    branches:
+      - master
+      - branch-*
+
+concurrency:
+  group: hudi-${{ github.workflow }}-${{ github.event.pull_request.number || 
github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test-hudi:
+    name: Test Hudi (${{ matrix.sparkver }} / JDK${{ matrix.javaver }} / 
Scala${{ matrix.scalaver }})
+    runs-on: ubuntu-24.04
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - sparkver: "3.0"
+            scalaver: "2.12"
+            javaver: "8"
+            hudiver: "0.15"
+          - sparkver: "3.1"
+            scalaver: "2.12"
+            javaver: "8"
+            hudiver: "0.15"
+          - sparkver: "3.2"
+            scalaver: "2.12"
+            javaver: "8"
+            hudiver: "0.15"
+          - sparkver: "3.3"
+            scalaver: "2.12"
+            javaver: "8"
+            hudiver: "0.15"
+          - sparkver: "3.4"
+            scalaver: "2.12"
+            javaver: "17"
+            hudiver: "0.15"
+          - sparkver: "3.5"
+            scalaver: "2.12"
+            javaver: "17"
+            hudiver: "0.15"
+          - sparkver: "3.5"
+            scalaver: "2.12"
+            hudiver: "0.15"
+            javaver: "21"
+
+    steps:
+      - name: Checkout Auron
+        uses: actions/checkout@v6
+
+      - name: Setup Java and Maven cache
+        uses: actions/setup-java@v5
+        with:
+          distribution: 'adopt-hotspot'
+          java-version: ${{ matrix.javaver }}
+          cache: 'maven'
+
+      - name: Build dependencies (skip tests)
+        run: >
+          ./build/mvn -B install
+          -pl thirdparty/auron-hudi
+          -am
+          -Pscala-${{ matrix.scalaver }}
+          -Pspark-${{ matrix.sparkver }}
+          -Phudi-${{ matrix.hudiver }}
+          -Prelease
+          -DskipTests
+
+      - name: Test Hudi Module
+        run: >
+          ./build/mvn -B test
+          -pl thirdparty/auron-hudi
+          -Pscala-${{ matrix.scalaver }}
+          -Pspark-${{ matrix.sparkver }}
+          -Phudi-${{ matrix.hudiver }}
+          -Prelease
+
+      - name: Upload reports
+        if: failure()
+        uses: actions/upload-artifact@v6
+        with:
+          name: auron-hudi-${{ matrix.sparkver }}-hudi${{ matrix.hudiver 
}}-jdk${{ matrix.javaver }}-test-report
+          path: thirdparty/auron-hudi/target/surefire-reports
diff --git a/auron-build.sh b/auron-build.sh
index a88ee697..25cce20a 100755
--- a/auron-build.sh
+++ b/auron-build.sh
@@ -38,6 +38,7 @@ SUPPORTED_UNIFFLE_VERSIONS=("0.10")
 SUPPORTED_PAIMON_VERSIONS=("1.2")
 SUPPORTED_FLINK_VERSIONS=("1.18")
 SUPPORTED_ICEBERG_VERSIONS=("1.10.1")
+SUPPORTED_HUDI_VERSIONS=("0.15")
 
 # -----------------------------------------------------------------------------
 # Function: print_help
@@ -64,6 +65,7 @@ print_help() {
     IFS=','; echo "  --uniffle <VERSION>      Specify Uniffle version (e.g. 
${SUPPORTED_UNIFFLE_VERSIONS[*]})"; unset IFS
     IFS=','; echo "  --paimon <VERSION>       Specify Paimon version (e.g. 
${SUPPORTED_PAIMON_VERSIONS[*]})"; unset IFS
     IFS=','; echo "  --iceberg <VERSION>      Specify Iceberg version (e.g. 
${SUPPORTED_ICEBERG_VERSIONS[*]})"; unset IFS
+    IFS=','; echo "  --hudi <VERSION>         Specify Hudi version (e.g. 
${SUPPORTED_HUDI_VERSIONS[*]})"; unset IFS
 
     echo "  -h, --help               Show this help message"
     echo
@@ -78,7 +80,8 @@ print_help() {
          "--celeborn ${SUPPORTED_CELEBORN_VERSIONS[*]: -1}" \
          "--uniffle ${SUPPORTED_UNIFFLE_VERSIONS[*]: -1}" \
          "--paimon ${SUPPORTED_PAIMON_VERSIONS[*]: -1}" \
-         "--iceberg ${SUPPORTED_ICEBERG_VERSIONS[*]: -1}"
+         "--iceberg ${SUPPORTED_ICEBERG_VERSIONS[*]: -1}" \
+         "--hudi ${SUPPORTED_HUDI_VERSIONS[*]: -1}"
     exit 0
 }
 
@@ -135,6 +138,7 @@ CELEBORN_VER=""
 UNIFFLE_VER=""
 PAIMON_VER=""
 ICEBERG_VER=""
+HUDI_VER=""
 
 # -----------------------------------------------------------------------------
 # Section: Argument Parsing
@@ -301,6 +305,27 @@ while [[ $# -gt 0 ]]; do
                 exit 1
             fi
             ;;
+        --hudi)
+            if [[ -n "$2" && "$2" != -* ]]; then
+                HUDI_VER="$2"
+                if ! check_supported_version "$HUDI_VER" 
"${SUPPORTED_HUDI_VERSIONS[@]}"; then
+                  print_invalid_option_error Hudi "$HUDI_VER" 
"${SUPPORTED_HUDI_VERSIONS[@]}"
+                fi
+                if [ -z "$SPARK_VER" ]; then
+                  echo "ERROR: Building hudi requires spark at the same time, 
and only Spark versions 3.0 to 3.5 are supported."
+                  exit 1
+                fi
+                if [ "$SPARK_VER" != "3.0" ] && [ "$SPARK_VER" != "3.1" ] && [ 
"$SPARK_VER" != "3.2" ] && [ "$SPARK_VER" != "3.3" ] && [ "$SPARK_VER" != "3.4" 
] && [ "$SPARK_VER" != "3.5" ]; then
+                  echo "ERROR: Building hudi requires spark versions are 3.0 
to 3.5."
+                  exit 1
+                fi
+                shift 2
+            else
+                IFS=','; echo "ERROR: Missing argument for --hudi," \
+                "specify one of: ${SUPPORTED_HUDI_VERSIONS[*]}" >&2; unset IFS
+                exit 1
+            fi
+            ;;
         --flinkver)
             if [[ -n "$2" && "$2" != -* ]]; then
                 FLINK_VER="$2"
@@ -437,6 +462,9 @@ fi
 if [[ -n "$ICEBERG_VER" ]]; then
     BUILD_ARGS+=("-Piceberg-$ICEBERG_VER")
 fi
+if [[ -n "$HUDI_VER" ]]; then
+    BUILD_ARGS+=("-Phudi-$HUDI_VER")
+fi
 
 # Configure Maven build threads:
 # - local builds default to Maven's single-threaded behavior
@@ -473,6 +501,7 @@ get_build_info() {
     "paimon.version") echo "${PAIMON_VER}" ;;
     "flink.version") echo "${FLINK_VER}" ;;
     "iceberg.version") echo "${ICEBERG_VER}" ;;
+    "hudi.version") echo "${HUDI_VER}" ;;
     "build.timestamp") echo "$(date -u +"%Y-%m-%dT%H:%M:%SZ")" ;;
     *) echo "" ;;
   esac
diff --git a/dev/reformat b/dev/reformat
index b82816b6..e3f639e5 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -146,7 +146,7 @@ sparkver=spark-3.5
 prepare_for_spark "${sparkver}"
 for celebornver in celeborn-0.5 celeborn-0.6
 do
-  run_maven_format -P"${sparkver}" -Pceleborn,"${celebornver}" 
-Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink-1.18 -Piceberg-1.10.1
+  run_maven_format -P"${sparkver}" -Pceleborn,"${celebornver}" 
-Puniffle,uniffle-0.10 -Ppaimon,paimon-1.2 -Pflink-1.18 -Piceberg-1.10.1 
-Phudi-0.15
 
 done
 
diff --git a/pom.xml b/pom.xml
index dce4b3cc..2c6df25e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1319,6 +1319,16 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>hudi-0.15</id>
+      <modules>
+        <module>thirdparty/auron-hudi</module>
+      </modules>
+      <properties>
+        <hudiVersion>0.15.0</hudiVersion>
+      </properties>
+    </profile>
+
     <profile>
       <id>flink-1.18</id>
       <modules>
diff --git 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index a1bf8e3c..bc46ed31 100644
--- 
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++ 
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -327,6 +327,12 @@ public class SparkAuronConfiguration extends 
AuronConfiguration {
             .withDescription("Enable Iceberg scan operation conversion to 
native Auron implementations.")
             .withDefaultValue(true);
 
+    public static final ConfigOption<Boolean> ENABLE_HUDI_SCAN = new 
SQLConfOption<>(Boolean.class)
+            .withKey("auron.enable.hudi.scan")
+            .withCategory("Operator Supports")
+            .withDescription("Enable Hudi scan operation conversion to native 
Auron implementations.")
+            .withDefaultValue(true);
+
     public static final ConfigOption<Boolean> ENABLE_PROJECT = new 
SQLConfOption<>(Boolean.class)
             .withKey("auron.enable.project")
             .withCategory("Operator Supports")
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 72c038a4..b3c55da5 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -161,7 +161,10 @@ object AuronConverters extends Logging {
       case e: BroadcastExchangeExec if enableBroadcastExchange =>
         tryConvert(e, convertBroadcastExchangeExec)
       case e: FileSourceScanExec if enableScan => // scan
-        tryConvert(e, convertFileSourceScanExec)
+        extConvertProviders.find(p => p.isEnabled && p.isSupported(e)) match {
+          case Some(provider) => tryConvert(e, provider.convert)
+          case None => tryConvert(e, convertFileSourceScanExec)
+        }
       case e: ProjectExec if enableProject => // project
         tryConvert(e, convertProjectExec)
       case e: FilterExec if enableFilter => // filter
diff --git a/thirdparty/auron-hudi/pom.xml b/thirdparty/auron-hudi/pom.xml
new file mode 100644
index 00000000..4063065f
--- /dev/null
+++ b/thirdparty/auron-hudi/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.auron</groupId>
+    <artifactId>auron-parent_${scalaVersion}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>auron-hudi_${scalaVersion}</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Auron Hudi ${hudiVersion} ${scalaVersion}</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      
<artifactId>hudi-spark${shortSparkVersion}-bundle_${scalaVersion}</artifactId>
+      <version>${hudiVersion}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scalaVersion}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scalaVersion}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scalaVersion}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-extension-shims-spark_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>${maven-enforcer-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>hudi-spark-version-compat</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireProperty>
+                  <property>shortSparkVersion</property>
+                  <regex>^(3\.0|3\.1|3\.2|3\.3|3\.4|3\.5)$</regex>
+                  <regexMessage>Hudi integration supports Spark 3.0-3.5 only. 
Current: ${shortSparkVersion}</regexMessage>
+                </requireProperty>
+                <requireProperty>
+                  <property>hudiVersion</property>
+                  <regex>^0\.15\.0$</regex>
+                  <regexMessage>Hudi integration supports only Hudi 0.15.0. 
Current: ${hudiVersion}</regexMessage>
+                </requireProperty>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
 
b/thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
new file mode 100644
index 00000000..76be6809
--- /dev/null
+++ 
b/thirdparty/auron-hudi/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.sql.auron.hudi.HudiConvertProvider
diff --git 
a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
 
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
new file mode 100644
index 00000000..da96d564
--- /dev/null
+++ 
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.hudi
+
+import scala.util.Try
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider, 
NativeConverters, Shims}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.SparkPlan
+
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
+class HudiConvertProvider extends AuronConvertProvider with Logging {
+
+  override def isEnabled: Boolean = {
+    val sparkVersion = org.apache.spark.SPARK_VERSION
+    val versionParts = sparkVersion.split("[\\.-]", 3)
+    val maybeMajor = versionParts.headOption.flatMap(part => 
Try(part.toInt).toOption)
+    val maybeMinor =
+      if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else 
None
+    val supported = (for {
+      major <- maybeMajor
+      minor <- maybeMinor
+    } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
+    SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported
+  }
+
+  override def isSupported(exec: SparkPlan): Boolean = {
+    exec match {
+      case scan: FileSourceScanExec =>
+        // Only handle Hudi-backed file scans; other scans fall through.
+        HudiScanSupport.isSupported(scan)
+      case _ => false
+    }
+  }
+
+  override def convert(exec: SparkPlan): SparkPlan = {
+    exec match {
+      case scan: FileSourceScanExec if HudiScanSupport.isSupported(scan) =>
+        HudiScanSupport.fileFormat(scan) match {
+          case Some(HudiScanSupport.ParquetFormat) =>
+            if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()) {
+              return exec
+            }
+            // Hudi falls back to Spark when timestamp scanning is disabled.
+            if (!SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()) {
+              if (scan.requiredSchema.exists(e =>
+                  NativeConverters.existTimestampType(e.dataType))) {
+                return exec
+              }
+            }
+            logDebug(s"Applying native parquet scan for Hudi: 
${scan.relation.location}")
+            
AuronConverters.addRenameColumnsExec(Shims.get.createNativeParquetScanExec(scan))
+          case Some(HudiScanSupport.OrcFormat) =>
+            if (!SparkAuronConfiguration.ENABLE_SCAN_ORC.get()) {
+              return exec
+            }
+            // ORC follows the same timestamp fallback rule as Parquet.
+            if (!SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()) {
+              if (scan.requiredSchema.exists(e =>
+                  NativeConverters.existTimestampType(e.dataType))) {
+                return exec
+              }
+            }
+            logDebug(s"Applying native ORC scan for Hudi: 
${scan.relation.location}")
+            
AuronConverters.addRenameColumnsExec(Shims.get.createNativeOrcScanExec(scan))
+          case None => exec
+        }
+      case _ => exec
+    }
+  }
+}
diff --git 
a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala
 
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala
new file mode 100644
index 00000000..63488d30
--- /dev/null
+++ 
b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.hudi
+
+import java.net.URI
+import java.util.{Locale, Properties}
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+
+object HudiScanSupport extends Logging {
+  sealed trait HudiFileFormat
+  case object ParquetFormat extends HudiFileFormat
+  case object OrcFormat extends HudiFileFormat
+
+  private val hudiParquetFileFormatSuffix = "HoodieParquetFileFormat"
+  private val newHudiParquetFileFormatSuffix = "NewHoodieParquetFileFormat"
+  private val hudiOrcFileFormatSuffix = "HoodieOrcFileFormat"
+  private val newHudiOrcFileFormatSuffix = "NewHoodieOrcFileFormat"
+  private val morTableTypes = Set("merge_on_read", "mor")
+  private val hudiTableTypeKeys = Seq(
+    "hoodie.datasource.write.table.type",
+    "hoodie.datasource.read.table.type",
+    "hoodie.table.type")
+  private val hudiBaseFileFormatKeys = Seq(
+    "hoodie.table.base.file.format",
+    "hoodie.datasource.write.base.file.format",
+    "hoodie.datasource.write.storage.type")
+
+  def fileFormat(scan: FileSourceScanExec): Option[HudiFileFormat] = {
+    val fileFormatName = scan.relation.fileFormat.getClass.getName
+    val fromClass = fileFormat(fileFormatName)
+    if (fromClass.nonEmpty) {
+      return fromClass
+    }
+    // Spark may report generic Orc/Parquet formats for Hudi; use metadata 
fallback
+    // only when the underlying file index indicates a Hudi table.
+    fileFormatFromMeta(scan, catalogTable(scan.relation), fileFormatName)
+  }
+
+  private[hudi] def fileFormat(fileFormatName: String): Option[HudiFileFormat] 
= {
+    logDebug(s"Hudi fileFormat resolved to: ${fileFormatName}")
+    if (fileFormatName.endsWith(newHudiParquetFileFormatSuffix) ||
+      fileFormatName.endsWith(newHudiOrcFileFormatSuffix)) {
+      return None
+    }
+    if (fileFormatName.endsWith(hudiParquetFileFormatSuffix)) {
+      return Some(ParquetFormat)
+    }
+    if (fileFormatName.endsWith(hudiOrcFileFormatSuffix)) {
+      return Some(OrcFormat)
+    }
+    None
+  }
+
+  def isSupported(scan: FileSourceScanExec): Boolean =
+    isSupported(fileFormat(scan), scan.relation.options, 
catalogTable(scan.relation))
+
+  private[hudi] def isSupported(fileFormatName: String, options: Map[String, 
String]): Boolean = {
+    isSupported(fileFormat(fileFormatName), options, None)
+  }
+
+  private[hudi] def isSupported(
+      fileFormat: Option[HudiFileFormat],
+      options: Map[String, String],
+      catalogTable: Option[CatalogTable]): Boolean = {
+    if (fileFormat.isEmpty) {
+      return false
+    }
+    if (hasTimeTravel(options)) {
+      return false
+    }
+
+    val tableType = tableTypeFromOptions(options)
+      .orElse(tableTypeFromCatalog(catalogTable))
+      .orElse(tableTypeFromMeta(options))
+      .map(_.toLowerCase(Locale.ROOT))
+
+    logDebug(s"Hudi tableType resolved to: ${tableType.getOrElse("unknown")}")
+
+    // Only support basic COW tables for the base version.
+    !tableType.exists(morTableTypes.contains)
+  }
+
+  private def tableTypeFromOptions(options: Map[String, String]): 
Option[String] = {
+    hudiTableTypeKeys
+      .flatMap(key => options.get(key))
+      .headOption
+  }
+
+  private def baseFileFormatFromOptions(options: Map[String, String]): 
Option[String] = {
+    hudiBaseFileFormatKeys
+      .flatMap(key => options.get(key))
+      .headOption
+  }
+
+  private def tableTypeFromMeta(options: Map[String, String]): Option[String] 
= {
+    val basePath = options.get("path").map(normalizePath)
+    basePath.flatMap { path =>
+      try {
+        val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
+        val base = new Path(path)
+        val fs = base.getFileSystem(hadoopConf)
+        val propsPath = new Path(base, ".hoodie/hoodie.properties")
+        if (!fs.exists(propsPath)) {
+          if (log.isDebugEnabled()) {
+            logDebug(s"Hudi table properties not found at: $propsPath")
+          }
+          return None
+        }
+        val in = fs.open(propsPath)
+        try {
+          val props = new Properties()
+          props.load(in)
+          Option(props.getProperty("hoodie.table.type"))
+        } finally {
+          in.close()
+        }
+      } catch {
+        case t: Throwable =>
+          if (log.isDebugEnabled()) {
+            logDebug(s"Failed to load hudi table type from $path", t)
+          }
+          None
+      }
+    }
+  }
+
+  private def baseFileFormatFromMeta(options: Map[String, String]): 
Option[String] = {
+    val basePath = options.get("path").map(normalizePath)
+    basePath.flatMap { path =>
+      try {
+        val hadoopConf = SparkSession.active.sessionState.newHadoopConf()
+        val base = new Path(path)
+        val fs = base.getFileSystem(hadoopConf)
+        val propsPath = new Path(base, ".hoodie/hoodie.properties")
+        if (!fs.exists(propsPath)) {
+          if (log.isDebugEnabled()) {
+            logDebug(s"Hudi table properties not found at: $propsPath")
+          }
+          return None
+        }
+        val in = fs.open(propsPath)
+        try {
+          val props = new Properties()
+          props.load(in)
+          Option(props.getProperty("hoodie.table.base.file.format"))
+        } finally {
+          in.close()
+        }
+      } catch {
+        case t: Throwable =>
+          if (log.isDebugEnabled()) {
+            logDebug(s"Failed to load hudi base file format from $path", t)
+          }
+          None
+      }
+    }
+  }
+
+  private def baseFileFormatFromCatalog(catalogTable: Option[CatalogTable]): 
Option[String] = {
+    catalogTable.flatMap { table =>
+      val props = table.properties ++ table.storage.properties
+      hudiBaseFileFormatKeys.flatMap(props.get).headOption
+    }
+  }
+
+  private def fileFormatFromMeta(
+      scan: FileSourceScanExec,
+      catalogTable: Option[CatalogTable],
+      fileFormatName: String): Option[HudiFileFormat] = {
+    // Avoid treating non-Hudi tables as Hudi when Spark reports generic 
formats.
+    if (!isHudiFileIndex(scan.relation.location)) {
+      return None
+    }
+    val baseFormat = baseFileFormatFromOptions(scan.relation.options)
+      .orElse(baseFileFormatFromCatalog(catalogTable))
+      .orElse(baseFileFormatFromMeta(scan.relation.options))
+      .map(_.toLowerCase(Locale.ROOT))
+    baseFormat.flatMap {
+      case "orc" if fileFormatName.contains("OrcFileFormat") => Some(OrcFormat)
+      case "parquet" if fileFormatName.contains("ParquetFileFormat") => 
Some(ParquetFormat)
+      case _ => None
+    }
+  }
+
+  private def tableTypeFromCatalog(catalogTable: Option[CatalogTable]): 
Option[String] = {
+    catalogTable.flatMap { table =>
+      val props = table.properties ++ table.storage.properties
+      hudiTableTypeKeys.flatMap(props.get).headOption
+    }
+  }
+
+  private def catalogTable(relation: HadoopFsRelation): Option[CatalogTable] = 
{
+    val method = relation.getClass.getMethods.find(_.getName == "catalogTable")
+    method.flatMap { m =>
+      try {
+        m.invoke(relation) match {
+          case opt: Option[_] => opt.asInstanceOf[Option[CatalogTable]]
+          case table: CatalogTable => Some(table)
+          case _ => None
+        }
+      } catch {
+        case _: Throwable => None
+      }
+    }
+  }
+
+  private def isHudiFileIndex(fileIndex: AnyRef): Boolean = {
+    var current: Class[_] = fileIndex.getClass
+    while (current != null) {
+      if (current.getName.endsWith("HoodieFileIndex")) {
+        return true
+      }
+      current = current.getSuperclass
+    }
+    false
+  }
+
+  private def hasTimeTravel(options: Map[String, String]): Boolean = {
+    val keys = options.keys.map(_.toLowerCase(Locale.ROOT))
+    keys.exists {
+      case "as.of.instant" => true
+      case "as.of.timestamp" => true
+      case "hoodie.datasource.read.as.of.instant" => true
+      case "hoodie.datasource.read.as.of.timestamp" => true
+      case _ => false
+    }
+  }
+
+  private def normalizePath(rawPath: String): String = {
+    try {
+      val uri = new URI(rawPath)
+      if (uri.getScheme == null) rawPath else uri.toString
+    } catch {
+      case _: Throwable => rawPath
+    }
+  }
+}
diff --git a/thirdparty/auron-hudi/src/test/resources/log4j2.properties 
b/thirdparty/auron-hudi/src/test/resources/log4j2.properties
new file mode 100644
index 00000000..276d5511
--- /dev/null
+++ b/thirdparty/auron-hudi/src/test/resources/log4j2.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+rootLogger.level = info
+rootLogger.appenderRef.file.ref = File
+
+#File Appender
+appender.file.type = File
+appender.file.name = File
+appender.file.fileName = target/auron-hudi-tests.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex
+
+#Console Appender
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.target = SYSTEM_OUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c: 
%maxLen{%m}{512}%n%ex{8}%n
+appender.console.filter.threshold.type = ThresholdFilter
+appender.console.filter.threshold.level = warn
diff --git 
a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala
 
b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala
new file mode 100644
index 00000000..a388507c
--- /dev/null
+++ 
b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.hudi
+
+import java.io.File
+import java.io.FileInputStream
+import java.nio.file.Files
+import java.util.Properties
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase
+import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase
+import org.apache.spark.sql.test.SharedSparkSession
+
+import org.apache.auron.util.SparkVersionUtil
+
/**
 * End-to-end and unit tests for Auron's Hudi native-scan support.
 *
 * Unit-level tests exercise [[HudiScanSupport]] format/table-type detection directly;
 * integration-level tests create real Hudi tables under a per-suite temp warehouse and
 * verify that [[HudiConvertProvider]] converts COW scans to native Parquet/ORC scans,
 * while time-travel and (natively disabled) timestamp reads fall back to Spark.
 */
class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {

  // Per-suite scratch directory under target/tmp; created lazily on first use.
  private lazy val suiteWorkspace: File = {
    val base = new File("target/tmp")
    base.mkdirs()
    Files.createTempDirectory(base.toPath, "auron-hudi-tests").toFile
  }
  // Spark SQL warehouse rooted inside the suite workspace so tables are isolated per run.
  private lazy val warehouseDir: String =
    new File(suiteWorkspace, "spark-warehouse").getAbsolutePath

  // True when Hudi's HoodieCatalog is on the test classpath. The class is only probed
  // (initialize = false), never initialized here.
  private lazy val hudiCatalogClassAvailable: Boolean = {
    try {
      Class.forName(
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        false,
        Thread.currentThread().getContextClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }
  }

  /**
   * Builds the SparkConf for the shared session: Auron + Hudi extensions, Auron shuffle
   * manager, Kryo serialization, and JDK17-style --add-opens flags for driver/executor.
   * Native timestamp parquet scanning is deliberately disabled so the timestamp-fallback
   * test can observe the Spark path.
   */
  override protected def sparkConf: SparkConf = {
    if (!suiteWorkspace.exists()) {
      suiteWorkspace.mkdirs()
    }
    new File(warehouseDir).mkdirs()
    // JVM module opens required by Spark/Hudi/Netty reflection on modern JDKs.
    val extraJavaOptions =
      "--add-opens=java.base/java.lang=ALL-UNNAMED " +
        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
        "--add-opens=java.base/java.io=ALL-UNNAMED " +
        "--add-opens=java.base/java.net=ALL-UNNAMED " +
        "--add-opens=java.base/java.nio=ALL-UNNAMED " +
        "--add-opens=java.base/java.util=ALL-UNNAMED " +
        "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
        "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " +
        "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED " +
        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
        "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED " +
        "--add-opens=java.base/sun.security.action=ALL-UNNAMED " +
        "--add-opens=java.base/sun.security.tools.keytool=ALL-UNNAMED " +
        "--add-opens=java.base/sun.security.x509=ALL-UNNAMED " +
        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED " +
        "-Djdk.reflect.useDirectMethodHandle=false " +
        "-Dio.netty.tryReflectionSetAccessible=true"
    val conf = super.sparkConf
      .set(
        "spark.sql.extensions",
        "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
          "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .set(
        "spark.shuffle.manager",
        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager")
      .set("spark.auron.enable", "true")
      .set("spark.sql.warehouse.dir", warehouseDir)
      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.driver.extraJavaOptions", extraJavaOptions)
      .set("spark.executor.extraJavaOptions", extraJavaOptions)
      // Disable native timestamp scan to validate fallback behavior in tests.
      .set("spark.auron.enable.scan.parquet.timestamp", "false")
      .set("spark.auron.ui.enabled", "false")
      .set("spark.ui.enabled", "false")
    if (hudiCatalogClassAvailable) {
      conf.set(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    } else {
      info(
        "Hudi HoodieCatalog not found on the classpath; leaving spark.sql.catalog.spark_catalog as default.")
    }
    conf
  }

  /** Runs `f` and always drops `tableName` afterwards, keeping tests independent. */
  private def withTable(tableName: String)(f: => Unit): Unit = {
    try {
      f
    } finally {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
    }
  }

  /** Asserts that `plan` contains at least one native Parquet scan node. */
  private def assertHasNativeParquetScan(plan: org.apache.spark.sql.execution.SparkPlan): Unit = {
    assert(plan.find(_.isInstanceOf[NativeParquetScanBase]).nonEmpty)
  }

  /** Asserts that `plan` contains at least one native ORC scan node. */
  private def assertHasNativeOrcScan(plan: org.apache.spark.sql.execution.SparkPlan): Unit = {
    assert(plan.find(_.isInstanceOf[NativeOrcScanBase]).nonEmpty)
  }

  /**
   * Asserts that the executed plan contains no native Parquet scan.
   *
   * NOTE(review): this inspects `executedPlan` directly rather than going through
   * materializedPlan; with AQE enabled, collect over an AdaptiveSparkPlanExec wrapper may
   * not traverse the finalized inner plan — confirm this still sees the real scan nodes.
   */
  private def assertNoNativeParquetScan(df: org.apache.spark.sql.DataFrame): Unit = {
    assert(df.queryExecution.executedPlan.collect { case _: NativeParquetScanBase =>
      true
    }.isEmpty)
  }

  /**
   * Logs diagnostic information about the plan: file-source scan formats, native scan
   * nodes (by type and, as a fallback, by nodeName), or the whole plan when neither is
   * found. Purely informational; makes test failures easier to triage from logs.
   */
  private def logFileFormats(df: org.apache.spark.sql.DataFrame): Unit = {
    val plan = materializedPlan(df)
    val nodes = plan.collect { case p => p }
    val scans = plan.collect { case scan: org.apache.spark.sql.execution.FileSourceScanExec =>
      scan
    }
    val formats = scans.map(_.relation.fileFormat.getClass.getName)
    val nativeScans = plan.collect {
      case scan: NativeParquetScanBase => scan.nodeName
      case scan: NativeOrcScanBase => scan.nodeName
    }
    // Fallback detection by node name, in case the concrete native scan classes differ
    // across Spark-version shims.
    val nativeScanNames = nodes
      .map(_.nodeName)
      .filter(name => name.contains("NativeParquetScan") || name.contains("NativeOrcScan"))
    if (formats.nonEmpty) {
      info(s"Detected file formats: ${formats.distinct.mkString(", ")}")
      scans.foreach { scan =>
        info(
          s"Scan requiredSchema: ${scan.requiredSchema.simpleString}, " +
            s"options: ${scan.relation.options}")
      }
    }
    if (nativeScans.nonEmpty) {
      info(s"Detected native scans: ${nativeScans.distinct.mkString(", ")}")
    }
    if (nativeScanNames.nonEmpty && nativeScans.isEmpty) {
      info(s"Detected native scans (by nodeName): ${nativeScanNames.distinct.mkString(", ")}")
    }
    if (formats.isEmpty && nativeScans.isEmpty) {
      info(s"No FileSourceScanExec/Native scan found. Plan: ${plan.simpleString(2)}")
    }
  }

  /** Unwraps AdaptiveSparkPlanExec so assertions see the finalized physical plan. */
  private def materializedPlan(
      df: org.apache.spark.sql.DataFrame): org.apache.spark.sql.execution.SparkPlan = {
    // Ensure we inspect the post-AQE plan when adaptive execution is enabled.
    df.queryExecution.executedPlan match {
      case adaptive: org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec =>
        adaptive.executedPlan
      case other => other
    }
  }

  /**
   * Reads `hoodie.table.base.file.format` straight from the table's
   * `.hoodie/hoodie.properties` file under the warehouse, or None if the file is absent.
   */
  private def hudiBaseFileFormat(tableName: String): Option[String] = {
    // Read the base file format from Hudi table properties for assertions.
    val propsFile = new File(new File(warehouseDir, tableName), ".hoodie/hoodie.properties")
    if (!propsFile.exists()) {
      return None
    }
    val props = new Properties()
    val in = new FileInputStream(propsFile)
    try {
      props.load(in)
    } finally {
      in.close()
    }
    Option(props.getProperty("hoodie.table.base.file.format"))
  }

  /**
   * Skips (not fails) the test when the Spark runtime is older than `version`.
   * NOTE(review): the comparison is lexicographic on version strings — fine for the
   * 3.x range used here, but would misorder e.g. "3.10" vs "3.2".
   */
  private def assumeSparkAtLeast(version: String): Unit = {
    val current = SparkVersionUtil.SPARK_RUNTIME_VERSION
    assume(current >= version, s"Requires Spark >= $version, current Spark $current")
  }

  // Hudi's Parquet/ORC file-format classes (including versioned legacy variants) must be
  // recognized and mapped to the corresponding native format.
  test("hudi fileFormat detects parquet and orc classes") {
    assert(
      HudiScanSupport
        .fileFormat("org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat")
        .contains(HudiScanSupport.ParquetFormat))
    assert(HudiScanSupport
      .fileFormat(
        "org.apache.spark.sql.execution.datasources.parquet.Spark35LegacyHoodieParquetFileFormat")
      .contains(HudiScanSupport.ParquetFormat))
    assert(
      HudiScanSupport
        .fileFormat("org.apache.spark.sql.execution.datasources.orc.HoodieOrcFileFormat")
        .contains(HudiScanSupport.OrcFormat))
  }

  // The NewHoodie* format variants are explicitly unsupported and must not be converted.
  test("hudi fileFormat rejects NewHoodie formats") {
    assert(
      HudiScanSupport
        .fileFormat(
          "org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat")
        .isEmpty)
    assert(
      HudiScanSupport
        .fileFormat("org.apache.spark.sql.execution.datasources.orc.NewHoodieOrcFileFormat")
        .isEmpty)
  }

  // Merge-on-read tables cannot be read by a plain file scan; they must be rejected.
  test("hudi isSupported rejects MOR table types") {
    val options = Map("hoodie.datasource.write.table.type" -> "MERGE_ON_READ")
    assert(
      !HudiScanSupport.isSupported(
        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
        options))
  }

  // With no table-type option at all, the default (COW) is assumed supported.
  test("hudi isSupported allows default COW") {
    assert(
      HudiScanSupport.isSupported(
        "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
        Map.empty))
  }

  // Plain Spark ParquetFileFormat is not a Hudi format and must be left alone.
  test("hudi isSupported rejects non-Hudi formats") {
    assert(
      !HudiScanSupport.isSupported(
        "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat",
        Map.empty))
  }

  // Time-travel reads must stay on the Spark path: write two commits, then query as-of
  // the first commit time and verify only the first commit's rows come back with no
  // native parquet scan in the plan.
  test("hudi: time travel falls back to Spark") {
    assumeSparkAtLeast("3.2")
    withTable("hudi_tm") {
      spark.sql("create table hudi_tm (id int, name string) using hudi")
      spark.sql("insert into hudi_tm values (1, 'v1'), (2, 'v2')")
      spark.sql("insert into hudi_tm values (3, 'v3'), (4, 'v4')")
      val value = spark
        .sql("select _hoodie_commit_time from hudi_tm")
        .collectAsList()
        .get(0)
        .getAs[String](0)

      val df1 = spark.sql(s"select id, name from hudi_tm timestamp AS OF $value")
      // Time travel uses metadata and should stay on Spark path.
      logFileFormats(df1)
      val rows1 = df1.collect().toSeq
      assert(rows1 == Seq(Row(1, "v1"), Row(2, "v2")))
      assertNoNativeParquetScan(df1)

      val df2 = spark.sql(s"select name from hudi_tm timestamp AS OF $value where id = 2")
      // Filters shouldn't affect fallback for time travel queries.
      logFileFormats(df2)
      val rows2 = df2.collect().toSeq
      assert(rows2 == Seq(Row("v2")))
      assertNoNativeParquetScan(df2)
    }
  }

  // With spark.auron.enable.scan.parquet.timestamp=false (set in sparkConf above),
  // a table containing a timestamp column must fall back to the Spark scan.
  test("hudi: timestamp column falls back when native timestamp disabled") {
    withTable("hudi_ts") {
      spark.sql("create table hudi_ts (id int, ts timestamp) using hudi")
      spark.sql("insert into hudi_ts values (1, timestamp('2026-01-01 00:00:00'))")
      val df = spark.sql("select * from hudi_ts")
      // Timestamp columns are not supported when native timestamp scanning is disabled.
      logFileFormats(df)
      df.collect()
      assert(df.queryExecution.executedPlan.collect { case _: NativeParquetScanBase =>
        true
      }.isEmpty)
    }
  }

  // Creates an ORC-backed COW table and checks HudiConvertProvider converts its
  // FileSourceScanExec into a native ORC scan. Skipped (assume) if the Hudi build
  // ignored the ORC base-file-format properties.
  test("hudi: ORC table scan converts to native (provider)") {
    withTable("hudi_orc") {
      spark.sql("""create table hudi_orc (id int, name string)
          |using hudi
          |tblproperties (
          |  'hoodie.datasource.write.table.type' = 'cow',
          |  'hoodie.datasource.write.storage.type' = 'ORC',
          |  'hoodie.datasource.write.base.file.format' = 'ORC',
          |  'hoodie.table.base.file.format' = 'ORC'
          |)""".stripMargin)
      spark.sql("insert into hudi_orc values (1, 'v1'), (2, 'v2')")
      val baseFormat = hudiBaseFileFormat("hudi_orc").getOrElse("unknown")
      info(s"Hudi base file format: $baseFormat")
      assume(
        baseFormat.equalsIgnoreCase("orc"),
        s"Expected ORC base file format but found: $baseFormat")
      val df = spark.sql("select id, name from hudi_orc where id = 2")
      // Validate provider conversion even if Spark reports generic OrcFileFormat.
      logFileFormats(df)
      val rows = df.collect().toSeq
      assert(rows == Seq(Row(2, "v2")))
      val scan = df.queryExecution.sparkPlan.collectFirst {
        case s: org.apache.spark.sql.execution.FileSourceScanExec => s
      }
      assert(scan.isDefined)
      val provider = new HudiConvertProvider
      assert(provider.isSupported(scan.get))
      val converted = provider.convert(scan.get)
      assertHasNativeOrcScan(converted)
    }
  }

  // The common path: a default (Parquet, COW) Hudi table's scan is both supported by the
  // provider and converted to a native Parquet scan, with query results unchanged.
  test("hudi: Parquet table scan converts to native (provider)") {
    withTable("hudi_native_simple") {
      spark.sql("create table hudi_native_simple (id int, name string) using hudi")
      spark.sql("insert into hudi_native_simple values (1, 'v1'), (2, 'v2')")
      val df = spark.sql("select id, name from hudi_native_simple order by id")
      df.explain(true)
      // Validate provider conversion and correctness for the common COW parquet path.
      logFileFormats(df)
      val rows = df.collect().toSeq
      assert(rows == Seq(Row(1, "v1"), Row(2, "v2")))
      val scan = df.queryExecution.sparkPlan.collectFirst {
        case s: org.apache.spark.sql.execution.FileSourceScanExec => s
      }
      assert(scan.isDefined)
      val provider = new HudiConvertProvider
      assert(provider.isSupported(scan.get))
      val converted = provider.convert(scan.get)
      assertHasNativeParquetScan(converted)
    }
  }
}

Reply via email to