This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cacbb82254c [HUDI-6658] Inject filters for incremental query  (#10225)
cacbb82254c is described below

commit cacbb82254c840b96f879c8a577a6e91aff3f57e
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Dec 12 00:08:23 2023 -0500

    [HUDI-6658] Inject filters for incremental query  (#10225)
    
    Add incremental filters to the query plan
    
    Also fix some tests that use partition path as the precombine field
    
    Incremental queries will now work with the new filegroup reader
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |  11 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  14 +-
 .../scala/org/apache/hudi/HoodieCDCFileIndex.scala |   5 +
 .../hudi/HoodieHadoopFsRelationFactory.scala       |   6 +-
 .../apache/hudi/HoodieIncrementalFileIndex.scala   |   2 +-
 .../sql/FileFormatUtilsForFileGroupReader.scala    | 128 +++++++++++
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  36 ++-
 .../parquet/NewHoodieParquetFileFormat.scala       |   2 +
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   2 +-
 .../TestGlobalIndexEnableUpdatePartitions.java     |   6 +
 .../TestIncrementalReadWithFullTableScan.scala     |  27 +--
 .../hudi/functional/TestSparkDataSource.scala      |   4 +
 .../hudi/procedure/TestClusteringProcedure.scala   | 255 +++++++++++----------
 .../procedure/TestHoodieLogFileProcedure.scala     |  11 +-
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |  27 ++-
 .../spark/sql/HoodieSpark3CatalystPlanUtils.scala  |  25 +-
 .../spark/sql/HoodieSpark30CatalystPlanUtils.scala |   4 +-
 .../spark/sql/HoodieSpark31CatalystPlanUtils.scala |   4 +-
 .../spark/sql/HoodieSpark32CatalystPlanUtils.scala |   4 +-
 .../spark/sql/HoodieSpark33CatalystPlanUtils.scala |   4 +-
 .../spark/sql/HoodieSpark34CatalystPlanUtils.scala |   4 +-
 .../spark/sql/HoodieSpark35CatalystPlanUtils.scala |   4 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +-
 .../utilities/sources/TestHoodieIncrSource.java    |   6 +
 24 files changed, 372 insertions(+), 223 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 64ee645ba0f..b9110f1ed93 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -102,9 +102,11 @@ trait HoodieCatalystPlansUtils {
    * Spark requires file formats to append the partition path fields to the 
end of the schema.
    * For tables where the partition path fields are not at the end of the 
schema, we don't want
    * to return the schema in the wrong order when they do a query like "select 
*". To fix this
-   * behavior, we apply a projection onto FileScan when the file format is 
NewHudiParquetFileFormat
+   * behavior, we apply a projection onto FileScan when the file format has 
HoodieFormatTrait
+   *
+   * Additionally, incremental queries require filters to be added to the plan
    */
-  def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
+  def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan
 
   /**
    * Decomposes [[InsertIntoStatement]] into its arguments allowing to 
accommodate for API
@@ -140,4 +142,9 @@ trait HoodieCatalystPlansUtils {
   def failAnalysisForMIT(a: Attribute, cols: String): Unit = {}
 
   def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, 
condition: Option[Expression], hint: String): LogicalPlan
+
+  /**
+   * true if both plans produce the same attributes in the same order
+   */
+  def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 168502b3f08..ac8286b1bde 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -285,7 +285,12 @@ object DefaultSource {
             resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
           }
         case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
+          if (fileFormatUtils.isDefined) {
+            new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+          } else {
+            new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
+          }
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
           if (fileFormatUtils.isDefined) {
@@ -304,7 +309,12 @@ object DefaultSource {
           }
 
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-          MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, 
userSchema)
+          if (fileFormatUtils.isDefined) {
+            new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
+              sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
+          } else {
+            MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, 
userSchema)
+          }
 
         case (_, _, true) =>
           if (fileFormatUtils.isDefined) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 37e42bc56f3..959f7d2637e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import scala.jdk.CollectionConverters.{asScalaBufferConverter, 
mapAsScalaMapConverter}
@@ -78,4 +79,8 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
       new Path(fileGroupId.getPartitionPath, fileGroupId.getFileId).toString
     }.toArray
   }
+
+  override def getRequiredFilters: Seq[Filter] = {
+    Seq.empty
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 88f7224cff0..aee15189545 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -281,7 +281,7 @@ class 
HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
     sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), true, true)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
@@ -318,7 +318,7 @@ class 
HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
     shouldEmbedFileSlices = true)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
@@ -349,7 +349,7 @@ class 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
     sparkSession, metaClient, schemaSpec, options, 
FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index 7e765b4d08f..d541e73d8da 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import java.util.stream.Collectors
-import scala.jdk.CollectionConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
 
 class HoodieIncrementalFileIndex(override val spark: SparkSession,
                                  override val metaClient: 
HoodieTableMetaClient,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
new file mode 100644
index 00000000000..0587f135467
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/FileFormatUtilsForFileGroupReader.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, 
SparkHoodieTableFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
AttributeSet, Contains, EndsWith, EqualNullSafe, EqualTo, Expression, 
GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, 
LessThanOrEqual, Literal, NamedExpression, Not, Or, StartsWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, 
Project}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+object FileFormatUtilsForFileGroupReader extends SparkAdapterSupport {
+
+  def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: 
LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
+    val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
+    ff.isProjected = true
+    val tableSchema = fs.location match {
+      case index: HoodieCDCFileIndex => index.cdcRelation.schema
+      case index: SparkHoodieTableFileIndex => index.schema
+    }
+    val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
+    val unfilteredPlan = if (!fs.partitionSchema.fields.isEmpty && 
sparkAdapter.getCatalystPlanUtils.produceSameOutput(scanOperation, 
logicalRelation)) {
+      Project(resolvedSchema, scanOperation)
+    } else {
+      scanOperation
+    }
+    applyFiltersToPlan(unfilteredPlan, tableSchema, resolvedSchema, 
ff.getRequiredFilters)
+  }
+
+  /**
+   * adapted from 
https://github.com/apache/spark/blob/20df062d85e80422a55afae80ddbf2060f26516c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala
+   */
+  def applyFiltersToPlan(plan: LogicalPlan, tableSchema: StructType, 
resolvedSchema: Seq[Attribute], filters: 
Seq[org.apache.spark.sql.sources.Filter]): LogicalPlan = {
+
+    def filterToExpression(
+                            filter: sources.Filter,
+                            toRef: String => Option[NamedExpression]): 
Option[Expression] = {
+      def zipAttributeAndValue(name: String, value: Any): 
Option[(NamedExpression, Literal)] = {
+        zip(toRef(name), toLiteral(value))
+      }
+
+      def translate(filter: sources.Filter): Option[Expression] = filter match 
{
+        case sources.And(left, right) =>
+          zip(translate(left), translate(right)).map(And.tupled)
+        case sources.Or(left, right) =>
+          zip(translate(left), translate(right)).map(Or.tupled)
+        case sources.Not(child) =>
+          translate(child).map(Not)
+        case sources.EqualTo(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(EqualTo.tupled)
+        case sources.EqualNullSafe(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled)
+        case sources.IsNull(attribute) =>
+          toRef(attribute).map(IsNull)
+        case sources.IsNotNull(attribute) =>
+          toRef(attribute).map(IsNotNull)
+        case sources.In(attribute, values) =>
+          val literals = values.toSeq.flatMap(toLiteral)
+          if (literals.length == values.length) {
+            toRef(attribute).map(In(_, literals))
+          } else {
+            None
+          }
+        case sources.GreaterThan(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(GreaterThan.tupled)
+        case sources.GreaterThanOrEqual(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled)
+        case sources.LessThan(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(LessThan.tupled)
+        case sources.LessThanOrEqual(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled)
+        case sources.StringContains(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(Contains.tupled)
+        case sources.StringStartsWith(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(StartsWith.tupled)
+        case sources.StringEndsWith(attribute, value) =>
+          zipAttributeAndValue(attribute, value).map(EndsWith.tupled)
+        /* Not supported in spark2. If needed, we will need to create separate 
spark 2 and 3 implementations
+      case sources.AlwaysTrue() =>
+        Some(Literal(true, BooleanType))
+      case sources.AlwaysFalse() =>
+        Some(Literal(false, BooleanType))
+         */
+      }
+
+      translate(filter)
+    }
+
+    def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = {
+      a.zip(b).headOption
+    }
+
+    def toLiteral(value: Any): Option[Literal] = {
+      Try(Literal(value)).toOption
+    }
+
+    def toRef(attr: String): Option[NamedExpression] = {
+      tableSchema.getFieldIndex(attr).map { index =>
+        resolvedSchema(index)
+      }
+    }
+
+    val expressionFilters = filters.map(f => filterToExpression(f, n => 
toRef(n)).get)
+    if (expressionFilters.nonEmpty) {
+      Filter(expressionFilters.reduceLeft(And), plan)
+    } else {
+      plan
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 3565c70e5eb..57fdaf80e74 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -47,6 +47,7 @@ trait HoodieFormatTrait {
 
   // Used so that the planner only projects once and does not stack overflow
   var isProjected: Boolean = false
+  def getRequiredFilters: Seq[Filter]
 }
 
 /**
@@ -65,6 +66,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: 
HoodieTableState,
                                                   requiredFilters: Seq[Filter]
                                            ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
 
+  def getRequiredFilters: Seq[Filter] = requiredFilters
+
   /**
    * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
    * while the other side can't
@@ -104,8 +107,8 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
     val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
     val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) = 
buildFileReaders(
-      spark, dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
-      filters, options, augmentedHadoopConf, requiredSchemaWithMandatory, 
requiredWithoutMeta, requiredMeta)
+      spark, dataSchema, partitionSchema, requiredSchema, filters, options, 
augmentedHadoopConf,
+      requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
 
     val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
     val dataAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName)
@@ -242,25 +245,20 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       metaFields.find(f => f.name == name)
     }
 
-    // If not MergeOnRead or if projection is compatible
-    if (isIncremental) {
-      StructType(dataSchema.toArray ++ partitionSchema.fields)
-    } else {
-      val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
-      for (field <- mandatoryFields) {
-        if (requiredSchema.getFieldIndex(field).isEmpty) {
-          // Support for nested fields
-          val fieldParts = field.split("\\.")
-          val fieldToAdd = findNestedField(dataSchema, fieldParts)
-            .orElse(findNestedField(partitionSchema, fieldParts))
-            .orElse(findMetaField(field))
-            .getOrElse(throw new IllegalArgumentException(s"Field $field does 
not exist in the table schema"))
-          added.append(fieldToAdd)
-        }
+    val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+    for (field <- mandatoryFields) {
+      if (requiredSchema.getFieldIndex(field).isEmpty) {
+        // Support for nested fields
+        val fieldParts = field.split("\\.")
+        val fieldToAdd = findNestedField(dataSchema, fieldParts)
+          .orElse(findNestedField(partitionSchema, fieldParts))
+          .orElse(findMetaField(field))
+          .getOrElse(throw new IllegalArgumentException(s"Field $field does 
not exist in the table schema"))
+        added.append(fieldToAdd)
       }
-      val addedFields = StructType(added.toArray)
-      StructType(requiredSchema.toArray ++ addedFields.fields)
     }
+    val addedFields = StructType(added.toArray)
+    StructType(requiredSchema.toArray ++ addedFields.fields)
   }
 
   protected def buildFileReaders(sparkSession: SparkSession, dataSchema: 
StructType, partitionSchema: StructType,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index 44381a5db4a..0751d99f79c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -56,6 +56,8 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
                                  requiredFilters: Seq[Filter]
                                 ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
 
+  def getRequiredFilters: Seq[Filter] = requiredFilters
+
   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
                            path: Path): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 9d4014021e5..eb69f2b637c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -281,7 +281,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
             ut.copy(table = relation)
 
           case logicalPlan: LogicalPlan if logicalPlan.resolved =>
-            
sparkAdapter.getCatalystPlanUtils.applyNewHoodieParquetFileFormatProjection(logicalPlan)
+            
sparkAdapter.getCatalystPlanUtils.maybeApplyForNewFileFormat(logicalPlan)
         }
       }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index b0454f7f2aa..a7ef56d1390 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -59,6 +60,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestGlobalIndexEnableUpdatePartitions extends 
SparkClientFunctionalTestHarness {
 
+  @Override
+  public SparkConf conf() {
+    return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+  }
+
   private static Stream<Arguments> getTableTypeAndIndexType() {
     return Stream.of(
         Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 75a7026ad3c..efcc935e018 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -127,12 +127,12 @@ class TestIncrementalReadWithFullTableScan extends 
HoodieSparkClientTestBase {
     runIncrementalQueryAndCompare(startArchivedCommitTs, endArchivedCommitTs, 
1, true)
 
     // Test start commit is archived, end commit is not archived
-    shouldThrowIfFallbackIsFalse(tableType,
+    shouldThrowIfFallbackIsFalse(
       () => runIncrementalQueryAndCompare(startArchivedCommitTs, 
endUnarchivedCommitTs, nArchivedInstants + 1, false))
     runIncrementalQueryAndCompare(startArchivedCommitTs, 
endUnarchivedCommitTs, nArchivedInstants + 1, true)
 
     // Test both start commit and end commits are not archived but got cleaned
-    shouldThrowIfFallbackIsFalse(tableType,
+    shouldThrowIfFallbackIsFalse(
       () => runIncrementalQueryAndCompare(startUnarchivedCommitTs, 
endUnarchivedCommitTs, 1, false))
     runIncrementalQueryAndCompare(startUnarchivedCommitTs, 
endUnarchivedCommitTs, 1, true)
 
@@ -169,22 +169,13 @@ class TestIncrementalReadWithFullTableScan extends 
HoodieSparkClientTestBase {
     assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
   }
 
-  private def shouldThrowIfFallbackIsFalse(tableType: HoodieTableType, fn: () 
=> Unit): Unit = {
+  private def shouldThrowIfFallbackIsFalse(fn: () => Unit): Unit = {
     val msg = "Should fail with Path does not exist"
-    tableType match {
-      case HoodieTableType.COPY_ON_WRITE =>
-        assertThrows(classOf[HoodieIncrementalPathNotFoundException], new 
Executable {
-          override def execute(): Unit = {
-            fn()
-          }
-        }, msg)
-      case HoodieTableType.MERGE_ON_READ =>
-        val exp = assertThrows(classOf[SparkException], new Executable {
-          override def execute(): Unit = {
-            fn()
-          }
-        }, msg)
-        assertTrue(exp.getMessage.contains("FileNotFoundException"))
-    }
+    val exp = assertThrows(classOf[SparkException], new Executable {
+      override def execute(): Unit = {
+        fn()
+      }
+    }, msg)
+    assertTrue(exp.getMessage.contains("FileNotFoundException"))
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 3f64e24dfc9..682a118ebd8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -27,7 +27,9 @@ import 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
@@ -37,6 +39,8 @@ import scala.collection.JavaConversions._
 
 class TestSparkDataSource extends SparkClientFunctionalTestHarness {
 
+  override def conf: SparkConf = conf(getSparkSqlConf)
+
   val parallelism: Integer = 4
 
   val commonOpts: Map[String, String] = Map(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index b2c73332567..ac0afd67132 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -50,14 +50,15 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  partition long
              |) using hudi
              | options (
              |  primaryKey ='id',
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-             | partitioned by(ts)
+             | partitioned by(partition)
              | location '$basePath'
        """.stripMargin)
         // disable automatic inline compaction so that 
HoodieDataSourceHelpers.allCompletedCommitsCompactions
@@ -65,16 +66,16 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         spark.sql("set hoodie.compact.inline=false")
         spark.sql("set hoodie.compact.schedule.inline=false")
 
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
         val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
         // Generate the first clustering plan
         val firstScheduleInstant = client.createNewInstantTime()
         client.scheduleClusteringAtInstant(firstScheduleInstant, 
HOption.empty())
 
         // Generate the second clustering plan
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
         val secondScheduleInstant = client.createNewInstantTime()
         client.scheduleClusteringAtInstant(secondScheduleInstant, 
HOption.empty())
         checkAnswer(s"call show_clustering('$tableName')")(
@@ -85,9 +86,9 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         // Do clustering for all clustering plan generated above, and no new 
clustering
         // instant will be generated because of there is no commit after the 
second
         // clustering plan generated
-        checkAnswer(s"call run_clustering(table => '$tableName', order => 
'ts', show_involved_partition => true)")(
-          Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), 
"ts=1003"),
-          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), 
"ts=1000,ts=1001,ts=1002")
+        checkAnswer(s"call run_clustering(table => '$tableName', order => 
'partition', show_involved_partition => true)")(
+          Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), 
"partition=1003"),
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), 
"partition=1000,partition=1001,partition=1002")
         )
 
         // No new commits
@@ -102,11 +103,11 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
           .toSeq
         assertResult(2)(finishedClustering.size)
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002),
+          Seq(4, "a4", 10.0, 1003, 1003)
         )
 
         // After clustering there should be no pending clustering and all 
clustering instants should be completed
@@ -116,9 +117,9 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         )
 
         // Do clustering without manual schedule(which will do the schedule if 
no pending clustering exists)
-        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
-        spark.sql(s"call run_clustering(table => '$tableName', order => 'ts', 
show_involved_partition => true)").show()
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005, 1005)")
+        spark.sql(s"call run_clustering(table => '$tableName', order => 
'partition', show_involved_partition => true)").show()
 
         val thirdClusteringInstant = 
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
           .findInstantsAfter(secondScheduleInstant)
@@ -129,13 +130,13 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         // Should have a new replace commit after the second clustering 
command.
         assertResult(1)(thirdClusteringInstant.size)
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003),
-          Seq(5, "a5", 10.0, 1004),
-          Seq(6, "a6", 10.0, 1005)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002),
+          Seq(4, "a4", 10.0, 1003, 1003),
+          Seq(5, "a5", 10.0, 1004, 1004),
+          Seq(6, "a6", 10.0, 1005, 1005)
         )
       }
     }
@@ -152,39 +153,40 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  partition long
              |) using hudi
              | options (
              |  primaryKey ='id',
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-             | partitioned by(ts)
+             | partitioned by(partition)
              | location '$basePath'
        """.stripMargin)
 
         spark.sql(s"call run_clustering(path => '$basePath')").show()
         checkAnswer(s"call show_clustering(path => '$basePath')")()
 
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
         val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
         // Generate the first clustering plan
         val firstScheduleInstant = client.createNewInstantTime()
         client.scheduleClusteringAtInstant(firstScheduleInstant, 
HOption.empty())
         checkAnswer(s"call show_clustering(path => '$basePath', 
show_involved_partition => true)")(
-          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), 
"ts=1000,ts=1001,ts=1002")
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), 
"partition=1000,partition=1001,partition=1002")
         )
         // Do clustering for all the clustering plan
-        checkAnswer(s"call run_clustering(path => '$basePath', order => 
'ts')")(
+        checkAnswer(s"call run_clustering(path => '$basePath', order => 
'partition')")(
           Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), 
"*")
         )
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002)
         )
 
         val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
@@ -199,20 +201,20 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         assertResult(1)(finishedClustering.size)
 
         // Do clustering without manual schedule(which will do the schedule if 
no pending clustering exists)
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
-        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-        val resultA = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'ts >= 1003L', show_involved_partition => true)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+        val resultA = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'partition >= 1003L', show_involved_partition => true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
         assertResult(1)(resultA.length)
-        assertResult("ts=1003,ts=1004")(resultA(0)(3))
-
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1000),
-          Seq(2, "a2", 10.0, 1001),
-          Seq(3, "a3", 10.0, 1002),
-          Seq(4, "a4", 10.0, 1003),
-          Seq(5, "a5", 10.0, 1004)
+        assertResult("partition=1003,partition=1004")(resultA(0)(3))
+
+        checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+          Seq(1, "a1", 10.0, 1000, 1000),
+          Seq(2, "a2", 10.0, 1001, 1001),
+          Seq(3, "a3", 10.0, 1002, 1002),
+          Seq(4, "a4", 10.0, 1003, 1003),
+          Seq(5, "a5", 10.0, 1004, 1004)
         )
 
         finishedClustering = 
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -236,14 +238,15 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  partition long
              |) using hudi
              | options (
              |  primaryKey ='id',
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-             | partitioned by(ts)
+             | partitioned by(partition)
              | location '$basePath'
        """.stripMargin)
 
@@ -253,20 +256,20 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         var resultA: Array[Seq[Any]] = Array.empty
 
         {
-          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
 
           checkException(
-            s"call run_clustering(table => '$tableName', predicate => 'ts <= 
1001L and id = 10', order => 'ts')"
+            s"call run_clustering(table => '$tableName', predicate => 
'partition <= 1001L and id = 10', order => 'partition')"
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          resultA = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'ts <= 1001L', order => 'ts', show_involved_partition => true)")
+          resultA = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'partition <= 1001L', order => 'partition', 
show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
           assertResult(1)(resultA.length)
-          assertResult("ts=1000,ts=1001")(resultA(0)(3))
+          assertResult("partition=1000,partition=1001")(resultA(0)(3))
 
           // There is 1 completed clustering instant
           val clusteringInstants = 
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -285,13 +288,13 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
 
           // All clustering instants are completed
           checkAnswer(s"call show_clustering(table => '$tableName', 
show_involved_partition => true)")(
-            Seq(resultA(0).head, resultA(0)(1), 
HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001")
+            Seq(resultA(0).head, resultA(0)(1), 
HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001")
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 10.0, 1001),
-            Seq(3, "a3", 10.0, 1002)
+          checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+            Seq(1, "a1", 10.0, 1000, 1000),
+            Seq(2, "a2", 10.0, 1001, 1001),
+            Seq(3, "a3", 10.0, 1002, 1002)
           )
         }
 
@@ -299,20 +302,20 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         var resultB: Array[Seq[Any]] = Array.empty
 
         {
-          spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
-          spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-          spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)")
+          spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003, 1003)")
+          spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004, 1004)")
+          spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005, 1005)")
 
           checkException(
-            s"call run_clustering(table => '$tableName', predicate => 'ts > 
1001L and ts <= 1005L and id = 10', order => 'ts')"
+            s"call run_clustering(table => '$tableName', predicate => 
'partition > 1001L and partition <= 1005L and id = 10', order => 'partition')"
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          resultB = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'ts > 1001L and ts <= 1005L', order => 'ts', 
show_involved_partition => true)")
+          resultB = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'partition > 1001L and partition <= 1005L', order => 'partition', 
show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
           assertResult(1)(resultB.length)
-          assertResult("ts=1002,ts=1003,ts=1004,ts=1005")(resultB(0)(3))
+          assertResult("partition=1002,partition=1003,partition=1004,partition=1005")(resultB(0)(3))
 
           // There are 2 completed clustering instants
           val clusteringInstants = 
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -330,17 +333,17 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
 
           // All clustering instants are completed
           checkAnswer(s"call show_clustering(table => '$tableName', 
show_involved_partition => true)")(
-            Seq(resultA(0).head, resultA(0)(1), 
HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
-            Seq(resultB(0).head, resultB(0)(1), 
HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005")
+            Seq(resultA(0).head, resultA(0)(1), 
HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001"),
+            Seq(resultB(0).head, resultB(0)(1), 
HoodieInstant.State.COMPLETED.name(), 
"partition=1002,partition=1003,partition=1004,partition=1005")
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 10.0, 1001),
-            Seq(3, "a3", 10.0, 1002),
-            Seq(4, "a4", 10.0, 1003),
-            Seq(5, "a5", 10.0, 1004),
-            Seq(6, "a6", 10.0, 1005)
+          checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+            Seq(1, "a1", 10.0, 1000, 1000),
+            Seq(2, "a2", 10.0, 1001, 1001),
+            Seq(3, "a3", 10.0, 1002, 1002),
+            Seq(4, "a4", 10.0, 1003, 1003),
+            Seq(5, "a5", 10.0, 1004, 1004),
+            Seq(6, "a6", 10.0, 1005, 1005)
           )
         }
 
@@ -348,21 +351,21 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
         var resultC: Array[Seq[Any]] = Array.empty
 
         {
-          spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)")
-          spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)")
-          spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008)")
-          spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009)")
+          spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006, 1006)")
+          spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007, 1007)")
+          spark.sql(s"insert into $tableName values(9, 'a9', 10, 1008, 1008)")
+          spark.sql(s"insert into $tableName values(10, 'a10', 10, 1009, 
1009)")
 
           checkException(
-            s"call run_clustering(table => '$tableName', predicate => 'ts < 
1007L or ts >= 1008L or id = 10', order => 'ts')"
+            s"call run_clustering(table => '$tableName', predicate => 
'partition < 1007L or partition >= 1008L or id = 10', order => 'partition')"
           )("Only partition predicates are allowed")
 
           // Do clustering table with partition predicate
-          resultC = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts', 
show_involved_partition => true)")
+          resultC = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => '(partition >= 1006L and partition < 1008L) or partition >= 
1009L', order => 'partition', show_involved_partition => true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
           assertResult(1)(resultC.length)
-          assertResult("ts=1006,ts=1007,ts=1009")(resultC(0)(3))
+          assertResult("partition=1006,partition=1007,partition=1009")(resultC(0)(3))
 
           // There are 3 completed clustering instants
           val clusteringInstants = 
HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
@@ -380,28 +383,28 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
 
           // All clustering instants are completed
           checkAnswer(s"call show_clustering(table => '$tableName', 
show_involved_partition => true)")(
-            Seq(resultA(0).head, resultA(0)(1), 
HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"),
-            Seq(resultB(0).head, resultB(0)(1), 
HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005"),
-            Seq(resultC(0).head, resultC(0)(1), 
HoodieInstant.State.COMPLETED.name(), "ts=1006,ts=1007,ts=1009")
+            Seq(resultA(0).head, resultA(0)(1), 
HoodieInstant.State.COMPLETED.name(), "partition=1000,partition=1001"),
+            Seq(resultB(0).head, resultB(0)(1), 
HoodieInstant.State.COMPLETED.name(), 
"partition=1002,partition=1003,partition=1004,partition=1005"),
+            Seq(resultC(0).head, resultC(0)(1), 
HoodieInstant.State.COMPLETED.name(), 
"partition=1006,partition=1007,partition=1009")
           )
 
-          checkAnswer(s"select id, name, price, ts from $tableName order by 
id")(
-            Seq(1, "a1", 10.0, 1000),
-            Seq(2, "a2", 10.0, 1001),
-            Seq(3, "a3", 10.0, 1002),
-            Seq(4, "a4", 10.0, 1003),
-            Seq(5, "a5", 10.0, 1004),
-            Seq(6, "a6", 10.0, 1005),
-            Seq(7, "a7", 10.0, 1006),
-            Seq(8, "a8", 10.0, 1007),
-            Seq(9, "a9", 10.0, 1008),
-            Seq(10, "a10", 10.0, 1009)
+          checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+            Seq(1, "a1", 10.0, 1000, 1000),
+            Seq(2, "a2", 10.0, 1001, 1001),
+            Seq(3, "a3", 10.0, 1002, 1002),
+            Seq(4, "a4", 10.0, 1003, 1003),
+            Seq(5, "a5", 10.0, 1004, 1004),
+            Seq(6, "a6", 10.0, 1005, 1005),
+            Seq(7, "a7", 10.0, 1006, 1006),
+            Seq(8, "a8", 10.0, 1007, 1007),
+            Seq(9, "a9", 10.0, 1008, 1008),
+            Seq(10, "a10", 10.0, 1009, 1009)
           )
         }
 
         // Test partition pruning with invalid predicates
         {
-          val resultD = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'ts > 1111L', order => 'ts', show_involved_partition => true)")
+          val resultD = spark.sql(s"call run_clustering(table => '$tableName', 
predicate => 'partition > 1111L', order => 'partition', show_involved_partition 
=> true)")
             .collect()
             .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
           assertResult(0)(resultD.length)
@@ -460,7 +463,7 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
 
       // test with operator schedule
       checkExceptionContain(
-      s"call run_clustering(table => '$tableName', instants => '000000', op => 
'schedule')"
+        s"call run_clustering(table => '$tableName', instants => '000000', op 
=> 'schedule')"
       )("specific instants only can be used in 'execute' op or not specific 
op")
 
       // test with operator scheduleAndExecute
@@ -628,56 +631,57 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition long
            |) using hudi
            | options (
            |  primaryKey ='id',
            |  type = 'cow',
            |  preCombineField = 'ts'
            | )
-           | partitioned by(ts)
+           | partitioned by(partition)
            | location '$basePath'
      """.stripMargin)
 
       // Test clustering with PARTITION_SELECTED config set, choose only a 
part of all partitions to schedule
       {
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010, 1010)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010, 1010)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011, 1011)")
         // Do
         val result = spark.sql(s"call run_clustering(table => '$tableName', " +
-          s"selected_partitions => 'ts=1010', show_involved_partition => 
true)")
+          s"selected_partitions => 'partition=1010', show_involved_partition 
=> true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
         assertResult(1)(result.length)
-        assertResult("ts=1010")(result(0)(3))
+        assertResult("partition=1010")(result(0)(3))
 
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1010),
-          Seq(2, "a2", 10.0, 1010),
-          Seq(3, "a3", 10.0, 1011)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+          Seq(1, "a1", 10.0, 1010, 1010),
+          Seq(2, "a2", 10.0, 1010, 1010),
+          Seq(3, "a3", 10.0, 1011, 1011)
         )
       }
 
       // Test clustering with PARTITION_SELECTED, choose all partitions to 
schedule
       {
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
-        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
-        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010, 1010)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011, 1011)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012, 1012)")
         val result = spark.sql(s"call run_clustering(table => '$tableName', " +
-          s"selected_partitions => 'ts=1010,ts=1011,ts=1012', 
show_involved_partition => true)")
+          s"selected_partitions => 
'partition=1010,partition=1011,partition=1012', show_involved_partition => 
true)")
           .collect()
           .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), 
row.getString(3)))
         assertResult(1)(result.length)
-        assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
-
-        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
-          Seq(1, "a1", 10.0, 1010),
-          Seq(2, "a2", 10.0, 1010),
-          Seq(3, "a3", 10.0, 1011),
-          Seq(4, "a4", 10.0, 1010),
-          Seq(5, "a5", 10.0, 1011),
-          Seq(6, "a6", 10.0, 1012)
+        assertResult("partition=1010,partition=1011,partition=1012")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts, partition from $tableName 
order by id")(
+          Seq(1, "a1", 10.0, 1010, 1010),
+          Seq(2, "a2", 10.0, 1010, 1010),
+          Seq(3, "a3", 10.0, 1011, 1011),
+          Seq(4, "a4", 10.0, 1010, 1010),
+          Seq(5, "a5", 10.0, 1011, 1011),
+          Seq(6, "a6", 10.0, 1012, 1012)
         )
       }
     }
@@ -693,7 +697,8 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition long
            |) using hudi
            | tblproperties (
            |  primaryKey ='id',
@@ -702,12 +707,12 @@ class TestClusteringProcedure extends 
HoodieSparkProcedureTestBase {
            |  hoodie.index.type = 'BUCKET',
            |  hoodie.bucket.index.hash.field = 'id'
            | )
-           | partitioned by (ts)
+           | partitioned by (partition)
            | location '$basePath'
      """.stripMargin)
 
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
-      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010, 1010)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010, 1010)")
 
       checkExceptionContain(s"call run_clustering(table => '$tableName')")(
         "Executor SparkExecuteClusteringCommitActionExecutor is not compatible 
with table layout HoodieSimpleBucketLayout")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
index 10ee9501422..c9a8c8ec847 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
@@ -31,9 +31,10 @@ class TestHoodieLogFileProcedure extends 
HoodieSparkProcedureTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition int
            |) using hudi
-           | partitioned by (ts)
+           | partitioned by (partition)
            | location '$tablePath'
            | tblproperties (
            |  type = 'mor',
@@ -42,8 +43,8 @@ class TestHoodieLogFileProcedure extends 
HoodieSparkProcedureTestBase {
            | )
        """.stripMargin)
       // insert data to table
-      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500, 1500")
       spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")
 
       // Check required fields
@@ -52,7 +53,7 @@ class TestHoodieLogFileProcedure extends 
HoodieSparkProcedureTestBase {
 
       // collect result for table
       val result = spark.sql(
-        s"""call show_logfile_metadata(table => '$tableName', 
log_file_path_pattern => '$tablePath/ts=1000/*.log.*')""".stripMargin).collect()
+        s"""call show_logfile_metadata(table => '$tableName', 
log_file_path_pattern => 
'$tablePath/partition=1000/*.log.*')""".stripMargin).collect()
       assertResult(1) {
         result.length
       }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 7458d16641c..9f3a5ce03a8 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -98,22 +98,21 @@ object HoodieSpark2CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
     Join(left, right, joinType, condition)
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
+    val thisOutput = a.output
+    val otherOutput = b.output
+    thisOutput.length == otherOutput.length && 
thisOutput.zip(otherOutput).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
+
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case physicalOperation@PhysicalOperation(_, _,
-      logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait] && 
!fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
-        ff.isProjected = true
-        val tableSchema = fs.location match {
-          case index: HoodieCDCFileIndex => index.cdcRelation.schema
-          case index: SparkHoodieTableFileIndex => index.schema
-        }
-        val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
-        if (!fs.partitionSchema.fields.isEmpty) {
-          Project(resolvedSchema, physicalOperation)
-        } else {
-          physicalOperation
-        }
+      logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(physicalOperation, logicalRelation, fs)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index f5757e16b41..399e05334e0 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, 
SparkHoodieTableFileIndex}
+import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, 
JoinHint, LeafNode, LogicalPlan}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, 
ExplainCommand}
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -82,25 +80,14 @@ trait HoodieSpark3CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
   override def createMITJoin(left: LogicalPlan, right: LogicalPlan, joinType: 
JoinType, condition: Option[Expression], hint: String): LogicalPlan = {
     Join(left, right, joinType, condition, JoinHint.NONE)
   }
+
+  override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
+    a.sameOutput(b)
+  }
 }
 
 object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
 
-  def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: 
LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
-    val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
-    ff.isProjected = true
-    val tableSchema = fs.location match {
-      case index: HoodieCDCFileIndex => index.cdcRelation.schema
-      case index: SparkHoodieTableFileIndex => index.schema
-    }
-    val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
-    if (!fs.partitionSchema.fields.isEmpty && 
scanOperation.sameOutput(logicalRelation)) {
-      Project(resolvedSchema, scanOperation)
-    } else {
-      scanOperation
-    }
-  }
-
   /**
    * This is an extractor to accommodate for [[ResolvedTable]] signature 
change in Spark 3.2
    */
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index 34a1e084227..dbe68bb33e5 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -45,13 +45,13 @@ object HoodieSpark30CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index dd3ac530017..765a9b06de5 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -46,13 +46,13 @@ object HoodieSpark31CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 7a30aebd856..8562ca14d3e 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -48,13 +48,13 @@ object HoodieSpark32CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 9bcc40a29e6..54dbaa0db7b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -46,13 +46,13 @@ object HoodieSpark33CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index e49adebc14b..2c50d21cbe5 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -45,13 +45,13 @@ object HoodieSpark34CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index eac3153b3f9..b95ee94e482 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -45,13 +45,13 @@ object HoodieSpark35CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
     }
   }
 
-  override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
+  override def maybeApplyForNewFileFormat(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _, _,
       l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
         if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
           && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
-        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
+        FileFormatUtilsForFileGroupReader.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 03208a0c0e5..b39c5f260df 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -60,7 +60,6 @@ import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIncrementalPathNotFoundException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
@@ -105,6 +104,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
@@ -2390,7 +2390,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
     //No change as this fails with Path not exist error
-    assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+    assertThrows(SparkException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
 
     if (downstreamCfg.configs == null) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 1b534c22c7e..b9e20fb3a19 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -46,6 +46,7 @@ import 
org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -72,6 +73,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
+  @Override
+  public SparkConf conf() {
+    return conf(SparkClientFunctionalTestHarness.getSparkSqlConf());
+  }
+
   private HoodieTestDataGenerator dataGen;
   private HoodieTableMetaClient metaClient;
   private HoodieTableType tableType = COPY_ON_WRITE;

Reply via email to