nsivabalan commented on a change in pull request #3426:
URL: https://github.com/apache/hudi/pull/3426#discussion_r703004302



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
       throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
     } finally {
       if (!reuse) {
-        closeOrThrow();
+        close(partitionName);
       }
     }
   }
 
-  private void openReadersIfNeededOrThrow() {
-    try {
-      openReadersIfNeeded();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error opening readers to the Metadata 
Table: ", e);
-    }
-  }
-
   /**
    * Returns a new pair of readers to the base and log files.
    */
-  private void openReadersIfNeeded() throws IOException {
-    if (reuse && (baseFileReader != null || logRecordScanner != null)) {
-      // quickly exit out without synchronizing if reusing and readers are 
already open
-      return;
-    }
-
-    // we always force synchronization, if reuse=false, to handle concurrent 
close() calls as well.
-    synchronized (this) {
-      if (baseFileReader != null || logRecordScanner != null) {
-        return;
-      }
-
-      final long baseFileOpenMs;
-      final long logScannerOpenMs;
-
-      // Metadata is in sync till the latest completed instant on the dataset
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      String latestInstantTime = getLatestDatasetInstantTime();
-      ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 
1, "must be at-least one valid metadata file slice");
-
-      // If the base file is present then create a reader
-      Option<HoodieBaseFile> basefile = 
latestFileSystemMetadataSlices.get(0).getBaseFile();
-      if (basefile.isPresent()) {
-        String basefilePath = basefile.get().getPath();
-        baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
-        baseFileOpenMs = timer.endTimer();
-        LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
-            basefile.get().getCommitTime(), baseFileOpenMs));
-      } else {
-        baseFileOpenMs = 0;
-        timer.endTimer();
-      }
-
-      // Open the log record scanner using the log files from the latest file 
slice
-      timer.startTimer();
-      List<String> logFilePaths = 
latestFileSystemMetadataSlices.get(0).getLogFiles()
-          .sorted(HoodieLogFile.getLogFileComparator())
-          .map(o -> o.getPath().toString())
-          .collect(Collectors.toList());
-      Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      String latestMetaInstantTimestamp = 
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-      // Load the schema
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-      HoodieCommonConfig commonConfig = 
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-      logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
-          .withFileSystem(metaClient.getFs())
-          .withBasePath(metadataBasePath)
-          .withLogFilePaths(logFilePaths)
-          .withReaderSchema(schema)
-          .withLatestInstantTime(latestMetaInstantTimestamp)
-          .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
-          .withBufferSize(BUFFER_SIZE)
-          .withSpillableMapBasePath(spillableMapDirectory)
-          .withDiskMapType(commonConfig.getSpillableDiskMapType())
-          
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
-          .build();
-
-      logScannerOpenMs = timer.endTimer();
-      LOG.info(String.format("Opened metadata log files from %s at instant 
(dataset instant=%s, metadata instant=%s) in %d ms",
-          logFilePaths, latestInstantTime, latestMetaInstantTimestamp, 
logScannerOpenMs));
-
-      metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
-    }
-  }
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> 
openReadersIfNeeded(String key, String partitionName) throws IOException {
+    return shardReaders.computeIfAbsent(partitionName, k -> {
+      try {
+        final long baseFileOpenMs;
+        final long logScannerOpenMs;
+        HoodieFileReader baseFileReader = null;
+        HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+        // Metadata is in sync till the latest completed instant on the dataset
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        List<FileSlice> shards = 
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+        ValidationUtils.checkArgument(shards.size() == 1, 
String.format("Invalid number of shards: found=%d, required=%d", shards.size(), 
1));
+        final FileSlice slice = 
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+        // If the base file is present then create a reader
+        Option<HoodieBaseFile> basefile = slice.getBaseFile();
+        if (basefile.isPresent()) {
+          String basefilePath = basefile.get().getPath();
+          baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+          baseFileOpenMs = timer.endTimer();
+          LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
+              basefile.get().getCommitTime(), baseFileOpenMs));
+        } else {
+          baseFileOpenMs = 0;
+          timer.endTimer();
+        }
 
-  private void close(HoodieFileReader localFileReader, 
HoodieMetadataMergedLogRecordScanner localLogScanner) {
-    try {
-      if (localFileReader != null) {
-        localFileReader.close();
-      }
-      if (localLogScanner != null) {
-        localLogScanner.close();
+        // Open the log record scanner using the log files from the latest 
file slice
+        timer.startTimer();
+        List<String> logFilePaths = slice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(o -> o.getPath().toString())
+            .collect(Collectors.toList());
+
+        // Only those log files which have a corresponding completed instant 
on the dataset should be read
+        // This is because the metadata table is updated before the dataset 
instants are committed.
+        HoodieActiveTimeline datasetTimeline = 
datasetMetaClient.getActiveTimeline();
+        Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstants()
+            .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+        // For any rollbacks and restores, we cannot neglect the instants that 
they are rolling back.
+        // The rollback instant should be more recent than the start of the 
timeline for it to have rolled back any
+        // instant which we have a log block for.
+        final String minInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+        
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+            .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, minInstantTime))
+            .forEach(instant -> {
+              
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
 datasetTimeline));
+            });
+
+        // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
+        validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+        Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();

Review comment:
       just to complete the argument, let me explain with an illustration. 
   dataset timeline: 
   C1.complete, C2.complete, C3.inflight, C4.complete. 
   Metadata timeline
   C1.complete, C2.complete, C4.complete. 
   
   A few cases could happen: 
   a. C3 got committed to metadata and failed before committing to actual 
dataset. 
   a.1. rollback finished for C3 in original dataset. 
   a.2. rollback not yet triggered for C3 in the original dataset. So, C3 is still 
inflight. Unless new writes kick in, this will not be rolled back. 
   b. C3 is a partially failed commit, so not yet synced to the metadata table. 
   
   (b) is straightforward, as we don't sync non-completed commits yet. 
   a.1. lastInstant will be R5 (if the rollback was complete, we would have synced 
it to the metadata table), and validInstants will be {C1, C2, C3, C4} since we 
have a rollback for C3 in the original dataset. Here R5 refers to the rollback of C3. 
   a.2. lastInstant will be C4, and validInstants will be {C1, C2, C4}
   
   
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
       throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
     } finally {
       if (!reuse) {
-        closeOrThrow();
+        close(partitionName);
       }
     }
   }
 
