linliu-code commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2098427230


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -210,38 +229,110 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(L
         getEngineContext().parallelize(partitionFileSlices))
         .flatMap(
             (SerializableFunction<FileSlice, 
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
-              // NOTE: Since this will be executed by executors, we can't 
access previously cached
-              //       readers, and therefore have to always open new ones
-              Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> 
readers =
-                  openReaders(partitionName, fileSlice);
-              try {
-                List<Long> timings = new ArrayList<>();
+              if (!metadataConfig.isFileGroupReaderEnabled()) {
+                return getByKeyPrefixes(fileSlice, sortedKeyPrefixes, 
partitionName);
+              } else {
+                return getByKeyPrefixesWithFileGroupReader(fileSlice, 
sortedKeyPrefixes, partitionName);
+              }
+            });
+  }
 
-                HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-                HoodieMetadataLogRecordReader logRecordScanner = 
readers.getRight();
+  private Iterator<HoodieRecord<HoodieMetadataPayload>> 
getByKeyPrefixes(FileSlice fileSlice,
+                                                                         
List<String> sortedKeyPrefixes,
+                                                                         
String partitionName) {
+    // NOTE: Since this will be executed by executors, we can't access 
previously cached
+    //       readers, and therefore have to always open new ones
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
+        openReaders(partitionName, fileSlice);
+    try {
+      List<Long> timings = new ArrayList<>();
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
 
-                if (baseFileReader == null && logRecordScanner == null) {
-                  // TODO: what do we do if both does not exist? should we 
throw an exception and let caller do the fallback ?
-                  return Collections.emptyIterator();
-                }
+      if (baseFileReader == null && logRecordScanner == null) {
+        // TODO: what do we do if both does not exist? should we throw an 
exception and let caller do the fallback ?
+        return Collections.emptyIterator();
+      }
+      boolean fullKeys = false;
 
-                boolean fullKeys = false;
+      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+          readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, 
timings);
+      Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords =
+          readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
-                    readLogRecords(logRecordScanner, sortedKeyPrefixes, 
fullKeys, timings);
+      LOG.debug("Metadata read for {} keys took [baseFileRead, logMerge] {} 
ms", sortedKeyPrefixes.size(), timings);
+
+      return mergedRecords.values().iterator();
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table 
for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+    } finally {
+      closeReader(readers);
+    }
+  }
 
-                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords 
=
-                    readFromBaseAndMergeWithLogRecords(baseFileReader, 
sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+  // TODO: add predicate support.
+  private Iterator<HoodieRecord<HoodieMetadataPayload>> 
getByKeyPrefixesWithFileGroupReader(FileSlice fileSlice,
+                                                                               
             List<String> sortedKeyPrefixes,
+                                                                               
             String partitionName) throws IOException {
+    Option<HoodieInstant> latestMetadataInstant =
+        
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    String latestMetadataInstantTime =
+        
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+    HoodieFileGroupReader fileGroupReader = getFileGroupReader(
+        metadataMetaClient.getTableConfig(),
+        metadataMetaClient.getBasePath().toString(),
+        latestMetadataInstantTime,
+        fileSlice,
+        schema,
+        schema,
+        Option.empty(),
+        metadataMetaClient,
+        new TypedProperties(),
+        Collections.emptyList()); // TODO: Any properties?
+    fileGroupReader.initRecordIterators();
+    ClosableIterator it = fileGroupReader.getClosableIterator();
+    return new HoodieRecordIterator(it, partitionName, sortedKeyPrefixes);
+  }
+
+  public static class HoodieRecordIterator implements 
Iterator<HoodieRecord<HoodieMetadataPayload>> {
+    private final ClosableIterator<IndexedRecord> baseIterator;
+    private final String partitionName;
+    private final List<String> sortedKeyPrefixes;
+    private HoodieMetadataRecord metadataRecord;
+
+    public HoodieRecordIterator(ClosableIterator<IndexedRecord> baseIterator, 
String partitionName, List<String> sortedKeyPrefixes) {
+      this.baseIterator = baseIterator;
+      this.partitionName = partitionName;
+      this.sortedKeyPrefixes = sortedKeyPrefixes;
+    }
 
-                LOG.debug("Metadata read for {} keys took [baseFileRead, 
logMerge] {} ms", sortedKeyPrefixes.size(), timings);
+    @Override
+    public boolean hasNext() {
+      while (baseIterator.hasNext()) {
+        try {
+          metadataRecord = transform((GenericRecord) baseIterator.next());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        boolean found = sortedKeyPrefixes.stream().anyMatch(p -> 
metadataRecord.getKey().startsWith(p));

Review Comment:
   Yeah, it should be removed.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -325,6 +421,72 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysFromFileSlice
     }
   }
 
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeysWithFileGroupReader(String partitionName,
+                                                                               
          List<String> keys,
+                                                                               
          FileSlice fileSlice) {
+    try {
+      // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
+      List<String> sortedKeys = new ArrayList<>(keys);
+      // So we use the natural order to sort.
+      Collections.sort(sortedKeys);
+      Option<HoodieInstant> latestMetadataInstant =
+          
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+      String latestMetadataInstantTime =
+          
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+      HoodieFileGroupReader fileGroupReader = getFileGroupReader(
+          metadataMetaClient.getTableConfig(),
+          metadataMetaClient.getBasePath().toString(),
+          latestMetadataInstantTime,
+          fileSlice,
+          schema,
+          schema,
+          Option.empty(),
+          metadataMetaClient,
+          new TypedProperties(),
+          Collections.emptyList()); // TODO: Any properties?
+      fileGroupReader.initRecordIterators();
+      ClosableIterator it = fileGroupReader.getClosableIterator();
+      Map<String, HoodieRecord<HoodieMetadataPayload>> records = new 
HashMap<>();
+      while (it.hasNext()) {
+        HoodieMetadataRecord r = transform((GenericRecord) it.next());
+        // Remove bad results.
+        if (!keys.contains(r.getKey())) {
+          continue;
+        }
+        HoodieMetadataPayload payload = new HoodieMetadataPayload(r, 
r.getKey());
+        HoodieKey key = new HoodieKey(r.getKey(), partitionName);
+        HoodieAvroRecord record = new HoodieAvroRecord(key, payload);
+        records.put(key.getRecordKey(), record);
+      }
+      return records;
+    } catch (IOException e) {
+      throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " keys : ", e);
+    }
+  }
+
+  /**
+   * This is a temporary solution. We should create a reader to generate 
HoodieMetadataRecord

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to