This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a84ea7521 [HUDI-7137] Implement Bootstrap for new FG reader (#10137)
78a84ea7521 is described below

commit 78a84ea7521e6295732199b68aca775cd5d79f63
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Dec 7 18:00:07 2023 -0500

    [HUDI-7137] Implement Bootstrap for new FG reader (#10137)
---
 .../hudi/common/table/log/CachingIterator.java     |  41 ----
 .../hudi/common/table/log/LogFileIterator.java     |   6 +
 .../hudi/common/model/HoodieSparkRecord.java       |   3 +-
 .../hudi/BaseSparkInternalRowReaderContext.java    |  11 +-
 .../SparkFileFormatInternalRowReaderContext.scala  |  76 +++++-
 .../apache/spark/sql/HoodieInternalRowUtils.scala  |   7 +
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  78 +++++-
 .../hudi/common/engine/HoodieReaderContext.java    |  21 ++
 .../apache/hudi/common/model/HoodieLogFile.java    |   5 +
 .../hudi/common/model/HoodieRecordMerger.java      |  23 ++
 .../table/log/BaseHoodieLogRecordReader.java       |   2 +-
 .../table/log/HoodieMergedLogRecordReader.java     |   9 +-
 .../common/table/read/HoodieFileGroupReader.java   | 263 ++++++++++++++-------
 .../common/util/collection/CachingIterator.java    |  78 ++++++
 .../org/apache/hudi/avro/TestAvroSchemaUtils.java  |  72 ++++++
 .../table/read/TestHoodieFileGroupReaderBase.java  |  13 +-
 .../hudi/HoodieHadoopFsRelationFactory.scala       |   2 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 224 ++++++++----------
 .../parquet/NewHoodieParquetFileFormat.scala       |   7 +-
 .../hudi/TestHoodieMergeHandleWithSparkMerger.java |   7 +-
 ...stHoodiePositionBasedFileGroupRecordBuffer.java |   2 +-
 .../functional/TestNewHoodieParquetFileFormat.java |  17 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  32 +--
 .../hudi/TestQueryMergeOnReadOptimizedTable.scala  |  15 +-
 .../procedure/TestHoodieLogFileProcedure.scala     |   9 +-
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |  22 +-
 .../spark/sql/HoodieSpark3CatalystPlanUtils.scala  |  21 +-
 .../spark/sql/HoodieSpark30CatalystPlanUtils.scala |  12 +-
 .../spark/sql/HoodieSpark31CatalystPlanUtils.scala |   9 +-
 .../spark/sql/HoodieSpark32CatalystPlanUtils.scala |  12 +-
 .../spark/sql/HoodieSpark33CatalystPlanUtils.scala |  12 +-
 .../spark/sql/HoodieSpark34CatalystPlanUtils.scala |  12 +-
 .../spark/sql/HoodieSpark35CatalystPlanUtils.scala |  12 +-
 33 files changed, 765 insertions(+), 370 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java
deleted file mode 100644
index d022b92ae22..00000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.table.log;
-
-import java.util.Iterator;
-
-public abstract class CachingIterator<T> implements Iterator<T> {
-
-  protected T nextRecord;
-
-  protected abstract boolean doHasNext();
-
-  @Override
-  public final boolean hasNext() {
-    return nextRecord != null || doHasNext();
-  }
-
-  @Override
-  public final T next() {
-    T record = nextRecord;
-    nextRecord = null;
-    return record;
-  }
-
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
index bf55a6ba06e..7331f7d3a9b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CachingIterator;
 
 import java.util.Iterator;
 import java.util.Map;
@@ -54,4 +55,9 @@ public class LogFileIterator<T> extends 
CachingIterator<HoodieRecord<T>> {
   protected boolean doHasNext() {
     return hasNextInternal();
   }
+
+  @Override
+  public void close() {
+    scanner.close();
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 5cb8800411c..334a1e9ddc3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -449,7 +449,8 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
         data instanceof HoodieInternalRow
             || data instanceof GenericInternalRow
             || data instanceof SpecificInternalRow
-            || 
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+            || 
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data))
+            || data instanceof JoinedRow;
 
     ValidationUtils.checkState(isValid);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 5360535620f..a4d14d1eb4a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -37,12 +37,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Map;
+import java.util.function.UnaryOperator;
 
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
 
 /**
  * An abstract class implementing {@link HoodieReaderContext} to handle {@link 
InternalRow}s.
@@ -113,7 +116,7 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
   }
 
   private Object getFieldValueFromInternalRow(InternalRow row, Schema 
recordSchema, String fieldName) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = getCachedSchema(recordSchema);
     scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
         HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
     if (cachedNestedFieldPath.isDefined()) {
@@ -123,4 +126,10 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
       return null;
     }
   }
+
+  @Override
+  public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
+    UnsafeProjection projection = 
HoodieInternalRowUtils.generateUnsafeProjectionAlias(getCachedSchema(from), 
getCachedSchema(to));
+    return projection::apply;
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index beca8852686..963035caf21 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,16 +25,18 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{ClosableIterator, 
CloseableMappingIterator}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, 
HoodieSparkParquetReader}
 import org.apache.hudi.util.CloseableInternalRowIterator
-import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection, 
UnsafeRow}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.sql.HoodieInternalRowUtils
 
 import scala.collection.mutable
 
@@ -44,13 +46,11 @@ import scala.collection.mutable
  *
  * This uses Spark parquet reader to read parquet data files or parquet log 
blocks.
  *
- * @param baseFileReader  A reader that transforms a {@link PartitionedFile} 
to an iterator of
- *                        {@link InternalRow}. This is required for reading 
the base file and
- *                        not required for reading a file group with only log 
files.
- * @param partitionValues The values for a partition in which the file group 
lives.
+ * @param readerMaps a map from a schema-pair hash to a pre-built reader. Ideally the reader would be built inside
+ *                   getFileRecordIterator, but since that runs on the executor, doing so would require porting a
+ *                   large part of ParquetFileFormat for each Spark version; for now we pass in the readers we expect to need.
  */
-class SparkFileFormatInternalRowReaderContext(baseFileReader: 
Option[PartitionedFile => Iterator[InternalRow]],
-                                              partitionValues: InternalRow) 
extends BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, 
PartitionedFile => Iterator[InternalRow]]) extends 
BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
   lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
   val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = 
mutable.Map()
@@ -61,8 +61,10 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                      conf: Configuration): 
ClosableIterator[InternalRow] = {
+    // partition value is empty because the spark parquet reader will append 
the partition columns to
+    // each row if they are given. That is the only usage of the partition 
values in the reader.
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
-      .createPartitionedFile(partitionValues, filePath, start, length)
+      .createPartitionedFile(InternalRow.empty, filePath, start, length)
     if (FSUtils.isLogFile(filePath)) {
       val structType: StructType = 
HoodieInternalRowUtils.getCachedSchema(requiredSchema)
       val projection: UnsafeProjection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
@@ -77,14 +79,18 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
           }
         }).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      if (baseFileReader.isEmpty) {
-        throw new IllegalArgumentException("Base file reader is missing when 
instantiating "
-          + "SparkFileFormatInternalRowReaderContext.");
+      val schemaPairHashKey = generateSchemaPairHashKey(dataSchema, 
requiredSchema)
+      if (!readerMaps.contains(schemaPairHashKey)) {
+        throw new IllegalStateException("schemas don't hash to a known reader")
       }
-      new CloseableInternalRowIterator(baseFileReader.get.apply(fileInfo))
+      new 
CloseableInternalRowIterator(readerMaps(schemaPairHashKey).apply(fileInfo))
     }
   }
 
+  private def generateSchemaPairHashKey(dataSchema: Schema, requestedSchema: 
Schema): Long = {
+    dataSchema.hashCode() + requestedSchema.hashCode()
+  }
+
   /**
    * Converts an Avro record, e.g., serialized in the log files, to an 
[[InternalRow]].
    *
@@ -99,4 +105,48 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
     })
     deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
   }
+
+  override def mergeBootstrapReaders(skeletonFileIterator: 
ClosableIterator[InternalRow],
+                                     dataFileIterator: 
ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+  }
+
+  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], 
dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
+    new ClosableIterator[Any] {
+      val combinedRow = new JoinedRow()
+
+      override def hasNext: Boolean = {
+        // If the iterators are out of sync, it is probably due to filter pushdown
+        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
+        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      }
+
+      override def next(): Any = {
+        (skeletonFileIterator.next(), dataFileIterator.next()) match {
+          case (s: ColumnarBatch, d: ColumnarBatch) =>
+            val numCols = s.numCols() + d.numCols()
+            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+            for (i <- 0 until numCols) {
+              if (i < s.numCols()) {
+                vecs(i) = s.column(i)
+              } else {
+                vecs(i) = d.column(i - s.numCols())
+              }
+            }
+            assert(s.numRows() == d.numRows())
+            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+          case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+          case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+        }
+      }
+
+      override def close(): Unit = {
+        skeletonFileIterator.close()
+        dataFileIterator.close()
+      }
+    }.asInstanceOf[ClosableIterator[InternalRow]]
+  }
 }
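
    The doBootstrapMerge iterator above pairs each skeleton-file row (Hudi meta columns) with the matching
    data-file row and fails fast if the two sides fall out of sync, e.g. when a filter was pushed down to only
    one of the two readers. A minimal, engine-agnostic sketch of that pairing idea, using hypothetical names
    that are not part of this commit:

    import java.util.Iterator;
    import java.util.function.BiFunction;

    // Hypothetical helper: zips two iterators that must advance in lock step,
    // mirroring how doBootstrapMerge pairs skeleton rows with data rows.
    final class ZippedIterator<A, B, R> implements Iterator<R> {
      private final Iterator<A> left;
      private final Iterator<B> right;
      private final BiFunction<A, B, R> combiner;

      ZippedIterator(Iterator<A> left, Iterator<B> right, BiFunction<A, B, R> combiner) {
        this.left = left;
        this.right = right;
        this.combiner = combiner;
      }

      @Override
      public boolean hasNext() {
        // If one side is exhausted before the other, the inputs were not aligned.
        if (left.hasNext() != right.hasNext()) {
          throw new IllegalStateException("skeleton and data iterators are out of sync");
        }
        return left.hasNext();
      }

      @Override
      public R next() {
        return combiner.apply(left.next(), right.next());
      }
    }
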
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index d5831be7d91..afeb9969c5f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -73,6 +73,13 @@ object HoodieInternalRowUtils {
       .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
+  /**
+   * Because HoodieCatalystExpressionUtils is both an object and a trait (for Scala 2.11 compatibility),
+   * generateUnsafeProjection cannot be called directly from Java code, so this alias is exposed instead.
+   */
+  def generateUnsafeProjectionAlias(from: StructType, to: StructType): 
UnsafeProjection = {
+    generateUnsafeProjection(from, to)
+  }
   /**
    * Provides cached instance of [[UnsafeRowWriter]] transforming provided 
[[InternalRow]]s from
    * one [[StructType]] and into another [[StructType]]
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 3c5486c47c7..a8a72e77717 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -18,12 +18,14 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -218,6 +220,65 @@ public class AvroSchemaUtils {
     return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
   }
 
+  public static Option<Schema.Field> findNestedField(Schema schema, String 
fieldName) {
+    return findNestedField(schema, fieldName.split("\\."), 0);
+  }
+
+  private static Option<Schema.Field> findNestedField(Schema schema, String[] 
fieldParts, int index) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      Option<Schema.Field> notUnion = 
findNestedField(resolveNullableSchema(schema), fieldParts, index);
+      if (!notUnion.isPresent()) {
+        return Option.empty();
+      }
+      Schema.Field nu = notUnion.get();
+      return Option.of(new Schema.Field(nu.name(), nu.schema(), nu.doc(), 
nu.defaultVal()));
+    }
+    if (fieldParts.length <= index) {
+      return Option.empty();
+    }
+
+    Schema.Field foundField = schema.getField(fieldParts[index]);
+    if (foundField == null) {
+      return Option.empty();
+    }
+
+    if (index == fieldParts.length - 1) {
+      return Option.of(new Schema.Field(foundField.name(), 
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+    }
+
+    Schema foundSchema = foundField.schema();
+    Option<Schema.Field> nestedPart = findNestedField(foundSchema, fieldParts, 
index + 1);
+    if (!nestedPart.isPresent()) {
+      return Option.empty();
+    }
+    return nestedPart;
+  }
+
+  public static Schema appendFieldsToSchemaDedupNested(Schema schema, 
List<Schema.Field> newFields) {
+    return appendFieldsToSchemaBase(schema, newFields, true);
+  }
+
+  public static Schema mergeSchemas(Schema a, Schema b) {
+    if (!a.getType().equals(Schema.Type.RECORD)) {
+      return a;
+    }
+    List<Schema.Field> fields = new ArrayList<>();
+    for (Schema.Field f : a.getFields()) {
+      Schema.Field foundField = b.getField(f.name());
+      fields.add(new Schema.Field(f.name(), foundField == null ? f.schema() : 
mergeSchemas(f.schema(), foundField.schema()),
+          f.doc(), f.defaultVal()));
+    }
+    for (Schema.Field f : b.getFields()) {
+      if (a.getField(f.name()) == null) {
+        fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal()));
+      }
+    }
+
+    Schema newSchema = Schema.createRecord(a.getName(), a.getDoc(), 
a.getNamespace(), a.isError());
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
   /**
    * Appends provided new fields at the end of the given schema
    *
@@ -225,10 +286,25 @@ public class AvroSchemaUtils {
    *       of the source schema as is
    */
   public static Schema appendFieldsToSchema(Schema schema, List<Schema.Field> 
newFields) {
+    return appendFieldsToSchemaBase(schema, newFields, false);
+  }
+
+  private static Schema appendFieldsToSchemaBase(Schema schema, 
List<Schema.Field> newFields, boolean dedupNested) {
     List<Schema.Field> fields = schema.getFields().stream()
         .map(field -> new Schema.Field(field.name(), field.schema(), 
field.doc(), field.defaultVal()))
         .collect(Collectors.toList());
-    fields.addAll(newFields);
+    if (dedupNested) {
+      for (Schema.Field f : newFields) {
+        Schema.Field foundField = schema.getField(f.name());
+        if (foundField != null) {
+          fields.set(foundField.pos(), new Schema.Field(foundField.name(), 
mergeSchemas(foundField.schema(), f.schema()), foundField.doc(), 
foundField.defaultVal()));
+        } else {
+          fields.add(f);
+        }
+      }
+    } else {
+      fields.addAll(newFields);
+    }
 
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), 
schema.getNamespace(), schema.isError());
     newSchema.setFields(fields);
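
    The new findNestedField resolves a dot-separated path through nested records (unwrapping nullable unions
    along the way), and appendFieldsToSchemaDedupNested merges such fields back into an existing schema rather
    than blindly appending duplicates. A small usage sketch against a toy schema (the schema and class name
    below are illustrative only):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.Option;

    import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;

    public class NestedFieldLookupSketch {
      public static void main(String[] args) {
        Schema table = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"nested\",\"type\":{\"type\":\"record\",\"name\":\"n\",\"fields\":["
                + "{\"name\":\"ts\",\"type\":\"long\"}]}}]}");

        // Dot-separated paths walk through nested record types.
        Option<Schema.Field> tsField = findNestedField(table, "nested.ts");
        System.out.println(tsField.isPresent());        // true
        System.out.println(tsField.get().schema());     // "long"
      }
    }
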
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index b24a4109e75..7b8b2888983 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.UnaryOperator;
 
 /**
  * An abstract reader context class for {@code HoodieFileGroupReader} to use, 
containing APIs for
@@ -188,4 +189,24 @@ public abstract class HoodieReaderContext<T> {
     meta.put(INTERNAL_META_SCHEMA, schema);
     return meta;
   }
+
+  /**
+   * Merge the skeleton file and data file iterators into a single iterator 
that will produce rows that contain all columns from the
+   * skeleton file iterator, followed by all columns in the data file iterator
+   *
+   * @param skeletonFileIterator iterator over bootstrap skeleton files that 
contain hudi metadata columns
+   * @param dataFileIterator iterator over data files that were bootstrapped 
into the hudi table
+   * @return iterator that concatenates the skeletonFileIterator and 
dataFileIterator
+   */
+  public abstract ClosableIterator<T> 
mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator, 
ClosableIterator<T> dataFileIterator);
+
+  /**
+   * Creates a function that reorders records from schema "from" to schema "to".
+   * Every field in "to" must exist in "from", but not every field in "from" needs to be in "to".
+   *
+   * @param from the schema of records to be passed into UnaryOperator
+   * @param to the schema of records produced by UnaryOperator
+   * @return a function that takes in a record and returns the record with 
reordered columns
+   */
+  public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
 }
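
    projectRecord only has to reorder and narrow columns: every field of the target schema must already exist
    in the source schema. The Spark context above backs it with an UnsafeProjection; as a rough illustration of
    the same contract for plain Avro records, a hypothetical sketch that is not part of this commit:

    import java.util.function.UnaryOperator;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class AvroProjectRecordSketch {
      // Hypothetical Avro analogue of projectRecord(from, to): copies the fields named
      // in "to" out of a record written with "from"; extra "from" fields are dropped.
      // "from" is kept only to mirror the API shape.
      static UnaryOperator<GenericRecord> projectRecord(Schema from, Schema to) {
        return record -> {
          GenericRecord projected = new GenericData.Record(to);
          for (Schema.Field field : to.getFields()) {
            projected.put(field.name(), record.get(field.name()));
          }
          return projected;
        };
      }
    }
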
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index d35211b7970..c1dbefb57c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.exception.InvalidHoodiePathException;
 import org.apache.hudi.hadoop.CachingPath;
 
@@ -147,6 +148,10 @@ public class HoodieLogFile implements Serializable {
     return fileExtension;
   }
 
+  public boolean isCDC() {
+    return getSuffix().equals(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+  }
+
   public String getSuffix() {
     if (suffix == null) {
       parseFieldsFromPath();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 7a259d0bd9a..cdd41242ef1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -22,13 +22,17 @@ import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
  * HoodieMerge defines how to merge two records. It is a stateless component.
@@ -122,6 +126,25 @@ public interface HoodieRecordMerger extends Serializable {
     return true;
   }
 
+  default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
+    ArrayList<String> requiredFields = new ArrayList<>();
+
+    if (cfg.populateMetaFields()) {
+      requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    } else {
+      Option<String[]> fields = cfg.getRecordKeyFields();
+      if (fields.isPresent()) {
+        requiredFields.addAll(Arrays.asList(fields.get()));
+      }
+    }
+
+    String preCombine = cfg.getPreCombineField();
+    if (!StringUtils.isNullOrEmpty(preCombine)) {
+      requiredFields.add(preCombine);
+    }
+    return requiredFields.toArray(new String[0]);
+  }
+
   /**
    * The record type handled by the current merger.
    * SPARK, AVRO, FLINK
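
    getMandatoryFieldsForMerging determines which columns the file group reader must fetch on top of what the
    query requested: the record key (or the _hoodie_record_key meta column when meta fields are populated) plus
    the precombine field, if one is configured. A simplified, self-contained sketch of that default behavior
    (the helper below is hypothetical and does not use HoodieTableConfig):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MandatoryMergeFieldsSketch {
      // Hypothetical, simplified stand-in for the default getMandatoryFieldsForMerging.
      static String[] mandatoryFields(boolean populateMetaFields,
                                      String[] recordKeyFields,
                                      String preCombineField) {
        List<String> required = new ArrayList<>();
        if (populateMetaFields) {
          required.add("_hoodie_record_key");
        } else if (recordKeyFields != null) {
          required.addAll(Arrays.asList(recordKeyFields));
        }
        if (preCombineField != null && !preCombineField.isEmpty()) {
          required.add(preCombineField);
        }
        return required.toArray(new String[0]);
      }

      public static void main(String[] args) {
        // e.g. a table with meta fields populated and precombine field "ts"
        System.out.println(Arrays.toString(mandatoryFields(true, null, "ts")));
        // -> [_hoodie_record_key, ts]
      }
    }
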
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 3f2ceefaf58..2c62fcd70cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -843,7 +843,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
     public abstract Builder withBasePath(String basePath);
 
-    public abstract Builder withLogFilePaths(List<String> logFilePaths);
+    public abstract Builder withLogFiles(List<HoodieLogFile> hoodieLogFiles);
 
     public abstract Builder withReaderSchema(Schema schema);
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 6f417be072b..44c4c973eae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -20,10 +20,10 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -259,9 +259,10 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     }
 
     @Override
-    public Builder<T> withLogFilePaths(List<String> logFilePaths) {
-      this.logFilePaths = logFilePaths.stream()
-          .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+    public Builder<T> withLogFiles(List<HoodieLogFile> hoodieLogFiles) {
+      this.logFilePaths = hoodieLogFiles.stream()
+          .filter(l -> !l.isCDC())
+          .map(l -> l.getPath().toString())
           .collect(Collectors.toList());
       return this;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 2850a77d709..8413ef8a5e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -23,19 +23,19 @@ import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieTableQueryType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
@@ -45,16 +45,19 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
 /**
  * A file group reader that iterates through the records in a single file 
group.
@@ -67,8 +70,8 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  */
 public final class HoodieFileGroupReader<T> implements Closeable {
   private final HoodieReaderContext<T> readerContext;
-  private final Option<HoodieBaseFile> baseFilePath;
-  private final Option<List<String>> logFilePathList;
+  private final Option<HoodieBaseFile> hoodieBaseFileOption;
+  private final List<HoodieLogFile> logFiles;
   private final Configuration hadoopConf;
   private final TypedProperties props;
   // Byte offset to start reading from the base file
@@ -81,80 +84,168 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+
+  // requestedSchema: the schema that the caller requests
+  private final Schema requestedSchema;
+
+  // requiredSchema: the requestedSchema with any additional columns required 
for merging etc
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
-    this.recordMerger = readerContext.getRecordMerger(
-        getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
+    this.recordMerger = 
readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? null
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
-    scanLogFiles();
-    recordBuffer.setBaseFileIterator(baseFileIterator);
+    ClosableIterator<T> iter = makeBaseFileIterator();
+    if (logFiles.isEmpty()) {
+      this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
+    } else {
+      this.baseFileIterator = iter;
+      scanLogFiles();
+      recordBuffer.setBaseFileIterator(baseFileIterator);
+    }
+  }
+
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    // might need to revisit this if query types other than MOR have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      if (requestedSchema.getField(field) == null) {
+        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, 
field);
+        if (!foundFieldOpt.isPresent()) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        Schema.Field foundField = foundFieldOpt.get();
+        addedFields.add(foundField);
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);
+      if (!(requiredFields.getLeft().isEmpty() || 
requiredFields.getRight().isEmpty())) {
+        return 
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), 
requiredFields.getRight().stream())
+            .collect(Collectors.toList()));
+      }
+    }
+    return input;
+  }
+
+  private static Pair<List<Schema.Field>, List<Schema.Field>> 
getDataAndMetaCols(Schema schema) {
+    Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+        .collect(Collectors.partitioningBy(f -> 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+    return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+        fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+  }
+
+  private Schema createSchemaFromFields(List<Schema.Field> fields) {
+    // the incoming fields carry positions from their source schema; Avro's setFields rejects such fields, so create position-free copies
+    for (int i = 0; i < fields.size(); i++) {
+      Schema.Field curr = fields.get(i);
+      fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), 
curr.defaultVal()));
+    }
+    Schema newSchema = Schema.createRecord(dataSchema.getName(), 
dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
+  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile 
baseFile) {
+    BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
+    Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(requiredSchema);
+    Pair<List<Schema.Field>,List<Schema.Field>> allFields = 
getDataAndMetaCols(dataSchema);
+
+    Option<ClosableIterator<T>> dataFileIterator = 
requiredFields.getRight().isEmpty() ? Option.empty() :
+        
Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, 
dataFile.getFileLen(),
+            createSchemaFromFields(allFields.getRight()), 
createSchemaFromFields(requiredFields.getRight()), hadoopConf));
+
+    Option<ClosableIterator<T>> skeletonFileIterator = 
requiredFields.getLeft().isEmpty() ? Option.empty() :
+        
Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, 
baseFile.getFileLen(),
+            createSchemaFromFields(allFields.getLeft()), 
createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
+    if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
+      throw new IllegalStateException("should not be here if only partition 
cols are required");
+    } else if (!dataFileIterator.isPresent()) {
+      return skeletonFileIterator.get();
+    } else if (!skeletonFileIterator.isPresent()) {
+      return  dataFileIterator.get();
+    } else {
+      return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(), 
dataFileIterator.get());
+    }
   }
 
   /**
@@ -162,38 +253,44 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
    * @throws IOException on reader error.
    */
   public boolean hasNext() throws IOException {
-    return recordBuffer.hasNext();
+    if (recordBuffer == null) {
+      return baseFileIterator.hasNext();
+    } else {
+      return recordBuffer.hasNext();
+    }
   }
 
   /**
    * @return The next record after calling {@link #hasNext}.
    */
   public T next() {
-    return recordBuffer.next();
+    T nextVal = recordBuffer == null ? baseFileIterator.next() : 
recordBuffer.next();
+    if (outputConverter.isPresent()) {
+      return outputConverter.get().apply(nextVal);
+    }
+    return nextVal;
   }
 
   private void scanLogFiles() {
-    if (logFilePathList.isPresent()) {
-      String path = baseFilePath.isPresent() ? baseFilePath.get().getPath() : 
logFilePathList.get().get(0);
-      FileSystem fs = readerContext.getFs(path, hadoopConf);
-
-      HoodieMergedLogRecordReader logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
-          .withHoodieReaderContext(readerContext)
-          .withFileSystem(fs)
-          .withBasePath(readerState.tablePath)
-          .withLogFilePaths(logFilePathList.get())
-          .withLatestInstantTime(readerState.latestCommitTime)
-          .withReaderSchema(readerState.logRecordAvroSchema)
-          .withReadBlocksLazily(getBooleanWithAltKeys(props, 
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
-          .withReverseReader(false)
-          .withBufferSize(getIntWithAltKeys(props, 
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
-          .withPartition(getRelativePartitionPath(
-              new Path(readerState.tablePath), new 
Path(logFilePathList.get().get(0)).getParent()))
-          .withRecordMerger(recordMerger)
-          .withRecordBuffer(recordBuffer)
-          .build();
-      logRecordReader.close();
-    }
+    String path = readerState.tablePath;
+    FileSystem fs = readerContext.getFs(path, hadoopConf);
+
+    HoodieMergedLogRecordReader logRecordReader = 
HoodieMergedLogRecordReader.newBuilder()
+        .withHoodieReaderContext(readerContext)
+        .withFileSystem(fs)
+        .withBasePath(readerState.tablePath)
+        .withLogFiles(logFiles)
+        .withLatestInstantTime(readerState.latestCommitTime)
+        .withReaderSchema(readerState.logRecordAvroSchema)
+        .withReadBlocksLazily(getBooleanWithAltKeys(props, 
HoodieReaderConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE))
+        .withReverseReader(false)
+        .withBufferSize(getIntWithAltKeys(props, 
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
+        .withPartition(getRelativePartitionPath(
+            new Path(readerState.tablePath), 
logFiles.get(0).getPath().getParent()))
+        .withRecordMerger(recordMerger)
+        .withRecordBuffer(recordBuffer)
+        .build();
+    logRecordReader.close();
   }
 
   @Override
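
    For bootstrapped base files, the reader splits the required schema with getDataAndMetaCols: Hudi meta
    columns come from the skeleton file, all other columns from the original bootstrap data file, and the two
    iterators are then combined via mergeBootstrapReaders. The split itself is just a partitioningBy over the
    field names; a standalone sketch (the column list and class name are illustrative):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class MetaColumnSplitSketch {
      // Assumed subset of Hudi's meta columns, for illustration only.
      private static final Set<String> META_COLS = new HashSet<>(Arrays.asList(
          "_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"));

      public static void main(String[] args) {
        List<String> requested = Arrays.asList("_hoodie_record_key", "id", "name");
        // Mirrors getDataAndMetaCols: partition requested columns into meta vs. data columns.
        Map<Boolean, List<String>> split = requested.stream()
            .collect(Collectors.partitioningBy(META_COLS::contains));
        System.out.println("skeleton file columns: " + split.get(true));   // [_hoodie_record_key]
        System.out.println("data file columns:     " + split.get(false));  // [id, name]
      }
    }
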
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CachingIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CachingIterator.java
new file mode 100644
index 00000000000..58d5aa78709
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CachingIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+
+public abstract class CachingIterator<T> implements ClosableIterator<T> {
+
+  protected T nextRecord;
+
+  protected abstract boolean doHasNext();
+
+  @Override
+  public final boolean hasNext() {
+    return nextRecord != null || doHasNext();
+  }
+
+  @Override
+  public final T next() {
+    T record = nextRecord;
+    nextRecord = null;
+    return record;
+  }
+
+  public static <U> CachingIterator<U> wrap(ClosableIterator<U> iterator) {
+    return new CachingIterator<U>() {
+      @Override
+      protected boolean doHasNext() {
+        if (iterator.hasNext()) {
+          nextRecord = iterator.next();
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public void close() {
+        iterator.close();
+      }
+    };
+  }
+
+  public static <U> CachingIterator<U> wrap(ClosableIterator<U> iterator, 
HoodieReaderContext<U> readerContext) {
+    return new CachingIterator<U>() {
+      @Override
+      protected boolean doHasNext() {
+        if (iterator.hasNext()) {
+          nextRecord = readerContext.seal(iterator.next());
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public void close() {
+        iterator.close();
+      }
+    };
+  }
+
+}
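
    The relocated CachingIterator moves the look-ahead into hasNext(): hasNext() pre-fetches (and, in the
    reader-context overload, seals) the next record, while next() simply returns the cached value and clears
    it, so callers must invoke hasNext() before every next(). A small usage sketch, assuming a trivial
    list-backed source:

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.hudi.common.util.collection.CachingIterator;
    import org.apache.hudi.common.util.collection.ClosableIterator;

    public class CachingIteratorUsageSketch {
      public static void main(String[] args) {
        Iterator<String> base = Arrays.asList("a", "b", "c").iterator();
        ClosableIterator<String> source = new ClosableIterator<String>() {
          @Override public boolean hasNext() { return base.hasNext(); }
          @Override public String next() { return base.next(); }
          @Override public void close() { /* nothing to release in this sketch */ }
        };
        CachingIterator<String> it = CachingIterator.wrap(source);
        // hasNext() caches the next record; next() returns the cached value and clears it.
        while (it.hasNext()) {
          System.out.println(it.next());
        }
        it.close();
      }
    }
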
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
index c05683e605c..6d8fa651e51 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
@@ -24,6 +24,7 @@ import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+
 import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -229,4 +230,75 @@ public class TestAvroSchemaUtils {
   public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
     AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, 
shouldValidate, false, Collections.singleton("c"));
   }
+
+  /* [HUDI-7045] should uncomment this test
+  @Test
+  public void testAppendFieldsToSchemaDedupNested() {
+    Schema full_schema = new Schema.Parser().parse("{\n"
+        + "  \"type\": \"record\",\n"
+        + "  \"namespace\": \"example.schema\",\n"
+        + "  \"name\": \"source\",\n"
+        + "  \"fields\": [\n"
+        + "    {\n"
+        + "      \"name\": \"number\",\n"
+        + "      \"type\": [\"null\", \"int\"]\n"
+        + "    },\n"
+        + "    {\n"
+        + "      \"name\": \"nested_record\",\n"
+        + "      \"type\": {\n"
+        + "        \"name\": \"nested\",\n"
+        + "        \"type\": \"record\",\n"
+        + "        \"fields\": [\n"
+        + "          {\n"
+        + "            \"name\": \"string\",\n"
+        + "            \"type\": [\"null\", \"string\"]\n"
+        + "          },\n"
+        + "          {\n"
+        + "            \"name\": \"long\",\n"
+        + "            \"type\": [\"null\", \"long\"]\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      }\n"
+        + "    },\n"
+        + "    {\n"
+        + "      \"name\": \"other\",\n"
+        + "      \"type\": [\"null\", \"int\"]\n"
+        + "    }\n"
+        + "  ]\n"
+        + "}\n");
+
+    Schema missing_field_schema = new Schema.Parser().parse("{\n"
+        + "  \"type\": \"record\",\n"
+        + "  \"namespace\": \"example.schema\",\n"
+        + "  \"name\": \"source\",\n"
+        + "  \"fields\": [\n"
+        + "    {\n"
+        + "      \"name\": \"number\",\n"
+        + "      \"type\": [\"null\", \"int\"]\n"
+        + "    },\n"
+        + "    {\n"
+        + "      \"name\": \"nested_record\",\n"
+        + "      \"type\": {\n"
+        + "        \"name\": \"nested\",\n"
+        + "        \"type\": \"record\",\n"
+        + "        \"fields\": [\n"
+        + "          {\n"
+        + "            \"name\": \"string\",\n"
+        + "            \"type\": [\"null\", \"string\"]\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      }\n"
+        + "    },\n"
+        + "    {\n"
+        + "      \"name\": \"other\",\n"
+        + "      \"type\": [\"null\", \"int\"]\n"
+        + "    }\n"
+        + "  ]\n"
+        + "}\n");
+
+      Option<Schema.Field> missingField = 
AvroSchemaUtils.findNestedField(full_schema, "nested_record.long");
+      assertTrue(missingField.isPresent());
+      assertEquals(full_schema, 
AvroSchemaUtils.appendFieldsToSchemaDedupNested(missing_field_schema, 
Collections.singletonList(missingField.get())));
+  }
+  */
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 439948a6cc9..59c078e929b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -34,7 +34,6 @@ import 
org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
@@ -45,7 +44,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,7 +67,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
 
   public abstract String getBasePath();
 
-  public abstract HoodieReaderContext<T> getHoodieReaderContext(String 
tablePath, String[] partitionPath);
+  public abstract HoodieReaderContext<T> getHoodieReaderContext(String 
tablePath, Schema avroSchema);
 
   public abstract void commitToTable(List<String> recordList, String operation,
                                      Map<String, String> writeConfigs);
@@ -158,7 +156,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
     FileSlice fileSlice = 
fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
     List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
-    Collections.sort(logFilePathList);
     assertEquals(expectedLogFileNum, logFilePathList.size());
 
     List<T> actualRecordList = new ArrayList<>();
@@ -169,17 +166,17 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
       props.setProperty(PARTITION_FIELDS.key(), 
metaClient.getTableConfig().getString(PARTITION_FIELDS));
     }
-    String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} : 
new String[] {partitionPaths[0]};
     assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
     HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
-        getHoodieReaderContext(tablePath, partitionValues),
+        getHoodieReaderContext(tablePath, avroSchema),
         hadoopConf,
         tablePath,
         metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
-        fileSlice.getBaseFile(),
-        logFilePathList.isEmpty() ? Option.empty() : 
Option.of(logFilePathList),
+        fileSlice,
+        avroSchema,
         avroSchema,
         props,
+        metaClient.getTableConfig(),
         0,
         fileSlice.getTotalFileSize(),
         false);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index f90e9bf5d7b..88f7224cff0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -234,7 +234,7 @@ class 
HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
   override def buildFileIndex(): FileIndex = fileIndex
 
   override def buildFileFormat(): FileFormat = {
-    if (fileGroupReaderEnabled && !isBootstrap) {
+    if (fileGroupReaderEnabled) {
       new HoodieFileGroupReaderBasedParquetFileFormat(
         tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
         metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 1728affac5b..3565c70e5eb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -17,19 +17,18 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import kotlin.NotImplementedError
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
-import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, 
MergeOnReadSnapshotRelation, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, 
MergeOnReadSnapshotRelation, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -41,10 +40,15 @@ import org.apache.spark.sql.types.{StringType, StructField, 
StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import scala.annotation.tailrec
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.asScalaIteratorConverter
 
+trait HoodieFormatTrait {
+
+  // Used so that the planner only projects once and does not stack overflow
+  var isProjected: Boolean = false
+}
+
 /**
  * This class utilizes {@link HoodieFileGroupReader} and its related classes 
to support reading
  * from Parquet formatted base files and their log files.
@@ -59,8 +63,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: 
HoodieTableState,
                                                   isIncremental: Boolean,
                                                   shouldUseRecordPosition: 
Boolean,
                                                   requiredFilters: Seq[Filter]
-                                           ) extends ParquetFileFormat with 
SparkAdapterSupport {
-  var isProjected = false
+                                           ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
 
   /**
    * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
@@ -69,10 +72,11 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
   private var supportBatchCalled = false
   private var supportBatchResult = false
 
+  private val sanitizedTableName = 
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     if (!supportBatchCalled || supportBatchResult) {
       supportBatchCalled = true
-      supportBatchResult = !isMOR && !isIncremental && 
super.supportBatch(sparkSession, schema)
+      supportBatchResult = !isMOR && !isIncremental && !isBootstrap && 
super.supportBatch(sparkSession, schema)
     }
     supportBatchResult
   }
@@ -88,17 +92,27 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                               filters: Seq[Filter],
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
+    //dataSchema is not always right due to spark bugs
+    val partitionColumns = partitionSchema.fieldNames
+    val dataSchema = 
StructType(tableSchema.structTypeSchema.fields.filterNot(f => 
partitionColumns.contains(f.name)))
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     spark.conf.set("spark.sql.parquet.enableVectorizedReader", 
supportBatchResult)
     val requiredSchemaWithMandatory = 
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
+    val isCount = requiredSchemaWithMandatory.isEmpty
     val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
     val requiredMeta = StructType(requiredSchemaSplits._1)
     val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
     val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
-    val (baseFileReader, preMergeBaseFileReader, _, _, cdcFileReader) = 
buildFileReaders(
+    val (baseFileReader, preMergeBaseFileReader, readerMaps, cdcFileReader) = 
buildFileReaders(
       spark, dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
       filters, options, augmentedHadoopConf, requiredSchemaWithMandatory, 
requiredWithoutMeta, requiredMeta)
+
+    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
+    val dataAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName)
+
     val broadcastedHadoopConf = spark.sparkContext.broadcast(new 
SerializableConfiguration(augmentedHadoopConf))
+    val broadcastedDataSchema =  spark.sparkContext.broadcast(dataAvroSchema)
+    val broadcastedRequestedSchema =  
spark.sparkContext.broadcast(requestedAvroSchema)
     val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark, 
options)
 
     (file: PartitionedFile) => {
@@ -106,58 +120,47 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
         // Snapshot or incremental queries.
         case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
-          if (FSUtils.isLogFile(filePath)) {
-            val partitionValues = fileSliceMapping.getPartitionValues
-            val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
-            buildFileGroupIterator(
-              Option.empty[PartitionedFile => Iterator[InternalRow]],
-              partitionValues,
-              Option.empty[HoodieBaseFile],
-              getLogFilesFromSlice(fileSlice),
-              requiredSchemaWithMandatory,
-              outputSchema,
-              partitionSchema,
-              broadcastedHadoopConf.value.value,
-              -1,
-              -1,
-              shouldUseRecordPosition
-            )
+          val filegroupName = if (FSUtils.isLogFile(filePath)) {
+            FSUtils.getFileId(filePath.getName).substring(1)
           } else {
-            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
-              case Some(fileSlice) =>
+            FSUtils.getFileId(filePath.getName)
+          }
+          fileSliceMapping.getSlice(filegroupName) match {
+            case Some(fileSlice) if !isCount =>
+              if (requiredSchema.isEmpty && 
!fileSlice.getLogFiles.findAny().isPresent) {
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
-                val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = fileSliceMapping.getPartitionValues
-                val logFiles = getLogFilesFromSlice(fileSlice)
-                if (requiredSchemaWithMandatory.isEmpty) {
-                  val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
-                  baseFileReader(baseFile)
-                } else if (bootstrapFileOpt.isPresent) {
-                  // TODO: Use FileGroupReader here: HUDI-6942.
-                  throw new NotImplementedError("Not support reading bootstrap 
file")
-                } else {
-                  if (logFiles.isEmpty) {
-                    throw new IllegalStateException(
-                      "should not be here since file slice should not have 
been broadcasted "
-                        + "since it has no log or data files")
-                  }
-                  buildFileGroupIterator(
-                    Option(preMergeBaseFileReader),
-                    partitionValues,
-                    Option(hoodieBaseFile),
-                    logFiles,
-                    requiredSchemaWithMandatory,
-                    outputSchema,
-                    partitionSchema,
-                    broadcastedHadoopConf.value.value,
-                    0,
-                    hoodieBaseFile.getFileLen,
-                    shouldUseRecordPosition
-                  )
-                }
-              // TODO: Use FileGroupReader here: HUDI-6942.
-              case _ => baseFileReader(file)
-            }
+                
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen))
+              } else {
+                val readerContext: HoodieReaderContext[InternalRow] = new 
SparkFileFormatInternalRowReaderContext(
+                  readerMaps)
+                val serializedHadoopConf = broadcastedHadoopConf.value.value
+                val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+                  
.builder().setConf(serializedHadoopConf).setBasePath(tableState.tablePath).build
+                val reader = new HoodieFileGroupReader[InternalRow](
+                  readerContext,
+                  serializedHadoopConf,
+                  tableState.tablePath,
+                  tableState.latestCommitTimestamp.get,
+                  fileSlice,
+                  broadcastedDataSchema.value,
+                  broadcastedRequestedSchema.value,
+                  metaClient.getTableConfig.getProps,
+                  metaClient.getTableConfig,
+                  file.start,
+                  file.length,
+                  shouldUseRecordPosition)
+                reader.initRecordIterators()
+                // Append partition values to rows and project to output schema
+                appendPartitionAndProject(
+                  
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
+                  requiredSchema,
+                  partitionSchema,
+                  outputSchema,
+                  fileSliceMapping.getPartitionValues)
+              }
+
+            // TODO: Use FileGroupReader here: HUDI-6942.
+            case _ => baseFileReader(file)
           }
         // CDC queries.
         case hoodiePartitionCDCFileGroupSliceMapping: 
HoodiePartitionCDCFileGroupMapping =>
@@ -190,43 +193,6 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       props)
   }
 
-  protected def buildFileGroupIterator(preMergeBaseFileReader: 
Option[PartitionedFile => Iterator[InternalRow]],
-                                       partitionValues: InternalRow,
-                                       baseFile: Option[HoodieBaseFile],
-                                       logFiles: List[HoodieLogFile],
-                                       requiredSchemaWithMandatory: StructType,
-                                       outputSchema: StructType,
-                                       partitionSchema: StructType,
-                                       hadoopConf: Configuration,
-                                       start: Long,
-                                       length: Long,
-                                       shouldUseRecordPosition: Boolean): 
Iterator[InternalRow] = {
-    val readerContext: HoodieReaderContext[InternalRow] = new 
SparkFileFormatInternalRowReaderContext(
-      preMergeBaseFileReader, partitionValues)
-    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
-      .builder().setConf(hadoopConf).setBasePath(tableState.tablePath).build
-    val reader = new HoodieFileGroupReader[InternalRow](
-      readerContext,
-      hadoopConf,
-      tableState.tablePath,
-      tableState.latestCommitTimestamp.get,
-      if (baseFile.nonEmpty) HOption.of(baseFile.get) else HOption.empty(),
-      HOption.of(logFiles.map(f => f.getPath.toString).asJava),
-      HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, 
tableName),
-      metaClient.getTableConfig.getProps,
-      start,
-      length,
-      shouldUseRecordPosition)
-    reader.initRecordIterators()
-    // Append partition values to rows and project to output schema
-    appendPartitionAndProject(
-      
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
-      requiredSchemaWithMandatory,
-      partitionSchema,
-      outputSchema,
-      partitionValues)
-  }
-
   private def appendPartitionAndProject(iter: Iterator[InternalRow],
                                         inputSchema: StructType,
                                         partitionSchema: StructType,
@@ -279,7 +245,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     // If not MergeOnRead or if projection is compatible
     if (isIncremental) {
       StructType(dataSchema.toArray ++ partitionSchema.fields)
-    } else if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
+    } else {
       val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
       for (field <- mandatoryFields) {
         if (requiredSchema.getFieldIndex(field).isEmpty) {
@@ -294,8 +260,6 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       }
       val addedFields = StructType(added.toArray)
       StructType(requiredSchema.toArray ++ addedFields.fields)
-    } else {
-      dataSchema
     }
   }
 
@@ -305,23 +269,26 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                  requiredWithoutMeta: StructType, 
requiredMeta: StructType):
   (PartitionedFile => Iterator[InternalRow],
     PartitionedFile => Iterator[InternalRow],
-    PartitionedFile => Iterator[InternalRow],
-    PartitionedFile => Iterator[InternalRow],
+    mutable.Map[Long, PartitionedFile => Iterator[InternalRow]],
     PartitionedFile => Iterator[InternalRow]) = {
 
+    val m = scala.collection.mutable.Map[Long, PartitionedFile => 
Iterator[InternalRow]]()
+
     val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, 
tableState.recordKeyField)
     val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
       filters ++ requiredFilters, options, new Configuration(hadoopConf))
+    m.put(generateKey(dataSchema, requiredSchema), baseFileReader)
 
     //file reader for reading a hudi base file that needs to be merged with 
log files
     val preMergeBaseFileReader = if (isMOR) {
       // Add support for reading files using inline file system.
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchemaWithMandatory,
+      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty), requiredSchemaWithMandatory,
         if (shouldUseRecordPosition) requiredFilters else 
recordKeyRelatedFilters ++ requiredFilters,
         options, new Configuration(hadoopConf))
     } else {
       _: PartitionedFile => Iterator.empty
     }
+    m.put(generateKey(dataSchema, requiredSchemaWithMandatory), 
preMergeBaseFileReader)
 
     val cdcFileReader = super.buildReaderWithPartitionValues(
       sparkSession,
@@ -341,41 +308,50 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val needDataCols = requiredWithoutMeta.nonEmpty
 
     //file reader for bootstrap skeleton files
-    val skeletonReader = if (needMetaCols && isBootstrap) {
+    if (needMetaCols && isBootstrap) {
+      val key = generateKey(HoodieSparkUtils.getMetaSchema, requiredMeta)
       if (needDataCols || isMOR) {
         // no filter and no append
-        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
-          requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
+        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+          requiredMeta, Seq.empty, options, new Configuration(hadoopConf)))
       } else {
-        // filter and append
-        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, partitionSchema,
-          requiredMeta, filters ++ requiredFilters, options, new 
Configuration(hadoopConf))
+        // filter
+        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+          requiredMeta, filters ++ requiredFilters, options, new 
Configuration(hadoopConf)))
       }
-    } else {
-      _: PartitionedFile => Iterator.empty
+
+      val requestedMeta = StructType(requiredSchema.fields.filter(sf => 
isMetaField(sf.name)))
+      m.put(generateKey(HoodieSparkUtils.getMetaSchema, requestedMeta),
+        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), requestedMeta,
+          Seq.empty, options, new Configuration(hadoopConf)))
     }
 
     //file reader for bootstrap base files
-    val bootstrapBaseReader = if (needDataCols && isBootstrap) {
+    if (needDataCols && isBootstrap) {
       val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => 
isMetaField(sf.name)))
-      if (isMOR) {
+      val key = generateKey(dataSchemaWithoutMeta, requiredWithoutMeta)
+      if (isMOR || needMetaCols) {
+        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+          Seq.empty, options, new Configuration(hadoopConf)))
         // no filter and no append
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
-          Seq.empty, options, new Configuration(hadoopConf))
-      } else if (needMetaCols) {
-        // no filter but append
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
-          Seq.empty, options, new Configuration(hadoopConf))
+
       } else {
-        // filter and append
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
-          filters ++ requiredFilters, options, new Configuration(hadoopConf))
+        // filter
+        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+          filters ++ requiredFilters, options, new Configuration(hadoopConf)))
       }
-    } else {
-      _: PartitionedFile => Iterator.empty
+
+      val requestedWithoutMeta = StructType(requiredSchema.fields.filterNot(sf 
=> isMetaField(sf.name)))
+      m.put(generateKey(dataSchemaWithoutMeta, requestedWithoutMeta),
+        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requestedWithoutMeta,
+          Seq.empty, options, new Configuration(hadoopConf)))
     }
 
-    (baseFileReader, preMergeBaseFileReader, skeletonReader, 
bootstrapBaseReader, cdcFileReader)
+    (baseFileReader, preMergeBaseFileReader, m, cdcFileReader)
+  }
+
+  protected def generateKey(dataSchema: StructType, requestedSchema: 
StructType): Long = {
+    AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName).hashCode() + 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName).hashCode()
   }
 
   protected def getRecordKeyRelatedFilters(filters: Seq[Filter], 
recordKeyColumn: String): Seq[Filter] = {
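
A note on the map that the reworked buildFileReaders returns: every base-file reader variant (plain, pre-merge, bootstrap skeleton, bootstrap data, and their requested-only projections) is registered under a Long key produced by generateKey, i.e. the sum of the hash codes of the Avro-converted data schema and requested schema, and the whole map is handed to SparkFileFormatInternalRowReaderContext. A minimal sketch of the registration side; the lookup inside the reader context is not part of this excerpt, so it is assumed to recompute the same key for a given schema pair:

    import org.apache.hudi.AvroConversionUtils
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile
    import org.apache.spark.sql.types.StructType

    import scala.collection.mutable

    // Registers one reader under the same key convention as generateKey above.
    // `sanitizedTableName` stands in for AvroSchemaUtils.getAvroRecordQualifiedName(tableName).
    def registerReader(readers: mutable.Map[Long, PartitionedFile => Iterator[InternalRow]],
                       dataSchema: StructType,
                       requestedSchema: StructType,
                       sanitizedTableName: String,
                       reader: PartitionedFile => Iterator[InternalRow]): Unit = {
      val key: Long =
        AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName).hashCode() +
          AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName).hashCode()
      readers.put(key, reader)
    }
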
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
index afa85ae6475..44381a5db4a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/NewHoodieParquetFileFormat.scala
@@ -26,7 +26,7 @@ import 
org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{BaseFile, FileSlice, HoodieLogFile, 
HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, LogFileIterator, 
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping, 
RecordMergingFileIterator, SkipMergeIterator, SparkAdapterSupport}
+import org.apache.hudi.{HoodieBaseRelation, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, LogFileIterator, 
MergeOnReadSnapshotRelation, RecordMergingFileIterator, SkipMergeIterator, 
SparkAdapterSupport}
 import org.apache.spark.broadcast.Broadcast
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
@@ -54,7 +54,7 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
                                  isBootstrap: Boolean,
                                  isIncremental: Boolean,
                                  requiredFilters: Seq[Filter]
-                                ) extends ParquetFileFormat with 
SparkAdapterSupport {
+                                ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
 
   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
@@ -62,9 +62,6 @@ class NewHoodieParquetFileFormat(tableState: 
Broadcast[HoodieTableState],
     false
   }
 
-  //Used so that the planner only projects once and does not stack overflow
-  var isProjected = false
-
   /**
    * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
    * while the other side can't
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index ee91b2bcf85..9847918adc1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -86,9 +86,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
     properties.setProperty(
         HoodieTableConfig.BASE_FILE_FORMAT.key(),
         HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
-    properties.setProperty(
-        PAYLOAD_ORDERING_FIELD_PROP_KEY,
-        
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+    properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key(), 
"record_key");
     
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),"partition_path");
     
properties.setProperty(HoodieTableConfig.PARTITION_FIELDS.key(),"partition_path");
     metaClient = getHoodieMetaClient(hadoopConf(), basePath(), 
HoodieTableType.MERGE_ON_READ, properties);
@@ -168,8 +166,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalT
         LOGFILE_DATA_BLOCK_FORMAT.key(),
         "parquet");
     extraProperties.setProperty(
-        PAYLOAD_ORDERING_FIELD_PROP_KEY,
-        
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+        HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "record_key");
     extraProperties.setProperty(
         FILE_GROUP_READER_ENABLED.key(),
         "true");
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index 20d2edfaa8d..682558f79f9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -97,7 +97,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer 
extends TestHoodieFile
         ? Option.empty() : Option.of(partitionPaths[0]);
 
     buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
-        getHoodieReaderContext(getBasePath(), partitionValues),
+        getHoodieReaderContext(getBasePath(), avroSchema),
         avroSchema,
         avroSchema,
         partitionNameOpt,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index ec719414dc8..efc803666e5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.functional;
 
 import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 
 import org.apache.spark.sql.Dataset;
@@ -103,9 +104,13 @@ public class TestNewHoodieParquetFileFormat extends 
TestBootstrapReadBase {
 
   protected void testCount(String tableBasePath) {
     Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"false").load(tableBasePath);
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+        .load(tableBasePath);
     Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"true").load(tableBasePath);
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
+        .load(tableBasePath);
     assertEquals(legacyDf.count(), fileFormatDf.count());
   }
 
@@ -119,9 +124,13 @@ public class TestNewHoodieParquetFileFormat extends 
TestBootstrapReadBase {
 
   protected void runIndividualComparison(String tableBasePath, String 
firstColumn, String... columns) {
     Dataset<Row> legacyDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"false").load(tableBasePath);
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "false")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
+        .load(tableBasePath);
     Dataset<Row> fileFormatDf = sparkSession.read().format("hudi")
-        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), 
"true").load(tableBasePath);
+        
.option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(), "true")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true")
+        .load(tableBasePath);
     if (firstColumn.isEmpty()) {
       //df.except(df) does not work with map type cols
       legacyDf = legacyDf.drop("city_to_state");
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index a5f25d8bd34..5c1d04ee477 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -26,15 +26,13 @@ import 
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLE
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.{AvroConversionUtils, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, 
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
@@ -83,31 +81,16 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     tempDir.toAbsolutePath.toUri.toString
   }
 
-  override def getHoodieReaderContext(tablePath: String,
-                                      partitionValues: Array[String]): 
HoodieReaderContext[InternalRow] = {
+  override def getHoodieReaderContext(tablePath: String, avroSchema: Schema): 
HoodieReaderContext[InternalRow] = {
     val parquetFileFormat = new ParquetFileFormat
-    val metaClient = 
HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(tablePath).build
-    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
     val structTypeSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-    val partitionFields = metaClient.getTableConfig.getPartitionFields
-    val partitionSchema = if (partitionFields.isPresent) {
-      new StructType(structTypeSchema.fields.filter(f => 
partitionFields.get().contains(f.name)))
-    } else {
-      new StructType()
-    }
 
     val recordReaderIterator = 
parquetFileFormat.buildReaderWithPartitionValues(
-      spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty, 
Map.empty, getHadoopConf)
-    val numPartitionFields = if (partitionFields.isPresent) 
partitionFields.get().length else 0
-    assertEquals(numPartitionFields, partitionValues.length)
-
-    val partitionValuesEncoded = new Array[UTF8String](partitionValues.length)
-    for (i <- Range(0, numPartitionFields)) {
-      partitionValuesEncoded.update(i, 
UTF8String.fromString(partitionValues.apply(i)))
-    }
+      spark, structTypeSchema, StructType(Seq.empty), structTypeSchema, 
Seq.empty, Map.empty, getHadoopConf)
 
-    val partitionValueRow = new 
GenericInternalRow(partitionValuesEncoded.toArray[Any])
-    new SparkFileFormatInternalRowReaderContext(Option(recordReaderIterator), 
partitionValueRow)
+    val m = scala.collection.mutable.Map[Long, PartitionedFile => 
Iterator[InternalRow]]()
+    m.put(2*avroSchema.hashCode(), recordReaderIterator)
+    new SparkFileFormatInternalRowReaderContext(m)
   }
 
   override def commitToTable(recordList: util.List[String], operation: String, 
options: util.Map[String, String]): Unit = {
@@ -135,6 +118,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     assertEquals(expectedDf.count, actualRecordList.size)
     val actualDf = HoodieUnsafeUtils.createDataFrameFromInternalRows(
       spark, actualRecordList, HoodieInternalRowUtils.getCachedSchema(schema))
-    assertEquals(expectedDf.count, expectedDf.intersect(actualDf).count)
+    assertEquals(0, expectedDf.except(actualDf).count())
+    assertEquals(0, actualDf.except(expectedDf).count())
   }
 }
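
The `2*avroSchema.hashCode()` key in the hunk above is not arbitrary: the test reads with the data schema equal to the requested schema, so the file format's generateKey convention (data-schema hash plus requested-schema hash) collapses to twice the same hash code. A trivial sketch of that equivalence (the literal is only a stand-in for a real schema hash):

    // Same Int sum generateKey uses, widened to Long on return.
    def keyFor(dataSchemaHash: Int, requestedSchemaHash: Int): Long =
      dataSchemaHash + requestedSchemaHash

    val schemaHash = 42 // stand-in for avroSchema.hashCode()
    assert(keyFor(schemaHash, schemaHash) == 2 * schemaHash)
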
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
index 3f6934d9734..16d56373442 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
@@ -29,9 +29,10 @@ class TestQueryMergeOnReadOptimizedTable extends 
HoodieSparkSqlTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition long
            |) using hudi
-           | partitioned by (ts)
+           | partitioned by (partition)
            | location '$tablePath'
            | tblproperties (
            |  type = 'mor',
@@ -41,10 +42,10 @@ class TestQueryMergeOnReadOptimizedTable extends 
HoodieSparkSqlTestBase {
        """.stripMargin)
       // insert data to table
       spark.sql("set hoodie.parquet.max.file.size = 10000")
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
-      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
-      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
       spark.sql(s"update $tableName set price = 11 where id = 1")
       spark.sql(s"update $tableName set price = 21 where id = 2")
       spark.sql(s"update $tableName set price = 31 where id = 3")
@@ -60,7 +61,7 @@ class TestQueryMergeOnReadOptimizedTable extends 
HoodieSparkSqlTestBase {
       // expect that all complete parquet files can be scanned with a pending 
compaction job
       assertQueryResult(4, tablePath)
 
-      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000, 1000)")
 
       // expect that all complete parquet files can be scanned with a pending 
compaction job
       assertQueryResult(5, tablePath)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
index be6a2bb7624..10ee9501422 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
@@ -70,9 +70,10 @@ class TestHoodieLogFileProcedure extends 
HoodieSparkProcedureTestBase {
            |  id int,
            |  name string,
            |  price double,
-           |  ts long
+           |  ts long,
+           |  partition long
            |) using hudi
-           | partitioned by (ts)
+           | partitioned by (partition)
            | location '$tablePath'
            | tblproperties (
            |  type = 'mor',
@@ -81,8 +82,8 @@ class TestHoodieLogFileProcedure extends 
HoodieSparkProcedureTestBase {
            | )
        """.stripMargin)
       // insert data to table
-      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500, 1500")
       spark.sql(s"update $tableName set name = 'b1' where id = 1")
       spark.sql(s"update $tableName set name = 'b2' where id = 2")
 
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 069a4608073..7458d16641c 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkHoodieTableFileIndex
+import org.apache.hudi.{HoodieCDCFileIndex, SparkHoodieTableFileIndex}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
@@ -27,7 +27,7 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, 
LogicalPlan, MergeIntoTable, Project}
 import 
org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, 
ExplainCommand}
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 
@@ -100,10 +100,20 @@ object HoodieSpark2CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
 
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
-      case p@PhysicalOperation(_, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), p)
+      case physicalOperation@PhysicalOperation(_, _,
+      logicalRelation@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait] && 
!fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
+        ff.isProjected = true
+        val tableSchema = fs.location match {
+          case index: HoodieCDCFileIndex => index.cdcRelation.schema
+          case index: SparkHoodieTableFileIndex => index.schema
+        }
+        val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
+        if (!fs.partitionSchema.fields.isEmpty) {
+          Project(resolvedSchema, physicalOperation)
+        } else {
+          physicalOperation
+        }
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index a01cce70c1f..f5757e16b41 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieCDCFileIndex, SparkAdapterSupport, 
SparkHoodieTableFileIndex}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
 import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, 
JoinHint, LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, 
ExplainCommand}
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -84,6 +86,21 @@ trait HoodieSpark3CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
 
 object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
 
+  def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: 
LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
+    val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
+    ff.isProjected = true
+    val tableSchema = fs.location match {
+      case index: HoodieCDCFileIndex => index.cdcRelation.schema
+      case index: SparkHoodieTableFileIndex => index.schema
+    }
+    val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
+    if (!fs.partitionSchema.fields.isEmpty && 
scanOperation.sameOutput(logicalRelation)) {
+      Project(resolvedSchema, scanOperation)
+    } else {
+      scanOperation
+    }
+  }
+
   /**
    * This is an extractor to accommodate for [[ResolvedTable]] signature 
change in Spark 3.2
    */
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index 6cd5da79b86..34a1e084227 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -18,15 +18,14 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -49,9 +48,10 @@ object HoodieSpark30CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 8a56d0fba25..dd3ac530017 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -49,9 +49,10 @@ object HoodieSpark31CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 1bb4638fcdb..7a30aebd856 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -19,17 +19,16 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, 
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -52,9 +51,10 @@ object HoodieSpark32CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 85bd4a2c5e5..9bcc40a29e6 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,16 +18,15 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, 
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -50,9 +49,10 @@ object HoodieSpark33CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 41b629aac8e..e49adebc14b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -18,15 +18,14 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, 
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -49,9 +48,10 @@ object HoodieSpark34CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index 3ef6abdf8e1..eac3153b3f9 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -18,15 +18,14 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, 
LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
-import 
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -49,9 +48,10 @@ object HoodieSpark35CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): 
LogicalPlan = {
     plan match {
       case s@ScanOperation(_, _, _,
-      l@LogicalRelation(fs: HadoopFsRelation, _, _, _)) if 
fs.fileFormat.isInstanceOf[NewHoodieParquetFileFormat] && 
!fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected =>
-        fs.fileFormat.asInstanceOf[NewHoodieParquetFileFormat].isProjected = 
true
-        
Project(l.resolve(fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema, 
fs.sparkSession.sessionState.analyzer.resolver), s)
+      l@LogicalRelation(fs: HadoopFsRelation, _, _, _))
+        if fs.fileFormat.isInstanceOf[ParquetFileFormat with HoodieFormatTrait]
+          && !fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait].isProjected =>
+        HoodieSpark3CatalystPlanUtils.applyNewFileFormatChanges(s, l, fs)
       case _ => plan
     }
   }
