codope commented on code in PR #10957:
URL: https://github.com/apache/hudi/pull/10957#discussion_r1625389180


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -71,28 +65,38 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                                   isMOR: Boolean,
                                                   isBootstrap: Boolean,
                                                   isIncremental: Boolean,
+                                                  isCDC: Boolean,
+                                                  validCommits: String,
                                                   shouldUseRecordPosition: 
Boolean,
                                                   requiredFilters: Seq[Filter]
-                                           ) extends ParquetFileFormat with 
SparkAdapterSupport with HoodieFormatTrait {
+                                                 ) extends ParquetFileFormat 
with SparkAdapterSupport with HoodieFormatTrait {
 
   def getRequiredFilters: Seq[Filter] = requiredFilters
 
+  private val sanitizedTableName = 
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
+
   /**
    * Support batch needs to remain consistent, even if one side of a bootstrap 
merge can support
    * while the other side can't
    */
   private var supportBatchCalled = false
   private var supportBatchResult = false
 
-  private val sanitizedTableName = 
AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     if (!supportBatchCalled || supportBatchResult) {
       supportBatchCalled = true
-      supportBatchResult = !isMOR && !isIncremental && !isBootstrap && 
super.supportBatch(sparkSession, schema)
+      supportBatchResult = !isCDC && !isIncremental && !isMOR && !isBootstrap 
&& super.supportBatch(sparkSession, schema)

Review Comment:
   Why disable columnar batch support for CDC? Schema evolution should not 
affect this, right? I think if we disable it, performance is going to be 
adversely impacted. Please add a note above explaining why we disable it under 
these conditions. It would be useful for reference.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -175,22 +182,31 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                   fileSliceMapping.getPartitionValues)
               }
 
-            // TODO: Use FileGroupReader here: HUDI-6942.
-            case _ => baseFileReader(file)
+            case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, filters,
+              broadcastedStorageConf.value)
           }
         // CDC queries.
         case hoodiePartitionCDCFileGroupSliceMapping: 
HoodiePartitionCDCFileGroupMapping =>
           val fileSplits = 
hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
           val fileGroupSplit: HoodieCDCFileGroupSplit = 
HoodieCDCFileGroupSplit(fileSplits)
           buildCDCRecordIterator(
-            fileGroupSplit, cdcFileReader,
-            new HadoopStorageConfiguration(broadcastedHadoopConf.value.value), 
fileIndexProps, requiredSchema)
-        // TODO: Use FileGroupReader here: HUDI-6942.
-        case _ => baseFileReader(file)
+            fileGroupSplit, cdcFileReader.asInstanceOf[PartitionedFile => 
Iterator[InternalRow]],
+            broadcastedStorageConf.value, fileIndexProps, requiredSchema)
+
+        case _ => parquetFileReader.value.read(file, requiredSchema, 
partitionSchema, filters,
+          broadcastedStorageConf.value)
       }
     }
   }
 
+  protected def setSchemaEvolutionConfigs(conf: StorageConfiguration[_]): Unit 
= {

Review Comment:
   make private?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -46,21 +49,27 @@ import scala.collection.mutable
  *
  * This uses Spark parquet reader to read parquet data files or parquet log 
blocks.
  *
- * @param readermaps our intention is to build the reader inside of 
getFileRecordIterator, but since it is called from
- *                   the executor, we will need to port a bunch of the code 
from ParquetFileFormat for each spark version
- *                   for now, we pass in a map of the different readers we 
expect to create
+ * @param parquetFileReader A reader that transforms a {@link PartitionedFile} 
to an iterator of
+ *                        {@link InternalRow}. This is required for reading 
the base file and
+ *                        not required for reading a file group with only log 
files.
+ * @param recordKeyColumn column name for the recordkey
+ * @param filters spark filters that might be pushed down into the reader
  */
-class SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, 
PartitionedFile => Iterator[InternalRow]]) extends 
BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(parquetFileReader: 
SparkParquetReader,
+                                              recordKeyColumn: String,
+                                              filters: Seq[Filter]) extends 
BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
   val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = 
mutable.Map()
+  lazy val recordKeyFilters: Seq[Filter] = filters.filter(f => 
f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn)))

Review Comment:
   Does it handle nested columns? Let's add a test, and take it as a follow-up 