-  private void openReadersIfNeededOrThrow() {
-    try {
-      openReadersIfNeeded();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error opening readers to the Metadata 
Table: ", e);
-    }
-  }
-
   /**
    * Returns a new pair of readers to the base and log files.
    */
-  private void openReadersIfNeeded() throws IOException {
-    if (reuse && (baseFileReader != null || logRecordScanner != null)) {
-      // quickly exit out without synchronizing if reusing and readers are 
already open
-      return;
-    }
-
-    // we always force synchronization, if reuse=false, to handle concurrent 
close() calls as well.
-    synchronized (this) {
-      if (baseFileReader != null || logRecordScanner != null) {
-        return;
-      }
-
-      final long baseFileOpenMs;
-      final long logScannerOpenMs;
-
-      // Metadata is in sync till the latest completed instant on the dataset
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      String latestInstantTime = getLatestDatasetInstantTime();
-      ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 
1, "must be at-least one valid metadata file slice");
-
-      // If the base file is present then create a reader
-      Option<HoodieBaseFile> basefile = 
latestFileSystemMetadataSlices.get(0).getBaseFile();
-      if (basefile.isPresent()) {
-        String basefilePath = basefile.get().getPath();
-        baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
-        baseFileOpenMs = timer.endTimer();
-        LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
-            basefile.get().getCommitTime(), baseFileOpenMs));
-      } else {
-        baseFileOpenMs = 0;
-        timer.endTimer();
-      }
-
-      // Open the log record scanner using the log files from the latest file 
slice
-      timer.startTimer();
-      List<String> logFilePaths = 
latestFileSystemMetadataSlices.get(0).getLogFiles()
-          .sorted(HoodieLogFile.getLogFileComparator())
-          .map(o -> o.getPath().toString())
-          .collect(Collectors.toList());
-      Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      String latestMetaInstantTimestamp = 
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-      // Load the schema
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-      HoodieCommonConfig commonConfig = 
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-      logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
-          .withFileSystem(metaClient.getFs())
-          .withBasePath(metadataBasePath)
-          .withLogFilePaths(logFilePaths)
-          .withReaderSchema(schema)
-          .withLatestInstantTime(latestMetaInstantTimestamp)
-          .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
-          .withBufferSize(BUFFER_SIZE)
-          .withSpillableMapBasePath(spillableMapDirectory)
-          .withDiskMapType(commonConfig.getSpillableDiskMapType())
-          
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
-          .build();
-
-      logScannerOpenMs = timer.endTimer();
-      LOG.info(String.format("Opened metadata log files from %s at instant 
(dataset instant=%s, metadata instant=%s) in %d ms",
-          logFilePaths, latestInstantTime, latestMetaInstantTimestamp, 
logScannerOpenMs));
-
-      metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
-    }
-  }
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> 
openReadersIfNeeded(String key, String partitionName) throws IOException {
+    return shardReaders.computeIfAbsent(partitionName, k -> {
+      try {
+        final long baseFileOpenMs;
+        final long logScannerOpenMs;
+        HoodieFileReader baseFileReader = null;
+        HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+        // Metadata is in sync till the latest completed instant on the dataset
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        List<FileSlice> shards = 
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+        ValidationUtils.checkArgument(shards.size() == 1, 
String.format("Invalid number of shards: found=%d, required=%d", shards.size(), 
1));
+        final FileSlice slice = 
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+        // If the base file is present then create a reader
+        Option<HoodieBaseFile> basefile = slice.getBaseFile();
+        if (basefile.isPresent()) {
+          String basefilePath = basefile.get().getPath();
+          baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+          baseFileOpenMs = timer.endTimer();
+          LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
+              basefile.get().getCommitTime(), baseFileOpenMs));
+        } else {
+          baseFileOpenMs = 0;
+          timer.endTimer();
+        }
 
-  private void close(HoodieFileReader localFileReader, 
HoodieMetadataMergedLogRecordScanner localLogScanner) {
-    try {
-      if (localFileReader != null) {
-        localFileReader.close();
-      }
-      if (localLogScanner != null) {
-        localLogScanner.close();
+        // Open the log record scanner using the log files from the latest 
file slice
+        timer.startTimer();
+        List<String> logFilePaths = slice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(o -> o.getPath().toString())
+            .collect(Collectors.toList());
+
+        // Only those log files which have a corresponding completed instant 
on the dataset should be read
+        // This is because the metadata table is updated before the dataset 
instants are committed.
+        HoodieActiveTimeline datasetTimeline = 
datasetMetaClient.getActiveTimeline();
+        Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstants()
+            .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+        // For any rollbacks and restores, we cannot neglect the instants that 
they are rolling back.
+        // The rollback instant should be more recent than the start of the 
timeline for it to have rolled back any
+        // instant which we have a log block for.
+        final String minInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+        
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+            .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, minInstantTime))
+            .forEach(instant -> {
+              
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
 datasetTimeline));
+            });
+
+        // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
+        validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+        Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();

Review comment:
       Sorry, my bad — this piece of code is used differently; the existing 
code actually looks good. 
   We use lastInstant only to ignore those delta commits in the metadata timeline 
while processing log blocks. 
   Further, the valid instants are used to retain only those of interest to us 
(completed commits, and commits that got rolled back). 
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
       throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
     } finally {
       if (!reuse) {
-        closeOrThrow();
+        close(partitionName);
       }
     }
   }
 
