yihua commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2201410390


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -292,180 +325,58 @@ private static ArrayList<ArrayList<String>> 
partitionKeysByFileSlices(List<Strin
     return partitionedKeys;
   }
 
-  /**
-   * Lookup list of keys from a single file slice.
-   *
-   * @param partitionName Name of the partition
-   * @param keys          The list of keys to lookup
-   * @param fileSlice     The file slice to read
-   * @return A {@code Map} of key name to {@code HoodieRecord} for the keys 
which were found in the file slice
-   */
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysFromFileSlice(String partitionName, List<String> keys, FileSlice 
fileSlice) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice);
-    try {
-      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyMap();
-      }
-
-      // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
-      List<String> sortedKeys = new ArrayList<>(keys);
-      Collections.sort(sortedKeys);
-      boolean fullKeys = true;
-      List<Long> timings = new ArrayList<>(1);
-      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = 
readLogRecords(logRecordScanner, sortedKeys, fullKeys, timings);
-      return readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeys, 
fullKeys, logRecords, timings, partitionName);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        closeReader(readers);
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeys(String 
partitionName,
+                                                                      
List<String> keys,
+                                                                      
FileSlice fileSlice) {
+    // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
+    List<String> sortedKeys = new ArrayList<>(keys);
+    // So we use the natural order to sort.
+    Collections.sort(sortedKeys);
+    Option<HoodieInstant> latestMetadataInstant =
+        
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    String latestMetadataInstantTime =
+        
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+    // Only those log files which have a corresponding completed instant on 
the dataset should be read
+    // This is because the metadata table is updated before the dataset 
instants are committed.
+    Set<String> validInstantTimestamps = getValidInstantTimestamps();
+    InstantRange instantRange = InstantRange.builder()
+        .rangeType(InstantRange.RangeType.EXACT_MATCH)
+        .explicitInstants(validInstantTimestamps).build();
+    HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(
+        storageConf,
+        metadataMetaClient.getTableConfig(),
+        Option.of(instantRange),
+        Option.of(transformKeysToPredicate(sortedKeys)));
+    try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metadataMetaClient)
+        .withLatestCommitTime(latestMetadataInstantTime)
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(schema)
+        .withProps(buildFileGroupReaderProperties(metadataConfig))
+        .withStart(0)
+        .withLength(Long.MAX_VALUE)
+        .withShouldUseRecordPosition(false)
+        .build();
+         ClosableIterator<IndexedRecord> it = 
fileGroupReader.getClosableIterator()) {
+      Map<String, HoodieRecord<HoodieMetadataPayload>> records = new 
HashMap<>();
+      while (it.hasNext()) {
+        GenericRecord metadataRecord = (GenericRecord) it.next();
+        HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
+        String rowKey = payload.key != null
+            ? payload.key : ((GenericRecord) 
metadataRecord).get(KEY_FIELD_NAME).toString();
+        HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
+        records.put(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
       }
-    }
-  }
-
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
-                                                                          
List<String> sortedKeys,
-                                                                          
boolean fullKey,
-                                                                          
List<Long> timings) {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (logRecordReader == null) {
-      timings.add(timer.endTimer());
-      return Collections.emptyMap();
-    }
-
-    try {
-      return fullKey ? logRecordReader.getRecordsByKeys(sortedKeys) : 
logRecordReader.getRecordsByKeyPrefixes(sortedKeys);
-    } finally {
-      timings.add(timer.endTimer());
-    }
-  }
-
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
-                                                                               
               List<String> sortedKeys,
-                                                                               
               boolean fullKeys,
-                                                                               
               Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords,
-                                                                               
               List<Long> timings,
-                                                                               
               String partitionName) throws IOException {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (reader == null) {
-      // No base file at all
-      timings.add(timer.endTimer());
-      return logRecords;
-    }
-
-    HoodieTimer readTimer = HoodieTimer.start();
-
-    Map<String, HoodieRecord<HoodieMetadataPayload>> records =
-        fetchBaseFileRecordsByKeys(reader, sortedKeys, fullKeys, 
partitionName);
-
-    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-
-    // Iterate over all provided log-records, merging them into existing 
records
-    logRecords.values().forEach(logRecord ->
-        records.merge(
-            logRecord.getRecordKey(),
-            logRecord,
-            (oldRecord, newRecord) -> {
-              HoodieMetadataPayload mergedPayload = 
newRecord.getData().preCombine(oldRecord.getData());
-              return mergedPayload.isDeleted() ? null : new 
HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload);
-            }
-        ));
-
-    timings.add(timer.endTimer());
-    return records;
-  }
-
-  @SuppressWarnings("unchecked")
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
-                                                                               
       List<String> sortedKeys,
-                                                                               
       boolean fullKeys,
-                                                                               
       String partitionName) throws IOException {
-    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
-    try (ClosableIterator<HoodieRecord<?>> records = fullKeys
-        ? reader.getRecordsByKeysIterator(sortedKeys)
-        : reader.getRecordsByKeyPrefixIterator(sortedKeys)) {
-      result = toStream(records)
-          .map(record -> {
-            GenericRecord data = (GenericRecord) record.getData();
-            // populateMetaFields is hardcoded to false for the metadata table 
so key must be extracted from the `key` field
-            String recordKey = (String) 
data.get(HoodieMetadataPayload.KEY_FIELD_NAME);
-            return Pair.of(recordKey, composeRecord(data, recordKey, 
partitionName));
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-    }
-    return result;
-  }
-
-  private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord 
avroRecord, String recordKey, String partitionName) {
-    return new HoodieAvroRecord<>(new HoodieKey(recordKey, partitionName),
-        new HoodieMetadataPayload(avroRecord, 0L), null);
-  }
-
-  /**
-   * Create a file reader and the record scanner for a given partition and 
file slice
-   * if readers are not already available.
-   *
-   * @param partitionName - Partition name
-   * @param slice         - The file slice to open readers for
-   * @return File reader and the record scanner pair for the requested file 
slice
-   */
-  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> 
getOrCreateReaders(String partitionName, FileSlice slice) {
-    if (reuse) {
-      Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
-      return partitionReaders.get().computeIfAbsent(key, ignored -> 
openReaders(partitionName, slice));
-    } else {
-      return openReaders(partitionName, slice);
-    }

Review Comment:
   Is the `reuse` behavior handled in a different way though the FGReader-based 
MDT reading?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to