yihua commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1411320238


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }

Review Comment:
   Can we use this constructor by just passing in the query type and meta 
client so it can automatically figure out the file slice to read for a file 
group reader?  Let's keep this constructor.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -99,4 +104,49 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
     })
     deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
   }
+
+  override def mergeBootstrapReaders(skeletonFileIterator: 
ClosableIterator[InternalRow],
+                                     dataFileIterator: 
ClosableIterator[InternalRow]): ClosableIterator[InternalRow] = {
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+  }
+

Review Comment:
   remove redundant empty line.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    if 
(!HoodiePayloadProps.isProjectionCompatible(hoodieTableConfig.getPayloadClass()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      //need to match HoodieFileGroupReaderBasedParquetFileFormat for now
+      //if (!findNestedField(requestedSchema, field).isPresent()) {
+      if (requestedSchema.getField(field) == null) {
+        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, 
field);
+        if (!foundFieldOpt.isPresent()) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        Schema.Field foundField = foundFieldOpt.get();
+        addedFields.add(foundField);
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    //return 
maybeReorderForBootstrap(appendFieldsToSchemaDedupNested(requestedSchema, 
addedFields));

Review Comment:
   Here too: please remove the unused, commented-out code.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)

Review Comment:
   Let's avoid adding the new record buffer implementation and make the existing 
record buffer work.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -122,6 +125,23 @@ default boolean shouldFlush(HoodieRecord record, Schema 
schema, TypedProperties
     return true;
   }
 
+  default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
+    ArrayList<String> requiredFields = new ArrayList<>();
+    if (cfg.populateMetaFields()) {
+      requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    } else {
+      cfg.getRecordKeyFieldStream().forEach(requiredFields::add);
+    }
+    String preCombine = cfg.getPreCombineField();
+
+    //maybe throw exception otherwise

Review Comment:
   Could you remove the comment here? I think we need to fix the enforcement of 
preCombine in a different PR.  The reader does not have to fail here.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java:
##########
@@ -147,6 +148,10 @@ public String getFileExtension() {
     return fileExtension;
   }
 
+  public boolean isCDC() {

Review Comment:
   I think this is ok given this is part of the log format.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -540,6 +541,15 @@ public Option<String[]> getRecordKeyFields() {
     }
   }
 
+  public Stream<String> getRecordKeyFieldStream() {

Review Comment:
   Let's reuse the existing method?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -67,8 +71,8 @@
  */
 public final class HoodieFileGroupReader<T> implements Closeable {
   private final HoodieReaderContext<T> readerContext;
-  private final Option<HoodieBaseFile> baseFilePath;
-  private final Option<List<String>> logFilePathList;
+  private final Option<HoodieBaseFile> hoodieBaseFileOption;
+  private final List<HoodieLogFile> logFiles;

Review Comment:
   Should the file slice instance be stored directly instead of base and log 
files, given the file slice is passed in?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    if 
(!HoodiePayloadProps.isProjectionCompatible(hoodieTableConfig.getPayloadClass()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      //need to match HoodieFileGroupReaderBasedParquetFileFormat for now
+      //if (!findNestedField(requestedSchema, field).isPresent()) {

Review Comment:
   remove unused code



##########
hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java:
##########
@@ -229,4 +230,75 @@ public void testIsCompatibleProjectionAllowed(boolean 
shouldValidate) {
   public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
     AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, 
shouldValidate, false, Collections.singleton("c"));
   }
+
+  /* behavior is slightly modified currently
+  @Test
+  public void testAppendFieldsToSchemaDedupNested() {

Review Comment:
   Please file a JIRA to track the disabled test.



##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala:
##########
@@ -84,6 +86,19 @@ trait HoodieSpark3CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
 
 object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
 
+  def applyNewFileFormatChanges(scanOperation: LogicalPlan, logicalRelation: 
LogicalPlan, fs: HadoopFsRelation): LogicalPlan = {
+    val ff = fs.fileFormat.asInstanceOf[ParquetFileFormat with 
HoodieFormatTrait]
+    ff.isProjected = true
+    val tableSchema = 
fs.location.asInstanceOf[SparkHoodieTableFileIndex].schema
+    val resolvedSchema = logicalRelation.resolve(tableSchema, 
fs.sparkSession.sessionState.analyzer.resolver)
+    if (!fs.partitionSchema.fields.isEmpty && 
scanOperation.sameOutput(logicalRelation)) {
+      Project(resolvedSchema, scanOperation)
+    } else {
+      scanOperation
+    }
+  }
+
+

Review Comment:
   remove redundant empty line.



##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala:
##########
@@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
Expression}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, 
LogicalPlan, MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical._

Review Comment:
   nit: revert



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,

Review Comment:
   Keep only one of the two, as they serve the same purpose.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    if 
(!HoodiePayloadProps.isProjectionCompatible(hoodieTableConfig.getPayloadClass()))
 {

Review Comment:
   This should be based on the merger strategy instead of the payload type.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    if 
(!HoodiePayloadProps.isProjectionCompatible(hoodieTableConfig.getPayloadClass()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      //need to match HoodieFileGroupReaderBasedParquetFileFormat for now
+      //if (!findNestedField(requestedSchema, field).isPresent()) {
+      if (requestedSchema.getField(field) == null) {
+        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, 
field);
+        if (!foundFieldOpt.isPresent()) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        Schema.Field foundField = foundFieldOpt.get();
+        addedFields.add(foundField);
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    //return 
maybeReorderForBootstrap(appendFieldsToSchemaDedupNested(requestedSchema, 
addedFields));
+    return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);

Review Comment:
   Some of the code is not properly formatted.  Do you have checkstyle enabled in your IDE?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,166 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;

Review Comment:
   This is the read schema, which can be different from the write schema.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,166 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+

Review Comment:
   @jonvex could you add docs in the code on these schemas?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to