jonvex commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1408240703


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,82 +85,167 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+  private final Schema requestedSchema;
+
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
     this.recordMerger = readerContext.getRecordMerger(
         getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? new HoodieSimpleFileGroupRecordBuffer<>(readerContext, 
requiredSchema, requiredSchema, Option.empty(), Option.empty(), recordMerger, 
props)
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
+    this.baseFileIterator = makeBaseFileIterator();
     scanLogFiles();
     recordBuffer.setBaseFileIterator(baseFileIterator);
   }
 
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    //MergeOnReadSnapshotRelation.isProjectionCompatible. Should centralize 
the logic

Review Comment:
   Completed the TODO, but I think there is still room for improvement in how 
we handle payloads. If you have better ideas, let me know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to