manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r796948659



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
   }
 
   /**
-   * Returns a new pair of readers to the base and log files.
+   * Get the file slice details for the given key in a partition.
+   *
+   * @param partitionName - Metadata partition name
+   * @param key           - Key to get the file slice for
+   * @return Partition and file slice pair for the given key
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String key, String partitionName) {
-    return partitionReaders.computeIfAbsent(partitionName, k -> {
-      try {
-        final long baseFileOpenMs;
-        final long logScannerOpenMs;
-        HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+  private Pair<String, FileSlice> getPartitionFileSlice(final String 
partitionName, final String key) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+        latestFileSlices.size()));
+    return Pair.of(partitionName, slice);
+  }
+
+  /**
+   * Get the latest file slices for the interested keys in a given partition.
+   *
+   * @param partitionName - Partition to get the file slices from
+   * @param keys          - Interested keys
+   * @return FileSlices for the keys
+   */
+  private Map<Pair<String, FileSlice>, List<String>> 
getPartitionFileSlices(final String partitionName, final List<String> keys) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
new HashMap<>();
+    for (String key : keys) {
+      final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+          latestFileSlices.size()));
+      final Pair<String, FileSlice> keyFileSlicePair = Pair.of(partitionName, 
slice);
+      partitionFileSliceToKeysMap.computeIfAbsent(keyFileSlicePair, k -> new 
ArrayList<>()).add(key);
+    }
+    return partitionFileSliceToKeysMap;
+  }
 
-        // Metadata is in sync till the latest completed instant on the dataset
+  /**
+   * Create a file reader and the record scanner for a given partition and 
file slice
+   * if readers are not already available.
+   *
+   * @param partitionName - Partition name
+   * @param slice         - The file slice to open readers for
+   * @return File reader and the record scanner pair for the requested file 
slice
+   */
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String partitionName, FileSlice slice) {
+    return partitionReaders.computeIfAbsent(Pair.of(partitionName, 
slice.getFileId()), k -> {
+      try {
         HoodieTimer timer = new HoodieTimer().startTimer();
-        List<FileSlice> latestFileSlices = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
-        if (latestFileSlices.size() == 0) {
-          // empty partition
-          return Pair.of(null, null);
-        }
-        ValidationUtils.checkArgument(latestFileSlices.size() == 1, 
String.format("Invalid number of file slices: found=%d, required=%d", 
latestFileSlices.size(), 1));
-        final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, 
latestFileSlices.size()));
 
         // Open base file reader
         Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = 
getBaseFileReader(slice, timer);
-        baseFileReader = baseFileReaderOpenTimePair.getKey();
-        baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
+        HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
+        final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
 
         // Open the log record scanner using the log files from the latest 
file slice
-        Pair<HoodieMetadataMergedLogRecordReader, Long> 
logRecordScannerOpenTimePair = getLogRecordScanner(slice,
-            partitionName);
-        logRecordScanner = logRecordScannerOpenTimePair.getKey();
-        logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
+        Pair<HoodieMetadataMergedLogRecordReader, Long> 
logRecordScannerOpenTimePair = getLogRecordScanner(slice, partitionName);
+        HoodieMetadataMergedLogRecordReader logRecordScanner = 
logRecordScannerOpenTimePair.getKey();
+        final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
-        metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
+        metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,

Review comment:
       Actually, full scanning happens (with the default config today) as part 
of opening the readers. Also, it's based on the config you pass in to open the 
reader. Once we introduce partition-specific inline reading in HUDI-3317, these 
metrics will need to be updated, but not until then.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to