-  private void openReadersIfNeededOrThrow() {
-    try {
-      openReadersIfNeeded();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error opening readers to the Metadata 
Table: ", e);
-    }
-  }
-
   /**
    * Returns a new pair of readers to the base and log files.
    */
-  private void openReadersIfNeeded() throws IOException {
-    if (reuse && (baseFileReader != null || logRecordScanner != null)) {
-      // quickly exit out without synchronizing if reusing and readers are 
already open
-      return;
-    }
-
-    // we always force synchronization, if reuse=false, to handle concurrent 
close() calls as well.
-    synchronized (this) {
-      if (baseFileReader != null || logRecordScanner != null) {
-        return;
-      }
-
-      final long baseFileOpenMs;
-      final long logScannerOpenMs;
-
-      // Metadata is in sync till the latest completed instant on the dataset
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      String latestInstantTime = getLatestDatasetInstantTime();
-      ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 
1, "must be at-least one valid metadata file slice");
-
-      // If the base file is present then create a reader
-      Option<HoodieBaseFile> basefile = 
latestFileSystemMetadataSlices.get(0).getBaseFile();
-      if (basefile.isPresent()) {
-        String basefilePath = basefile.get().getPath();
-        baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
-        baseFileOpenMs = timer.endTimer();
-        LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
-            basefile.get().getCommitTime(), baseFileOpenMs));
-      } else {
-        baseFileOpenMs = 0;
-        timer.endTimer();
-      }
-
-      // Open the log record scanner using the log files from the latest file 
slice
-      timer.startTimer();
-      List<String> logFilePaths = 
latestFileSystemMetadataSlices.get(0).getLogFiles()
-          .sorted(HoodieLogFile.getLogFileComparator())
-          .map(o -> o.getPath().toString())
-          .collect(Collectors.toList());
-      Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      String latestMetaInstantTimestamp = 
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-      // Load the schema
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-      HoodieCommonConfig commonConfig = 
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-      logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
-          .withFileSystem(metaClient.getFs())
-          .withBasePath(metadataBasePath)
-          .withLogFilePaths(logFilePaths)
-          .withReaderSchema(schema)
-          .withLatestInstantTime(latestMetaInstantTimestamp)
-          .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
-          .withBufferSize(BUFFER_SIZE)
-          .withSpillableMapBasePath(spillableMapDirectory)
-          .withDiskMapType(commonConfig.getSpillableDiskMapType())
-          
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
-          .build();
-
-      logScannerOpenMs = timer.endTimer();
-      LOG.info(String.format("Opened metadata log files from %s at instant 
(dataset instant=%s, metadata instant=%s) in %d ms",
-          logFilePaths, latestInstantTime, latestMetaInstantTimestamp, 
logScannerOpenMs));
-
-      metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
-    }
-  }
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> 
openReadersIfNeeded(String key, String partitionName) throws IOException {
+    return shardReaders.computeIfAbsent(partitionName, k -> {
+      try {
+        final long baseFileOpenMs;
+        final long logScannerOpenMs;
+        HoodieFileReader baseFileReader = null;
+        HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+        // Metadata is in sync till the latest completed instant on the dataset
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        List<FileSlice> shards = 
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+        ValidationUtils.checkArgument(shards.size() == 1, 
String.format("Invalid number of shards: found=%d, required=%d", shards.size(), 
1));
+        final FileSlice slice = 
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+        // If the base file is present then create a reader
+        Option<HoodieBaseFile> basefile = slice.getBaseFile();
+        if (basefile.isPresent()) {
+          String basefilePath = basefile.get().getPath();
+          baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+          baseFileOpenMs = timer.endTimer();
+          LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
+              basefile.get().getCommitTime(), baseFileOpenMs));
+        } else {
+          baseFileOpenMs = 0;
+          timer.endTimer();
+        }
 
-  private void close(HoodieFileReader localFileReader, 
HoodieMetadataMergedLogRecordScanner localLogScanner) {
-    try {
-      if (localFileReader != null) {
-        localFileReader.close();
-      }
-      if (localLogScanner != null) {
-        localLogScanner.close();
+        // Open the log record scanner using the log files from the latest 
file slice
+        timer.startTimer();
+        List<String> logFilePaths = slice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(o -> o.getPath().toString())
+            .collect(Collectors.toList());
+
+        // Only those log files which have a corresponding completed instant 
on the dataset should be read
+        // This is because the metadata table is updated before the dataset 
instants are committed.
+        HoodieActiveTimeline datasetTimeline = 
datasetMetaClient.getActiveTimeline();
+        Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstants()
+            .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+        // For any rollbacks and restores, we cannot neglect the instants that 
they are rolling back.
+        // The rollback instant should be more recent than the start of the 
timeline for it to have rolled back any
+        // instant which we have a log block for.
+        final String minInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+        
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+            .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, minInstantTime))
+            .forEach(instant -> {
+              
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
 datasetTimeline));
+            });
+
+        // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
+        validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+        Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();

Review comment:
       what ideally we need here is, max commit timestamp before which there 
are no inflight writes and all of them are synced to metadata. (last known 
fully synced timestamp)
   
   For eg,
   Dataset timeline: C1.complete, C2.complete, C3.complete, C4.inflight, 
C5.complete, C6.inflight, C7.complete.
   Metadata table timeline: C1.complete, C2.complete, C3.complete, C5.complete. 
   
   We need to know C3 as the last known fully synced timestamp. So, every time 
we are interested in finding new instants to sync to the metadata table, we need 
to look for any new commits after this timestamp in the original dataset and take 
appropriate actions.
   Those commits > C3 could fall into 3 categories. 
   a. Completed in the dataset and synced to metadata, e.g. C5. Does not require 
syncing as it's already synced. 
   b. Inflight in the dataset, e.g. C4, C6. Does not require syncing since it's not yet 
complete. 
   c. Completed in the dataset and not yet synced to metadata, e.g. C7. This needs 
to be synced in this round. 
   
   
   
   
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
       throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
     } finally {
       if (!reuse) {
-        closeOrThrow();
+        close(partitionName);
       }
     }
   }
 
