linliu-code commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2114196157


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -292,36 +319,49 @@ private static ArrayList<ArrayList<String>> partitionKeysByFileSlices(List<Strin
     return partitionedKeys;
   }
 
-  /**
-   * Lookup list of keys from a single file slice.
-   *
-   * @param partitionName Name of the partition
-   * @param keys          The list of keys to lookup
-   * @param fileSlice     The file slice to read
-   * @return A {@code Map} of key name to {@code HoodieRecord} for the keys which were found in the file slice
-   */
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysFromFileSlice(String partitionName, List<String> keys, FileSlice 
fileSlice) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice);
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysWithFileGroupReader(String partitionName,
+                                                                               
          List<String> keys,
+                                                                               
          FileSlice fileSlice) {
     try {
-      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyMap();
-      }
-
       // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
       List<String> sortedKeys = new ArrayList<>(keys);
+      // So we use the natural order to sort.
       Collections.sort(sortedKeys);
-      boolean fullKeys = true;
-      List<Long> timings = new ArrayList<>(1);
-      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = 
readLogRecords(logRecordScanner, sortedKeys, fullKeys, timings);
-      return readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeys, 
fullKeys, logRecords, timings, partitionName);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        closeReader(readers);
+      Option<HoodieInstant> latestMetadataInstant =
+          
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+      String latestMetadataInstantTime =
+          
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+      // Only those log files which have a corresponding completed instant on the dataset should be read
+      // This is because the metadata table is updated before the dataset instants are committed.
+      Set<String> validInstantTimestamps = getValidInstantTimestamps();
+      InstantRange instantRange = InstantRange.builder()
+          .rangeType(InstantRange.RangeType.EXACT_MATCH)
+          .explicitInstants(validInstantTimestamps).build();
+
+      HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
getFileGroupReader(

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to