if it does not.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+
+import org.apache.avro.Schema;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
+
+public class HoodiePositionBasedSchemaHandler<T> extends 
HoodieFileGroupReaderSchemaHandler<T> {
+  public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
+                                          Schema dataSchema,
+                                          Schema requestedSchema,
+                                          Option<InternalSchema> 
internalSchemaOpt,
+                                          HoodieTableConfig hoodieTableConfig) 
{
+    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, 
hoodieTableConfig);
+
+  }
+
+  @Override
+  protected Schema prepareSchema() {
+    Schema preMergeSchema = super.prepareSchema();
+    return readerContext.getHasLogFiles()
+        ? addPositionalMergeCol(preMergeSchema)
+        : preMergeSchema;
+  }
+
+  private Schema addPositionalMergeCol(Schema input) {
+    return appendFieldsToSchemaDedupNested(input, 
Collections.singletonList(getPositionalMergeField()));
+  }
+
+  private Schema.Field getPositionalMergeField() {
+    return new 
Schema.Field(HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME,
+        Schema.create(Schema.Type.LONG), "", 0L);

Review Comment:
   Is default of 0 suitable? Should we make this field nullable?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java:
##########
@@ -92,10 +94,22 @@ public void processDataBlock(HoodieDataBlock dataBlock, 
Option<KeySpec> keySpecO
       // partial merging.
       enablePartialMerging = true;
     }
-    
+
     // Extract positions from data block.
     List<Long> recordPositions = extractRecordPositions(dataBlock);
 
+    Option<Pair<Function<T,T>, Schema>> schemaEvolutionTransformerOpt =

Review Comment:
   There is some duplicate code between the subclass and the superclass. Can we 
extract it into a method?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -101,46 +121,150 @@ class 
SparkFileFormatInternalRowReaderContext(readerMaps: mutable.Map[Long, Part
   }
 
   override def mergeBootstrapReaders(skeletonFileIterator: 
ClosableIterator[InternalRow],
-                                     dataFileIterator: 
ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
-    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+                                     skeletonRequiredSchema: Schema,
+                                     dataFileIterator: 
ClosableIterator[InternalRow],
+                                     dataRequiredSchema: Schema): 
ClosableIterator[InternalRow] = {
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], 
skeletonRequiredSchema,
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
   }
 
-  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], 
dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
-    new ClosableIterator[Any] {
-      val combinedRow = new JoinedRow()
-
-      override def hasNext: Boolean = {
-        //If the iterators are out of sync it is probably due to filter 
pushdown
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
+                                 skeletonRequiredSchema: Schema,
+                                 dataFileIterator: ClosableIterator[Any],
+                                 dataRequiredSchema: Schema): 
ClosableIterator[InternalRow] = {
+    if (getUseRecordPosition) {
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      val javaSet = new java.util.HashSet[String]()
+      javaSet.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      val skeletonProjection = projectRecord(skeletonRequiredSchema,
+        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, 
javaSet))
+      //If we have log files, we will want to do position based merging with 
those as well,
+      //so leave the row index column at the end
+      val dataProjection = if (getHasLogFiles) {
+        getIdentityProjection
+      } else {
+        projectRecord(dataRequiredSchema,
+          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, javaSet))
       }
 
-      override def next(): Any = {
-        (skeletonFileIterator.next(), dataFileIterator.next()) match {
-          case (s: ColumnarBatch, d: ColumnarBatch) =>
-            val numCols = s.numCols() + d.numCols()
-            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
-            for (i <- 0 until numCols) {
-              if (i < s.numCols()) {
-                vecs(i) = s.column(i)
+      //Always use internal row for positional merge because
+      //we need to iterate row by row when merging
+      new CachingIterator[InternalRow] {
+        val combinedRow = new JoinedRow()
+
+        //position column will always be at the end of the row
+        private def getPos(row: InternalRow): Long = {
+          row.getLong(row.numFields-1)
+        }
+
+        private def getNextSkeleton: (InternalRow, Long) = {
+          val nextSkeletonRow = 
skeletonFileIterator.next().asInstanceOf[InternalRow]
+          (nextSkeletonRow, getPos(nextSkeletonRow))
+        }
+
+        private def getNextData: (InternalRow, Long) = {
+          val nextSkeletonRow = 
skeletonFileIterator.next().asInstanceOf[InternalRow]
+          (nextSkeletonRow, getPos(nextSkeletonRow))
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+
+        override protected def doHasNext(): Boolean = {
+          if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) {
+            false
+          } else {
+            var nextSkeleton = getNextSkeleton
+            var nextData = getNextData
+            while (nextSkeleton._2 != nextData._2) {
+              if (nextSkeleton._2 > nextData._2) {
+                if (!dataFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextData = getNextData
+                }
               } else {
-                vecs(i) = d.column(i - s.numCols())
+                if (!skeletonFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextSkeleton = getNextSkeleton
+                }
               }
             }
-            assert(s.numRows() == d.numRows())
-            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
-          case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-          case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+            nextRecord = 
combinedRow(skeletonProjection.apply(nextSkeleton._1), 
dataProjection.apply(nextData._1))
+            true
+          }
         }
       }
+    } else {
+      new ClosableIterator[Any] {
+        val combinedRow = new JoinedRow()
 
-      override def close(): Unit = {
-        skeletonFileIterator.close()
-        dataFileIterator.close()
-      }
-    }.asInstanceOf[ClosableIterator[InternalRow]]
+        override def hasNext: Boolean = {
+          //If the iterators are out of sync it is probably due to filter 
pushdown
+          checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+            "Bootstrap data-file iterator and skeleton-file iterator have to 
be in-sync!")
+          dataFileIterator.hasNext && skeletonFileIterator.hasNext
+        }
+
+        override def next(): Any = {
+          (skeletonFileIterator.next(), dataFileIterator.next()) match {
+            case (s: ColumnarBatch, d: ColumnarBatch) =>
+              //This will not be used until [HUDI-7693] is implemented
+              val numCols = s.numCols() + d.numCols()
+              val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+              for (i <- 0 until numCols) {
+                if (i < s.numCols()) {
+                  vecs(i) = s.column(i)
+                } else {
+                  vecs(i) = d.column(i - s.numCols())
+                }
+              }
+              assert(s.numRows() == d.numRows())
+              sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+            case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+            case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+            case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+          }
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+      }.asInstanceOf[ClosableIterator[InternalRow]]
+    }
   }
 }
+
+object SparkFileFormatInternalRowReaderContext {
+  // From "ParquetFileFormat.scala": The names of the field for record 
position.
+  private val ROW_INDEX = "row_index"
+  private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
+  // From "namedExpressions.scala": Used to construct to record position field 
metadata.
+  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = 
"__file_source_generated_metadata_col"
+  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: 
String): Seq[Filter] = {
+    filters.filter(f => f.references.exists(c => 
c.equalsIgnoreCase(recordKeyColumn)))
+  }
+
+  def isIndexTempColumn(field: StructField): Boolean = {
+    field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+  }
+
+  def getAppliedRequiredSchema(requiredSchema: StructType): StructType = {
+      val metadata = new MetadataBuilder()
+        .putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+        .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .build()
+      val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, 
LongType, nullable = false, metadata)

Review Comment:
   why is `nullable` set to false?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java:
##########
@@ -119,6 +119,8 @@ public HoodieFileGroupReader<IndexedRecord> build(
           fileSlice,
           schema,
           schema,
+          Option.empty(),
+          null,

Review Comment:
   Instead of null, can we change the argument to use `Option`? Or pass the 
metaclient from the test.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -168,96 +147,40 @@ private ClosableIterator<T> makeBaseFileIterator() throws 
IOException {
     }
 
     return readerContext.getFileRecordIterator(
-        baseFile.getStoragePath(), start,
-        length,
-        dataSchema, requiredSchema, storage);
-  }
-
-  private Schema generateRequiredSchema() {

Review Comment:
   note to self: all the logic here and below has moved to 
`HoodieFileGroupReaderSchemaHandler` which is subclassed by 
`HoodiePositionBasedSchemaHandler` to handle positional merge.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java:
##########
@@ -81,6 +86,17 @@ public InternalSchema mergeSchema() {
     return new InternalSchema(record);
   }
 
+  /**
+   * Create final read schema to read avro/parquet file.
+   *
+   * @return read schema to read avro/parquet file.
+   */
+  public Pair<InternalSchema, Map<String, String>> mergeSchemaGetRenamed() {

Review Comment:
   Let's reuse the method above and add a test.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -242,7 +250,44 @@ protected Pair<ClosableIterator<T>, Schema> 
getRecordsIterator(HoodieDataBlock d
     } else {
       blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
     }
-    return Pair.of(blockRecordsIterator, dataBlock.getSchema());
+    Option<Pair<Function<T,T>, Schema>> schemaEvolutionTransformerOpt =
+        composeEvolvedSchemaTransformer(dataBlock);
+
+    // In case when schema has been evolved original persisted records will 
have to be
+    // transformed to adhere to the new schema
+    Function<T,T> transformer =
+        schemaEvolutionTransformerOpt.map(Pair::getLeft)
+            .orElse(Function.identity());
+
+    Schema schema = schemaEvolutionTransformerOpt.map(Pair::getRight)
+        .orElseGet(dataBlock::getSchema);
+
+    return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, 
transformer), schema);
+  }
+
+  /**
+   * Get final Read Schema for support evolution.
+   * step1: find the fileSchema for current dataBlock.
+   * step2: determine whether fileSchema is compatible with the final read 
internalSchema.
+   * step3: merge fileSchema and read internalSchema to produce final read 
schema.
+   *
+   * @param dataBlock current processed block
+   * @return final read schema.
+   */
+  protected Option<Pair<Function<T,T>, Schema>> 
composeEvolvedSchemaTransformer(
+      HoodieDataBlock dataBlock) {
+    if (internalSchema.isEmptySchema()) {
+      return Option.empty();
+    }
+
+    long currentInstantTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
+    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
+        hoodieTableMetaClient, false);

Review Comment:
   There is a write config to enable the cache; can we use that? It would be 
useful for base file record buffers, at least.



##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -34,63 +32,17 @@ class TestHoodieFileGroupReaderBasedParquetFileFormat 
extends SparkClientFunctio
       IsNotNull("non_key_column"),
       EqualTo("non_key_column", 1)
     )
