jonvex commented on code in PR #10137:
URL: https://github.com/apache/hudi/pull/10137#discussion_r1419433198


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -81,119 +84,213 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private HoodieRecordMerger recordMerger;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieTableMetaClient metaClient,
-                               String fileGroupId,
-                               TypedProperties props,
-                               HoodieTimeline timeline,
-                               HoodieTableQueryType queryType,
-                               Option<String> instantTime,
-                               Option<String> startInstantTime,
-                               boolean shouldUseRecordPosition) throws 
Exception {
-    // This constructor is a placeholder now to allow automatically fetching 
the correct list of
-    // base and log files for a file group.
-    // Derive base and log files and call the corresponding constructor.
-    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
-        instantTime.get(), Option.empty(), Option.empty(),
-        new TableSchemaResolver(metaClient).getTableAvroSchema(),
-        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
-  }
+  private final Schema dataSchema;
+
+  // requestedSchema: the schema that the caller requests
+  private final Schema requestedSchema;
+
+  // requiredSchema: the requestedSchema with any additional columns required 
for merging etc
+  private final Schema requiredSchema;
+
+  private final HoodieTableConfig hoodieTableConfig;
+
+  private final Option<UnaryOperator<T>> outputConverter;
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
                                Configuration hadoopConf,
                                String tablePath,
                                String latestCommitTime,
-                               Option<HoodieBaseFile> baseFilePath,
-                               Option<List<String>> logFilePathList,
-                               Schema avroSchema,
+                               FileSlice fileSlice,
+                               Schema dataSchema,
+                               Schema requestedSchema,
                                TypedProperties props,
+                               HoodieTableConfig tableConfig,
                                long start,
                                long length,
                                boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
-    this.baseFilePath = baseFilePath;
-    this.logFilePathList = logFilePathList;
+    this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    this.logFiles = 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
     this.props = props;
     this.start = start;
     this.length = length;
-    this.recordMerger = readerContext.getRecordMerger(
-        getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue()));
+    this.recordMerger = 
readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
     this.readerState.tablePath = tablePath;
     this.readerState.latestCommitTime = latestCommitTime;
-    this.readerState.baseFileAvroSchema = avroSchema;
-    this.readerState.logRecordAvroSchema = avroSchema;
+    this.dataSchema = dataSchema;
+    this.requestedSchema = requestedSchema;
+    this.hoodieTableConfig = tableConfig;
+    this.requiredSchema = generateRequiredSchema();
+    if (!requestedSchema.equals(requiredSchema)) {
+      this.outputConverter = 
Option.of(readerContext.projectRecord(requiredSchema, requestedSchema));
+    } else {
+      this.outputConverter = Option.empty();
+    }
+    this.readerState.baseFileAvroSchema = requiredSchema;
+    this.readerState.logRecordAvroSchema = requiredSchema;
     this.readerState.mergeProps.putAll(props);
-    String filePath = baseFilePath.isPresent()
-        ? baseFilePath.get().getPath()
-        : logFilePathList.get().get(0);
-    String partitionPath = FSUtils.getRelativePartitionPath(
-        new Path(tablePath), new Path(filePath).getParent());
-    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
-        ? Option.empty() : Option.of(partitionPath);
-    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
-    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
-        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
-        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
-        : Option.empty();
-    this.recordBuffer = shouldUseRecordPosition
+    this.recordBuffer = this.logFiles.isEmpty()
+        ? null
+        : shouldUseRecordPosition
         ? new HoodiePositionBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props)
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        readerContext, requiredSchema, requiredSchema, Option.empty(), 
Option.empty(),
         recordMerger, props);