-  private void openReadersIfNeededOrThrow() {
-    try {
-      openReadersIfNeeded();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error opening readers to the Metadata 
Table: ", e);
-    }
-  }
-
   /**
    * Returns a new pair of readers to the base and log files.
    */
-  private void openReadersIfNeeded() throws IOException {
-    if (reuse && (baseFileReader != null || logRecordScanner != null)) {
-      // quickly exit out without synchronizing if reusing and readers are 
already open
-      return;
-    }
-
-    // we always force synchronization, if reuse=false, to handle concurrent 
close() calls as well.
-    synchronized (this) {
-      if (baseFileReader != null || logRecordScanner != null) {
-        return;
-      }
-
-      final long baseFileOpenMs;
-      final long logScannerOpenMs;
-
-      // Metadata is in sync till the latest completed instant on the dataset
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      String latestInstantTime = getLatestDatasetInstantTime();
-      ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 
1, "must be at-least one valid metadata file slice");
-
-      // If the base file is present then create a reader
-      Option<HoodieBaseFile> basefile = 
latestFileSystemMetadataSlices.get(0).getBaseFile();
-      if (basefile.isPresent()) {
-        String basefilePath = basefile.get().getPath();
-        baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
-        baseFileOpenMs = timer.endTimer();
-        LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
-            basefile.get().getCommitTime(), baseFileOpenMs));
-      } else {
-        baseFileOpenMs = 0;
-        timer.endTimer();
-      }
-
-      // Open the log record scanner using the log files from the latest file 
slice
-      timer.startTimer();
-      List<String> logFilePaths = 
latestFileSystemMetadataSlices.get(0).getLogFiles()
-          .sorted(HoodieLogFile.getLogFileComparator())
-          .map(o -> o.getPath().toString())
-          .collect(Collectors.toList());
-      Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      String latestMetaInstantTimestamp = 
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-      // Load the schema
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-      HoodieCommonConfig commonConfig = 
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-      logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
-          .withFileSystem(metaClient.getFs())
-          .withBasePath(metadataBasePath)
-          .withLogFilePaths(logFilePaths)
-          .withReaderSchema(schema)
-          .withLatestInstantTime(latestMetaInstantTimestamp)
-          .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
-          .withBufferSize(BUFFER_SIZE)
-          .withSpillableMapBasePath(spillableMapDirectory)
-          .withDiskMapType(commonConfig.getSpillableDiskMapType())
-          
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
-          .build();
-
-      logScannerOpenMs = timer.endTimer();
-      LOG.info(String.format("Opened metadata log files from %s at instant 
(dataset instant=%s, metadata instant=%s) in %d ms",
-          logFilePaths, latestInstantTime, latestMetaInstantTimestamp, 
logScannerOpenMs));
-
-      metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
-    }
-  }
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> 
openReadersIfNeeded(String key, String partitionName) throws IOException {
+    return shardReaders.computeIfAbsent(partitionName, k -> {
+      try {
+        final long baseFileOpenMs;
+        final long logScannerOpenMs;
+        HoodieFileReader baseFileReader = null;
+        HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+
+        // Metadata is in sync till the latest completed instant on the dataset
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        List<FileSlice> shards = 
HoodieTableMetadataUtil.loadPartitionShards(metaClient, partitionName);
+        ValidationUtils.checkArgument(shards.size() == 1, 
String.format("Invalid number of shards: found=%d, required=%d", shards.size(), 
1));
+        final FileSlice slice = 
shards.get(HoodieTableMetadataUtil.keyToShard(key, shards.size()));
+
+        // If the base file is present then create a reader
+        Option<HoodieBaseFile> basefile = slice.getBaseFile();
+        if (basefile.isPresent()) {
+          String basefilePath = basefile.get().getPath();
+          baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
+          baseFileOpenMs = timer.endTimer();
+          LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
+              basefile.get().getCommitTime(), baseFileOpenMs));
+        } else {
+          baseFileOpenMs = 0;
+          timer.endTimer();
+        }
 
-  private void close(HoodieFileReader localFileReader, 
HoodieMetadataMergedLogRecordScanner localLogScanner) {
-    try {
-      if (localFileReader != null) {
-        localFileReader.close();
-      }
-      if (localLogScanner != null) {
-        localLogScanner.close();
+        // Open the log record scanner using the log files from the latest 
file slice
+        timer.startTimer();
+        List<String> logFilePaths = slice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(o -> o.getPath().toString())
+            .collect(Collectors.toList());
+
+        // Only those log files which have a corresponding completed instant 
on the dataset should be read
+        // This is because the metadata table is updated before the dataset 
instants are committed.
+        HoodieActiveTimeline datasetTimeline = 
datasetMetaClient.getActiveTimeline();
+        Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstants()
+            .map(i -> i.getTimestamp()).collect(Collectors.toSet());
+
+        // For any rollbacks and restores, we cannot neglect the instants that 
they are rolling back.
+        // The rollback instant should be more recent than the start of the 
timeline for it to have rolled back any
+        // instant which we have a log block for.
+        final String minInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+        
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
+            .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, minInstantTime))
+            .forEach(instant -> {
+              
validInstantTimestamps.addAll(HoodieTableMetadataUtil.getCommitsRolledback(instant,
 datasetTimeline));
+            });
+
+        // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
+        validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
+
+        Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();

Review comment:
       I don't think we can use the latestCompletedInstant as a delimiter. There 
could be concurrent writes to the original dataset, and chances are that an older write 
could get committed later. 
   For example, C1 and C2 do concurrent writes to the original dataset, but C2 
completes earlier. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to