the-other-tim-brown commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2110017119


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java:
##########
@@ -156,7 +158,8 @@ private void performScan() {
     // Do the scan and merge
     timer.startTimer();
 
-    scanInternal(Option.empty(), false);
+    Option<KeySpec> keySpecOpt = createKeySpec(readerContext.getKeyFilterOpt());
+    scanInternal(keySpecOpt, false);

Review Comment:
   There are `scanByFullKeys` and `scanByKeyPrefixes` methods in this class that are unused; can we clean these up while updating this class?



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -80,12 +85,29 @@ public ClosableIterator<IndexedRecord> getFileRecordIterator(
       long length,
       Schema dataSchema,
       Schema requiredSchema,
-      HoodieStorage storage
-  ) throws IOException {
+      HoodieStorage storage) throws IOException {
     HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
         .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
-            filePath, HoodieFileFormat.PARQUET, Option.empty());
-    return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+            filePath, baseFileFormat, Option.empty());
+    if (keyFilterOpt.isEmpty()) {
+      return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+    } else {
+      // Currently predicate is only supported for HFile reader.
+      if (!(reader instanceof HoodieAvroHFileReaderImplBase)) {
+        return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+      } else {
+        // For HFile reader, only two predicates are supported: IN and StringStartsWithAny.
+        HoodieAvroHFileReaderImplBase hfileReader = (HoodieAvroHFileReaderImplBase) reader;
+        List<Expression> children = keyFilterOpt.get().getChildren();
+        List<String> keysOrPrefixes = children.subList(1, children.size())
+            .stream().map(e -> (String) e.eval(null)).collect(Collectors.toList());
+        if (keyFilterOpt.get().getOperator().equals(Expression.Operator.IN)) { // With keys.
+          return hfileReader.getIndexedRecordsByKeysIterator(keysOrPrefixes, requiredSchema);
+        } else {  // With key prefixes.

Review Comment:
   Can we check that the predicate is "starts with" and throw an exception if it is neither of these two cases? I want to avoid a future developer assuming this will work with any predicate.
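
   For illustration, a minimal sketch of that guard. The `START_WITH` operator constant and the `getIndexedRecordsByKeyPrefixIterator` method name are assumptions here, not confirmed API:
   ```java
   // Sketch only: START_WITH and getIndexedRecordsByKeyPrefixIterator are assumed names.
   Expression.Operator operator = keyFilterOpt.get().getOperator();
   if (operator.equals(Expression.Operator.IN)) { // With full keys.
     return hfileReader.getIndexedRecordsByKeysIterator(keysOrPrefixes, requiredSchema);
   } else if (operator.equals(Expression.Operator.START_WITH)) { // With key prefixes.
     return hfileReader.getIndexedRecordsByKeyPrefixIterator(keysOrPrefixes, requiredSchema);
   } else {
     // Fail fast so a future caller cannot silently pass an unsupported predicate.
     throw new IllegalArgumentException(
         "HFile reader supports only IN and starts-with key predicates, got: " + operator);
   }
   ```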



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -210,40 +222,55 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
         getEngineContext().parallelize(partitionFileSlices))
         .flatMap(
             (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
-              // NOTE: Since this will be executed by executors, we can't access previously cached
-              //       readers, and therefore have to always open new ones
-              Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-                  openReaders(partitionName, fileSlice);
-              try {
-                List<Long> timings = new ArrayList<>();
-
-                HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-                HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-
-                if (baseFileReader == null && logRecordScanner == null) {
-                  // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
-                  return Collections.emptyIterator();
-                }
-
-                boolean fullKeys = false;
-
-                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
-                    readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
-
-                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords =
-                    readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
-
-                LOG.debug("Metadata read for {} keys took [baseFileRead, logMerge] {} ms", sortedKeyPrefixes.size(), timings);
-
-                return mergedRecords.values().iterator();
-              } catch (IOException ioe) {
-                throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-              } finally {
-                closeReader(readers);
-              }
+              return getByKeyPrefixesWithFileGroupReader(fileSlice, sortedKeyPrefixes, partitionName);
             });
   }
 
+  private Iterator<HoodieRecord<HoodieMetadataPayload>> getByKeyPrefixesWithFileGroupReader(FileSlice fileSlice,

Review Comment:
   Nitpick on naming: you don't need to include the implementation detail of using FileGroupReader in the name.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -210,40 +222,55 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
         getEngineContext().parallelize(partitionFileSlices))
         .flatMap(
             (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
-              // NOTE: Since this will be executed by executors, we can't access previously cached
-              //       readers, and therefore have to always open new ones
-              Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-                  openReaders(partitionName, fileSlice);
-              try {
-                List<Long> timings = new ArrayList<>();
-
-                HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-                HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-
-                if (baseFileReader == null && logRecordScanner == null) {
-                  // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
-                  return Collections.emptyIterator();
-                }
-
-                boolean fullKeys = false;
-
-                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
-                    readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
-
-                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords =
-                    readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
-
-                LOG.debug("Metadata read for {} keys took [baseFileRead, logMerge] {} ms", sortedKeyPrefixes.size(), timings);
-
-                return mergedRecords.values().iterator();
-              } catch (IOException ioe) {
-                throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-              } finally {
-                closeReader(readers);
-              }
+              return getByKeyPrefixesWithFileGroupReader(fileSlice, sortedKeyPrefixes, partitionName);
             });
   }
 
+  private Iterator<HoodieRecord<HoodieMetadataPayload>> getByKeyPrefixesWithFileGroupReader(FileSlice fileSlice,
+                                                                                            List<String> sortedKeyPrefixes,
+                                                                                            String partitionName) throws IOException {
+    Option<HoodieInstant> latestMetadataInstant =
+        metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    String latestMetadataInstantTime =
+        latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+    // Only those log files which have a corresponding completed instant on the dataset should be read
+    // This is because the metadata table is updated before the dataset instants are committed.
+    Set<String> validInstantTimestamps = getValidInstantTimestamps();
+    InstantRange instantRange = InstantRange.builder()
+        .rangeType(InstantRange.RangeType.EXACT_MATCH)
+        .explicitInstants(validInstantTimestamps).build();
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = getFileGroupReader(
+        metadataMetaClient.getTableConfig(),
+        latestMetadataInstantTime,
+        fileSlice,
+        schema,
+        schema,
+        metadataMetaClient,
+        new TypedProperties(),
+        Option.of(transformKeyPrefixesToPredicate(sortedKeyPrefixes)),
+        instantRange);
+    ClosableIterator<IndexedRecord> it = fileGroupReader.getClosableIterator();
+    return new CloseableMappingIterator<>(
+        it,
+        metadataRecord -> {
+          HoodieKey key = new HoodieKey(
+              ((GenericRecord) metadataRecord).get(KEY_FIELD_NAME).toString(), partitionName);

Review Comment:
   We can find the field index of the key field once and use that instead of looking up the field name in the schema on each record, for slightly better performance.
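
   A minimal sketch of that change, using Avro's `Schema.Field#pos()` and positional `IndexedRecord#get(int)`; `composeMetadataRecord` is a hypothetical helper standing in for the rest of the mapping shown in the hunk above:
   ```java
   // Resolve the key field's position once, before building the iterator.
   int keyFieldPos = schema.getField(KEY_FIELD_NAME).pos();
   return new CloseableMappingIterator<>(
       it,
       metadataRecord -> {
         // Positional access skips the name-based field lookup on every record
         // and also drops the GenericRecord cast.
         HoodieKey key = new HoodieKey(
             metadataRecord.get(keyFieldPos).toString(), partitionName);
         return composeMetadataRecord(key, metadataRecord); // hypothetical helper
       });
   ```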



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -292,36 +319,49 @@ private static ArrayList<ArrayList<String>> partitionKeysByFileSlices(List<Strin
     return partitionedKeys;
   }
 
-  /**
-   * Lookup list of keys from a single file slice.
-   *
-   * @param partitionName Name of the partition
-   * @param keys          The list of keys to lookup
-   * @param fileSlice     The file slice to read
-   * @return A {@code Map} of key name to {@code HoodieRecord} for the keys which were found in the file slice
-   */
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysFromFileSlice(String partitionName, List<String> keys, FileSlice fileSlice) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice);
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysWithFileGroupReader(String partitionName,
+                                                                                         List<String> keys,
+                                                                                         FileSlice fileSlice) {
     try {
-      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyMap();
-      }
-
       // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
       List<String> sortedKeys = new ArrayList<>(keys);
+      // So we use the natural order to sort.
      Collections.sort(sortedKeys);
-      boolean fullKeys = true;
-      List<Long> timings = new ArrayList<>(1);
-      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = readLogRecords(logRecordScanner, sortedKeys, fullKeys, timings);
-      return readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeys, fullKeys, logRecords, timings, partitionName);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table for  " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        closeReader(readers);
+      Option<HoodieInstant> latestMetadataInstant =
+          metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+      String latestMetadataInstantTime =
+          latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+      Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+      // Only those log files which have a corresponding completed instant on the dataset should be read
+      // This is because the metadata table is updated before the dataset instants are committed.
+      Set<String> validInstantTimestamps = getValidInstantTimestamps();
+      InstantRange instantRange = InstantRange.builder()
+          .rangeType(InstantRange.RangeType.EXACT_MATCH)
+          .explicitInstants(validInstantTimestamps).build();
+
+      HoodieFileGroupReader<IndexedRecord> fileGroupReader = getFileGroupReader(

Review Comment:
   Nitpick: I think it is clearer to use the builder directly than a method with lots of args.
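
   A hedged sketch of that builder style; the exact `HoodieFileGroupReader` builder method names below are assumptions, not confirmed API:
   ```java
   // Sketch only: newBuilder() and the with* method names are assumed.
   HoodieFileGroupReader<IndexedRecord> fileGroupReader =
       HoodieFileGroupReader.<IndexedRecord>newBuilder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(metadataMetaClient)
           .withLatestCommitTime(latestMetadataInstantTime)
           .withFileSlice(fileSlice)
           .withDataSchema(schema)
           .withRequestedSchema(schema)
           .withProps(new TypedProperties())
           .build();
   ```
   Each argument then reads with a name at the call site, avoiding the positional ambiguity of passing `schema` twice to `getFileGroupReader`.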



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -686,4 +726,29 @@ record -> {
         .stream()
        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
   }
+
+  private HoodieFileGroupReader<IndexedRecord> getFileGroupReader(HoodieTableConfig tableConfig,

Review Comment:
   Now that the FileGroupReader is used, `readFromBaseAndMergeWithLogRecords` can be removed.



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -80,12 +85,29 @@ public ClosableIterator<IndexedRecord> getFileRecordIterator(
       long length,
       Schema dataSchema,
       Schema requiredSchema,
-      HoodieStorage storage
-  ) throws IOException {
+      HoodieStorage storage) throws IOException {
     HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
         .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
-            filePath, HoodieFileFormat.PARQUET, Option.empty());
-    return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+            filePath, baseFileFormat, Option.empty());
+    if (keyFilterOpt.isEmpty()) {
+      return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+    } else {
+      // Currently predicate is only supported for HFile reader.
+      if (!(reader instanceof HoodieAvroHFileReaderImplBase)) {
+        return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+      } else {
+        // For HFile reader, only two predicates are supported: IN and StringStartsWithAny.
+        HoodieAvroHFileReaderImplBase hfileReader = (HoodieAvroHFileReaderImplBase) reader;
+        List<Expression> children = keyFilterOpt.get().getChildren();
+        List<String> keysOrPrefixes = children.subList(1, children.size())

Review Comment:
   Why is the first expression skipped? Can you add an inline comment so it is clear this is intentional?
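
   For instance, something along these lines; the child layout (attribute reference first, then the key literals) is inferred from how this hunk evaluates children 1..n as strings:
   ```java
   List<Expression> children = keyFilterOpt.get().getChildren();
   // Child 0 is the attribute/field reference being filtered on; the remaining
   // children are the literal keys (IN) or prefixes (starts-with), so skip it.
   List<String> keysOrPrefixes = children.subList(1, children.size())
       .stream().map(e -> (String) e.eval(null)).collect(Collectors.toList());
   ```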



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