+
+
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() {
-    this.baseFileIterator = baseFilePath.isPresent()
-        ? readerContext.getFileRecordIterator(
-            baseFilePath.get().getHadoopPath(), start, length, 
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
-        : new EmptyIterator<>();
-    scanLogFiles();
-    recordBuffer.setBaseFileIterator(baseFileIterator);
+    ClosableIterator<T> iter = makeBaseFileIterator();
+    if (logFiles.isEmpty()) {
+      this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
+    } else {
+      this.baseFileIterator = iter;
+      scanLogFiles();
+      recordBuffer.setBaseFileIterator(baseFileIterator);
+    }
+  }
+
+  private ClosableIterator<T> makeBaseFileIterator() {
+    if (!hoodieBaseFileOption.isPresent()) {
+      return new EmptyIterator<>();
+    }
+
+    HoodieBaseFile baseFile = hoodieBaseFileOption.get();
+    if (baseFile.getBootstrapBaseFile().isPresent()) {
+      return makeBootstrapBaseFileIterator(baseFile);
+    }
+
+    return readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 
start, length,
+         dataSchema, requiredSchema, hadoopConf);
+  }
+
+  private Schema generateRequiredSchema() {
+    //might need to change this if other queries than mor have mandatory fields
+    if (logFiles.isEmpty()) {
+      return requestedSchema;
+    }
+
+    List<Schema.Field> addedFields = new ArrayList<>();
+    for (String field : 
recordMerger.getMandatoryFieldsForMerging(hoodieTableConfig)) {
+      if (requestedSchema.getField(field) == null) {
+        Option<Schema.Field> foundFieldOpt  = findNestedField(dataSchema, 
field);
+        if (!foundFieldOpt.isPresent()) {
+          throw new IllegalArgumentException("Field: " + field + " does not 
exist in the table schema");
+        }
+        Schema.Field foundField = foundFieldOpt.get();
+        addedFields.add(foundField);
+      }
+    }
+
+    if (addedFields.isEmpty()) {
+      return maybeReorderForBootstrap(requestedSchema);
+    }
+
+    return maybeReorderForBootstrap(appendFieldsToSchema(requestedSchema, 
addedFields));
+  }
+
+  private Schema maybeReorderForBootstrap(Schema input) {
+    if (this.hoodieBaseFileOption.isPresent() && 
this.hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()) {
+      Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(input);
+      if (!(requiredFields.getLeft().isEmpty() || 
requiredFields.getRight().isEmpty())) {
+        return 
createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), 
requiredFields.getRight().stream())
+            .collect(Collectors.toList()));
+      }
+    }
+    return input;
+  }
+
+  private static Pair<List<Schema.Field>,List<Schema.Field>> 
getDataAndMetaCols(Schema schema) {
+    Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
+        .collect(Collectors.partitioningBy(f -> 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
+    return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
+        fieldsByMeta.getOrDefault(false, Collections.emptyList()));
+  }
+
+  private Schema createSchemaFromFields(List<Schema.Field> fields) {
+    //fields have positions set, so we need to remove them due to avro 
setFields implementation
+    for (int i = 0; i < fields.size(); i++) {
+      Schema.Field curr = fields.get(i);
+      fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), 
curr.defaultVal()));
+    }
+    Schema newSchema = Schema.createRecord(dataSchema.getName(), 
dataSchema.getDoc(), dataSchema.getNamespace(), dataSchema.isError());
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
+  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile 
baseFile) {
+    BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
+    Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = 
getDataAndMetaCols(requiredSchema);
+    Pair<List<Schema.Field>,List<Schema.Field>> allFields = 
getDataAndMetaCols(dataSchema);
+
+    Option<ClosableIterator<T>> dataFileIterator = 
requiredFields.getRight().isEmpty() ? Option.empty() :
+        
Option.of(readerContext.getFileRecordIterator(dataFile.getHadoopPath(), 0, 
dataFile.getFileLen(),
+            createSchemaFromFields(allFields.getRight()), 
createSchemaFromFields(requiredFields.getRight()), hadoopConf));
+
+    Option<ClosableIterator<T>> skeletonFileIterator = 
requiredFields.getLeft().isEmpty() ? Option.empty() :
+        
Option.of(readerContext.getFileRecordIterator(baseFile.getHadoopPath(), 0, 
baseFile.getFileLen(),
+            createSchemaFromFields(allFields.getLeft()), 
createSchemaFromFields(requiredFields.getLeft()), hadoopConf));
+    if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
+      throw new IllegalStateException("should not be here if only partition 
cols are required");
+    } else if (!dataFileIterator.isPresent()) {
+      return skeletonFileIterator.get();
+    } else if (!skeletonFileIterator.isPresent()) {
+      return  dataFileIterator.get();
+    } else {
+      return readerContext.mergeBootstrapReaders(skeletonFileIterator.get(), 
dataFileIterator.get());
+    }
   }
 
   /**
    * @return {@code true} if the next record exists; {@code false} otherwise.
    * @throws IOException on reader error.
    */
   public boolean hasNext() throws IOException {
-    return recordBuffer.hasNext();
+    if (recordBuffer == null) {
+      return baseFileIterator.hasNext();

Review Comment:
   https://issues.apache.org/jira/browse/HUDI-7193



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to