codope commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1407078231


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -449,7 +449,8 @@ private static void validateRow(InternalRow data, 
StructType schema) {
         data instanceof HoodieInternalRow
             || data instanceof GenericInternalRow
             || data instanceof SpecificInternalRow
-            || 
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+            || 
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data))
+            || data instanceof JoinedRow;

Review Comment:
   When would the data be a `JoinedRow`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize 
the logic
+    if 
(!hoodieTableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      if (requestedSchema.getField(field) == null) {
+        Schema.Field foundField  = dataSchema.getField(field);
+        if (foundField == null) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        addedFields.add(new Schema.Field(foundField.name(), 
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    return 
maybeReorderForBootstrap(AvroSchemaUtils.appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);
+      if (!(requiredFields.getLeft().isEmpty() || 
requiredFields.getRight().isEmpty())) {
+        return 
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), 
requiredFields.getRight().stream())
+            .collect(Collectors.toList()));
+      }
+    }
+    return input;
+  }
+
+  private static Pair<List<Schema.Field>,List<Schema.Field>> 
getDataAndMetaCols(Schema schema) {

Review Comment:
   I would prefer to have this method moved to a util class and covered by tests.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieSimpleFileGroupRecordBuffer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HoodieSimpleFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {

Review Comment:
   Please add Javadoc for this class.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -41,10 +40,15 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import scala.annotation.tailrec
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.asScalaIteratorConverter
 
+trait HoodieFormatTrait {
+
+  //Used so that the planner only projects once and does not stack overflow

Review Comment:
   ```suggestion
     // Used so that the planner only projects once and does not stack overflow
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -67,8 +71,8 @@
  */
 public final class HoodieFileGroupReader<T> implements Closeable {
   private final HoodieReaderContext<T> readerContext;
-  private final Option<HoodieBaseFile> baseFilePath;
-  private final Option<List<String>> logFilePathList;
+  private final Option<HoodieBaseFile> hoodieBaseFileOption;
+  private final List<HoodieLogFile> logFiles;

Review Comment:
   Why remove `Option`?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -122,6 +125,23 @@ default boolean shouldFlush(HoodieRecord record, Schema 
schema, TypedProperties
     return true;
   }
 
+  default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
+    ArrayList<String> requiredFields = new ArrayList<>();
+    if (cfg.populateMetaFields()) {
+      requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    } else {
+      cfg.getRecordKeyFieldStream().forEach(requiredFields::add);
+    }
+    String preCombine = cfg.getPreCombineField();
+
+    //maybe throw exception otherwise

Review Comment:
   Should this be left to the caller? There are valid code paths with an optional 
precombine field.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java:
##########
@@ -147,6 +148,10 @@ public String getFileExtension() {
     return fileExtension;
   }
 
+  public boolean isCDC() {

Review Comment:
   Should we make it a static method in `FSUtils`?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -62,7 +63,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: 
Option[Partitioned
                                      requiredSchema: Schema,
                                      conf: Configuration): 
ClosableIterator[InternalRow] = {
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
-      .createPartitionedFile(partitionValues, filePath, start, length)
+      .createPartitionedFile(InternalRow.empty, filePath, start, length)

Review Comment:
   Why pass `InternalRow.empty`? I think `createPartitionedFile` needs the partition values, 
doesn't it?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -540,6 +541,15 @@ public Option<String[]> getRecordKeyFields() {
     }
   }
 
+  public Stream<String> getRecordKeyFieldStream() {

Review Comment:
   How about reusing the existing `getRecordKeyFields` method and then using a 
stream at the call site?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize 
the logic
+    if 
(!hoodieTableConfig.getPayloadClass().equals(OverwriteWithLatestAvroPayload.class.getName()))
 {
+      return dataSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      if (requestedSchema.getField(field) == null) {
+        Schema.Field foundField  = dataSchema.getField(field);
+        if (foundField == null) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        addedFields.add(new Schema.Field(foundField.name(), 
foundField.schema(), foundField.doc(), foundField.defaultVal()));
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    return 
maybeReorderForBootstrap(AvroSchemaUtils.appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);
+      if (!(requiredFields.getLeft().isEmpty() || 
requiredFields.getRight().isEmpty())) {
+        return 
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), 
requiredFields.getRight().stream())
+            .collect(Collectors.toList()));
+      }
+    }
+    return input;
+  }
+
+  private static Pair<List<Schema.Field>,List<Schema.Field>> 
getDataAndMetaCols(Schema schema) {
+    Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+        .collect(Collectors.partitioningBy(f -> 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+    return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+        fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+  }
+
+  private Schema createSchemaFromFields(List<Schema.Field> fields) {

Review Comment:
   We need to test the code paths where these methods are exercised.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize 
the logic

Review Comment:
   Please complete the TODOs here, or remove them if they are no longer necessary.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -158,7 +156,7 @@ private void 
validateOutputFromFileGroupReader(Configuration hadoopConf,
     SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
     FileSlice fileSlice = 
fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
     List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
-    Collections.sort(logFilePathList);
+    //Collections.sort(logFilePathList);

Review Comment:
   Why is the sorting being removed?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -91,73 +95,69 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     spark.conf.set("spark.sql.parquet.enableVectorizedReader", 
supportBatchResult)
     val requiredSchemaWithMandatory = 
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
+    val isCount = requiredSchemaWithMandatory.isEmpty
     val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f 
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
     val requiredMeta = StructType(requiredSchemaSplits._1)
     val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
     val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
-    val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(
+    val (baseFileReader, preMergeBaseFileReader, readerMaps) = 
buildFileReaders(
       spark, dataSchema, partitionSchema, if (isIncremental) 
requiredSchemaWithMandatory else requiredSchema,
       filters, options, augmentedHadoopConf, requiredSchemaWithMandatory, 
requiredWithoutMeta, requiredMeta)
+
+    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
+    val dataAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName)
+
     val broadcastedHadoopConf = spark.sparkContext.broadcast(new 
SerializableConfiguration(augmentedHadoopConf))
+    val broadcastedDataSchema =  spark.sparkContext.broadcast(dataAvroSchema)
+    val broadcastedRequestedSchema =  
spark.sparkContext.broadcast(requestedAvroSchema)
     val props: TypedProperties = HoodieFileIndex.getConfigProperties(spark, 
options)
 
     (file: PartitionedFile) => {
       file.partitionValues match {
         // Snapshot or incremental queries.
         case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
-          if (FSUtils.isLogFile(filePath)) {
-            val partitionValues = fileSliceMapping.getPartitionValues
-            val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
-            buildFileGroupIterator(
-              Option.empty[PartitionedFile => Iterator[InternalRow]],
-              partitionValues,
-              Option.empty[HoodieBaseFile],
-              getLogFilesFromSlice(fileSlice),
-              requiredSchemaWithMandatory,
-              outputSchema,
-              partitionSchema,
-              broadcastedHadoopConf.value.value,
-              -1,
-              -1,
-              shouldUseRecordPosition
-            )
+          val filegroupName = if (FSUtils.isLogFile(filePath)) {
+            FSUtils.getFileId(filePath.getName).substring(1)
           } else {
-            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
-              case Some(fileSlice) =>
+            FSUtils.getFileId(filePath.getName)
+          }
+          fileSliceMapping.getSlice(filegroupName) match {
+            case Some(fileSlice) if !isCount =>
+              if (requiredSchema.isEmpty && 
!fileSlice.getLogFiles.findAny().isPresent) {
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
-                val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
-                val partitionValues = fileSliceMapping.getPartitionValues
-                val logFiles = getLogFilesFromSlice(fileSlice)
-                if (requiredSchemaWithMandatory.isEmpty) {
-                  val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
-                  baseFileReader(baseFile)
-                } else if (bootstrapFileOpt.isPresent) {
-                  // TODO: Use FileGroupReader here: HUDI-6942.
-                  throw new NotImplementedError("Not support reading bootstrap 
file")
-                } else {
-                  if (logFiles.isEmpty) {
-                    throw new IllegalStateException(
-                      "should not be here since file slice should not have 
been broadcasted "
-                        + "since it has no log or data files")
-                  }
-                  buildFileGroupIterator(
-                    Option(preMergeBaseFileReader),
-                    partitionValues,
-                    Option(hoodieBaseFile),
-                    logFiles,
-                    requiredSchemaWithMandatory,
-                    outputSchema,
-                    partitionSchema,
-                    broadcastedHadoopConf.value.value,
-                    0,
-                    hoodieBaseFile.getFileLen,
-                    shouldUseRecordPosition
-                  )
-                }
-              // TODO: Use FileGroupReader here: HUDI-6942.
-              case _ => baseFileReader(file)
-            }
+                
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen))
+              } else {
+                val readerContext: HoodieReaderContext[InternalRow] = new 
SparkFileFormatInternalRowReaderContext(
+                  readerMaps)
+                val serializedHadoopConf = broadcastedHadoopConf.value.value
+                val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+                  
.builder().setConf(serializedHadoopConf).setBasePath(tableState.tablePath).build
+                val reader = new HoodieFileGroupReader[InternalRow](
+                  readerContext,
+                  serializedHadoopConf,
+                  tableState.tablePath,
+                  tableState.latestCommitTimestamp.get,

Review Comment:
   I hope the latestCommitTimestamp is not empty/null. Have we considered that?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieSimpleFileGroupRecordBuffer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HoodieSimpleFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {
+  public HoodieSimpleFileGroupRecordBuffer(HoodieReaderContext<T> 
readerContext, Schema readerSchema, Schema baseFileSchema,
+                                           Option<String> 
partitionNameOverrideOpt, Option<String[]> partitionPathFieldOpt, 
HoodieRecordMerger recordMerger,
+                                           TypedProperties payloadProps) {
+    super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt, recordMerger, payloadProps);
+  }
+
+  @Override
+  protected boolean doHasNext() throws IOException {
+    if (baseFileIterator.hasNext()) {
+      nextRecord = readerContext.seal(baseFileIterator.next());
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public BufferType getBufferType() {
+    return BufferType.SIMPLE;
+  }
+
+  @Override
+  public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> 
keySpecOpt) throws IOException {
+    throw new IllegalStateException("processDataBlock should never be called 
from HoodieSimpleFileGroupRecordBuffer");

Review Comment:
   Should all these methods that process the next data invoke super or throw an 
exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to