yihua commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1419016929


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -123,4 +125,11 @@ private Object getFieldValueFromInternalRow(InternalRow 
row, Schema recordSchema
       return null;
     }
   }
+
+  @Override
+  public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to) {
+    UnsafeProjection projection = 
HoodieInternalRowUtils.generateUnsafeProjectionAlias(AvroConversionUtils.convertAvroSchemaToStructType(from),

Review Comment:
   I was referring to AvroConversionUtils.convertAvroSchemaToStructType which 
can be replaced by HoodieInternalRowUtils.getCachedSchema(schema) to avoid 
repeated parsing.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -77,14 +77,18 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: Option[Partitioned
           }
         }).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      if (baseFileReader.isEmpty) {
-        throw new IllegalArgumentException("Base file reader is missing when 
instantiating "
-          + "SparkFileFormatInternalRowReaderContext.");
+      val key = schemaPairHashKey(dataSchema, requiredSchema)

Review Comment:
   ```suggestion
         Val schemaPairHashKey = generateSchemaPairHashKey(dataSchema, 
requiredSchema)
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize 
the logic
+    if 
(!hoodieTableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      if (requestedSchema.getField(field) == null) {
+        Schema.Field foundField  = dataSchema.getField(field);
+        if (foundField == null) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        addedFields.add(new Schema.Field(foundField.name(), 
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    return 
maybeReorderForBootstrap(AvroSchemaUtils.appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);
+      if (!(requiredFields.getLeft().isEmpty() || 
requiredFields.getRight().isEmpty())) {
+        return 
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), 
requiredFields.getRight().stream())
+            .collect(Collectors.toList()));
+      }
+    }
+    return input;
+  }
+
+  private static Pair<List<Schema.Field>,List<Schema.Field>> 
getDataAndMetaCols(Schema schema) {
+    Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+        .collect(Collectors.partitioningBy(f -> 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+    return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+        fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+  }
+
+  private Schema createSchemaFromFields(List<Schema.Field> fields) {

Review Comment:
   @jonvex could you follow up on this separately if not done in this PR?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,119 +84,213 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+
+  // requestedSchema: the schema that the caller requests
+  private final Schema requestedSchema;
+
+  // requiredSchema: the requestedSchema with any additional columns required 
for merging etc
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
-    this.recordMerger = readerContext.getRecordMerger(
-        getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
+    this.recordMerger = 
readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? null
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
-    scanLogFiles();
-    recordBuffer.setBaseFileIterator(baseFileIterator);
+    ClosableIterator<T> iter = makeBaseFileIterator();
+    if (logFiles.isEmpty()) {
+      this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
+    } else {
+      this.baseFileIterator = iter;
+      scanLogFiles();
+      recordBuffer.setBaseFileIterator(baseFileIterator);
+    }
+  }
+
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      if (requestedSchema.getField(field) == null) {
+        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, 
field);
+        if (!foundFieldOpt.isPresent()) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        Schema.Field foundField = foundFieldOpt.get();
+        addedFields.add(foundField);
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);
+      if (!(requiredFields.getLeft().isEmpty() || 
requiredFields.getRight().isEmpty())) {
+        return 
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), 
requiredFields.getRight().stream())
+            .collect(Collectors.toList()));
+      }
+    }
+    return input;
+  }
+
+  private static Pair<List<Schema.Field>,List<Schema.Field>> 
getDataAndMetaCols(Schema schema) {
+    Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+        .collect(Collectors.partitioningBy(f -> 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+    return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+        fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+  }
+
+  private Schema createSchemaFromFields(List<Schema.Field> fields) {
+    //fields have positions set, so we need to remove them due to avro 
setFields implementation
+    for (int i = 0; i < fields.size(); i++) {
+      Schema.Field curr = fields.get(i);
+      fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), 
curr.defaultVal()));
+    }
+    Schema newSchema = Schema.createRecord(dataSchema.getName(), 
dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
+  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile 
baseFile) {
+    BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
+    Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(requiredSchema);
+    Pair<List<Schema.Field>,List<Schema.Field>> allFields = 
getDataAndMetaCols(dataSchema);
+
+    Option<ClosableIterator<T>> dataFileIterator = 
requiredFields.getRight().isEmpty() ? Option.empty() :
+        
Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, 
dataFile.getFileLen(),
+            createSchemaFromFields(allFields.getRight()), 
createSchemaFromFields(requiredFields.getRight()), hadoopConf));
+
+    Option<ClosableIterator<T>> skeletonFileIterator = 
requiredFields.getLeft().isEmpty() ? Option.empty() :
+        
Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, 
baseFile.getFileLen(),
+            createSchemaFromFields(allFields.getLeft()), 
createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
+    if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
+      throw new IllegalStateException("should not be here if only partition 
cols are required");
+    } else if (!dataFileIterator.isPresent()) {
+      return skeletonFileIterator.get();
+    } else if (!skeletonFileIterator.isPresent()) {
+      return  dataFileIterator.get();
+    } else {
+      return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(), 
dataFileIterator.get());
+    }
   }
 
   /**
    * @return {@code true} if the next record exists; {@code false} otherwise.
    * @throws IOException on reader error.
    */
   public boolean hasNext() throws IOException {
-    return recordBuffer.hasNext();
+    if (recordBuffer == null) {
+      return baseFileIterator.hasNext();

Review Comment:
   We should revisit whether this can be directly put into the record buffer.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +86,170 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    if 
(!HoodiePayloadProps.isProjectionCompatible(hoodieTableConfig.getPayloadClass()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      //need to match HoodieFileGroupReaderBasedParquetFileFormat for now
+      //if (!findNestedField(requestedSchema, field).isPresent()) {
+      if (requestedSchema.getField(field) == null) {
+        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, 
field);
+        if (!foundFieldOpt.isPresent()) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        Schema.Field foundField = foundFieldOpt.get();
+        addedFields.add(foundField);
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    //return 
maybeReorderForBootstrap(appendFieldsToSchemaDedupNested(requestedSchema, 
addedFields));
+    return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);

Review Comment:
   @jonvex could you check this?



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -188,4 +189,8 @@ public Map<String, Object> 
updateSchemaAndResetOrderingValInMetadata(Map<String,
     meta.put(INTERNAL_META_SCHEMA, schema);
     return meta;
   }
+
+  public abstract ClosableIterator<T> 
mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator, 
ClosableIterator<T> dataFileIterator);
+
+  public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);

Review Comment:
   docs here



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -62,7 +63,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: 
Option[Partitioned
                                      requiredSchema: Schema,
                                      conf: Configuration): 
ClosableIterator[InternalRow] = {
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
-      .createPartitionedFile(partitionValues, filePath, start, length)
+      .createPartitionedFile(InternalRow.empty, filePath, start, length)

Review Comment:
   @jonvex nit: add a comment/doc in the code to explain why the partition 
value is empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to