-    val filtersWithoutKeyColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+    val filtersWithoutKeyColumn = 
SparkFileFormatInternalRowReaderContext.getRecordKeyRelatedFilters(
       filters, "key_column");
     assertEquals(0, filtersWithoutKeyColumn.size)
 
     val filtersWithKeys = Seq(
       EqualTo("key_column", 1),
       GreaterThan("non_key_column", 2)
     )
-    val filtersWithKeyColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+    val filtersWithKeyColumn = 
SparkFileFormatInternalRowReaderContext.getRecordKeyRelatedFilters(
       filtersWithKeys, "key_column")
     assertEquals(1, filtersWithKeyColumn.size)
     assertEquals("key_column", filtersWithKeyColumn.head.references.head)
   }
-
-  @Test
-  def testGetAppliedRequiredSchema(): Unit = {
-    val fields = Array(
-      StructField("column_a", LongType, nullable = false),
-      StructField("column_b", StringType, nullable = false))
-    val requiredSchema = StructType(fields)
-
-    val appliedSchema: StructType = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
-      requiredSchema, shouldUseRecordPosition = true, "row_index")
-    if (HoodieSparkUtils.gteqSpark3_5) {
-      assertEquals(3, appliedSchema.fields.length)
-    } else {
-      assertEquals(2, appliedSchema.fields.length)
-    }
-
-    val schemaWithoutRowIndexColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
-      requiredSchema, shouldUseRecordPosition = false, "row_index")
-    assertEquals(2, schemaWithoutRowIndexColumn.fields.length)
-  }
-
-  @Test
-  def testGetAppliedFilters(): Unit = {
-    val filters = Seq(

Review Comment:
   I still see some methods moved to `SparkFileFormatInternalRowReaderContext` 
e.g. `getAppliedRequiredSchema`. Please keep the test but change it accordingly.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -161,15 +167,13 @@ abstract class HoodieBaseHadoopFsRelationFactory(val 
sqlContext: SQLContext,
     val shouldExtractPartitionValueFromPath =
       
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
         
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
-    val shouldUseBootstrapFastRead = 
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
-
-    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || 
shouldUseBootstrapFastRead
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
   }
 
   protected lazy val mandatoryFieldsForMerging: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
-  protected lazy val shouldUseRecordPosition: Boolean = 
checkIfAConfigurationEnabled(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS)
+  protected lazy val shouldUseRecordPosition: Boolean = 
checkIfAConfigurationEnabled(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS) && 
HoodieSparkUtils.gteqSpark3_5

Review Comment:
   Why can't we support lower versions of Spark? Can you please add a note?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -232,211 +248,6 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d))
   }
 
-  private def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
-                                                  dataSchema: StructType,
-                                                  partitionSchema: 
StructType): StructType = {
-    val metaFields = Seq(
-      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
-
-    // Helper method to get the StructField for nested fields
-    @tailrec
-    def findNestedField(schema: StructType, fieldParts: Array[String]): 
Option[StructField] = {
-      fieldParts.toList match {
-        case head :: Nil => schema.fields.find(_.name == head) // If it's the 
last part, find and return the field
-        case head :: tail => // If there are more parts, find the field and 
its nested fields
-          schema.fields.find(_.name == head) match {
-            case Some(StructField(_, nested: StructType, _, _)) => 
findNestedField(nested, tail.toArray)
-            case _ => None // The path is not valid
-          }
-        case _ => None // Empty path, should not happen if the input is correct
-      }
-    }
-
-    def findMetaField(name: String): Option[StructField] = {
-      metaFields.find(f => f.name == name)
-    }
-
-    val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
-    for (field <- mandatoryFields) {
-      if (requiredSchema.getFieldIndex(field).isEmpty) {
-        // Support for nested fields
-        val fieldParts = field.split("\\.")
-        val fieldToAdd = findNestedField(dataSchema, fieldParts)
-          .orElse(findNestedField(partitionSchema, fieldParts))
-          .orElse(findMetaField(field))
-          .getOrElse(throw new IllegalArgumentException(s"Field $field does 
not exist in the table schema"))
-        added.append(fieldToAdd)
-      }
-    }
-    val addedFields = StructType(added.toArray)
-    StructType(requiredSchema.toArray ++ addedFields.fields)
-  }
-
-  protected def buildFileReaders(sparkSession: SparkSession, dataSchema: 
StructType, partitionSchema: StructType,
-                                 requiredSchema: StructType, filters: 
Seq[Filter], options: Map[String, String],
-                                 hadoopConf: Configuration, 
requiredSchemaWithMandatory: StructType,
-                                 requiredWithoutMeta: StructType, 
requiredMeta: StructType):
-  (PartitionedFile => Iterator[InternalRow],
-    PartitionedFile => Iterator[InternalRow],
-    mutable.Map[Long, PartitionedFile => Iterator[InternalRow]],
-    PartitionedFile => Iterator[InternalRow]) = {
-
-    val m = scala.collection.mutable.Map[Long, PartitionedFile => 
Iterator[InternalRow]]()
-
-    val recordKeyRelatedFilters = getRecordKeyRelatedFilters(filters, 
tableState.recordKeyField)
-    val baseFileReader = super.buildReaderWithPartitionValues(sparkSession, 
dataSchema, partitionSchema, requiredSchema,
-      filters ++ requiredFilters, options, new Configuration(hadoopConf))
-    m.put(generateKey(dataSchema, requiredSchema), baseFileReader)
-
-    // File reader for reading a Hoodie base file that needs to be merged with 
log files
-    // Add support for reading files using inline file system.
-    val appliedRequiredSchema: StructType = getAppliedRequiredSchema(
-      requiredSchemaWithMandatory, shouldUseRecordPosition, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
-    val appliedFilters = getAppliedFilters(
-      requiredFilters, recordKeyRelatedFilters, shouldUseRecordPosition)
-    val preMergeBaseFileReader = super.buildReaderWithPartitionValues(
-      sparkSession,
-      dataSchema,
-      StructType(Nil),
-      appliedRequiredSchema,
-      appliedFilters,
-      options,
-      new Configuration(hadoopConf))
-    m.put(generateKey(dataSchema, appliedRequiredSchema), 
preMergeBaseFileReader)
-
-    val cdcFileReader = super.buildReaderWithPartitionValues(
-      sparkSession,
-      tableSchema.structTypeSchema,
-      StructType(Nil),
-      tableSchema.structTypeSchema,
-      Nil,
-      options + (OPTION_RETURNING_BATCH -> super.supportBatch(sparkSession, 
tableSchema.structTypeSchema).toString),
-      new Configuration(hadoopConf))
-
-    //Rules for appending partitions and filtering in the bootstrap readers:
-    // 1. if it is mor, we don't want to filter data or append partitions
-    // 2. if we need to merge the bootstrap base and skeleton files then we 
cannot filter
-    // 3. if we need to merge the bootstrap base and skeleton files then we 
should never append partitions to the
-    //    skeleton reader
-    val needMetaCols = requiredMeta.nonEmpty
-    val needDataCols = requiredWithoutMeta.nonEmpty
-
-    //file reader for bootstrap skeleton files
-    if (needMetaCols && isBootstrap) {
-      val key = generateKey(HoodieSparkUtils.getMetaSchema, requiredMeta)
-      if (needDataCols || isMOR) {
-        // no filter and no append
-        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
-          requiredMeta, Seq.empty, options, new Configuration(hadoopConf)))
-      } else {
-        // filter
-        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
-          requiredMeta, filters ++ requiredFilters, options, new 
Configuration(hadoopConf)))
-      }
-
-      val requestedMeta = StructType(requiredSchema.fields.filter(sf => 
isMetaField(sf.name)))
-      m.put(generateKey(HoodieSparkUtils.getMetaSchema, requestedMeta),
-        super.buildReaderWithPartitionValues(sparkSession, 
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty), requestedMeta,
-          Seq.empty, options, new Configuration(hadoopConf)))
-    }
-
-    //file reader for bootstrap base files
-    if (needDataCols && isBootstrap) {
-      val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf => 
isMetaField(sf.name)))
-      val key = generateKey(dataSchemaWithoutMeta, requiredWithoutMeta)
-      if (isMOR || needMetaCols) {
-        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
-          Seq.empty, options, new Configuration(hadoopConf)))
-        // no filter and no append
-
-      } else {
-        // filter
-        m.put(key, super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
-          filters ++ requiredFilters, options, new Configuration(hadoopConf)))
-      }
-
-      val requestedWithoutMeta = StructType(requiredSchema.fields.filterNot(sf 
=> isMetaField(sf.name)))
-      m.put(generateKey(dataSchemaWithoutMeta, requestedWithoutMeta),
-        super.buildReaderWithPartitionValues(sparkSession, 
dataSchemaWithoutMeta, StructType(Seq.empty), requestedWithoutMeta,
-          Seq.empty, options, new Configuration(hadoopConf)))
-    }
-
-    (baseFileReader, preMergeBaseFileReader, m, cdcFileReader)
-  }
-
-  protected def generateKey(dataSchema: StructType, requestedSchema: 
StructType): Long = {
-    AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName).hashCode() + 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName).hashCode()
-  }
-}
-
-object HoodieFileGroupReaderBasedParquetFileFormat {
-  // From "ParquetFileFormat.scala": The names of the field for record 
position.
-  private val ROW_INDEX = "row_index"
-  private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
-
-  // From "namedExpressions.scala": Used to construct to record position field 
metadata.
-  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = 
"__file_source_generated_metadata_col"
-  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
-  private val METADATA_COL_ATTR_KEY = "__metadata_col"
-
-  /**
-   * A required option (since Spark 3.3.2) to pass to 
buildReaderWithPartitionValues to return columnar batch output or not.
-   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
-   * This should only be passed as true if it can actually be supported, which 
can be checked
-   * by calling supportBatch.
-   */
-  private val OPTION_RETURNING_BATCH = "returning_batch"

Review Comment:
   We need this for Spark 3.3+. See 
https://github.com/apache/spark/blob/560c08332b35941260169124b4f522bdc82b84d8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala#L63-L68



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala:
##########
@@ -69,81 +64,53 @@ class TestSpark35RecordPositionMetadataColumn extends 
SparkClientFunctionalTestH
 
   @Test
   def testRecordPositionColumn(): Unit = {
-    val _spark = spark
-    // Prepare the schema
-    val dataSchema = new StructType(
-      Array(
-        StructField("userid", IntegerType, nullable = false),
-        StructField("country", StringType, nullable = false),
-        StructField("ts", StringType, nullable = false)
+    if (HoodieSparkUtils.gteqSpark3_5) {

Review Comment:
   I commented above to add a note on why record positions can only be used for 
Spark 3.5 onwards. It's important for devs to have this context.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -70,6 +73,75 @@ public static Schema convert(InternalSchema internalSchema, 
String name) {
     return buildAvroSchemaFromInternalSchema(internalSchema, name);
   }
 
+  public static InternalSchema pruneAvroSchemaToInternalSchema(Schema schema, 
InternalSchema originSchema) {

Review Comment:
   Let's add a UT for this method separately.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -70,6 +73,75 @@ public static Schema convert(InternalSchema internalSchema, 
String name) {
     return buildAvroSchemaFromInternalSchema(internalSchema, name);
   }
 
+  public static InternalSchema pruneAvroSchemaToInternalSchema(Schema schema, 
InternalSchema originSchema) {
+    List<String> pruneNames = collectColNamesFromSchema(schema);
+    return InternalSchemaUtils.pruneInternalSchema(originSchema, pruneNames);
+  }
+
+  /**
+   * Collect all the leaf nodes names.
+   *
+   * @param schema an Avro schema.
+   * @return leaf nodes full names.
+   */
+  private static List<String> collectColNamesFromSchema(Schema schema) {

Review Comment:
   does it handle null types recursively?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to