nsivabalan commented on a change in pull request #3426:
URL: https://github.com/apache/hudi/pull/3426#discussion_r702974631



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -162,157 +168,121 @@ private void initIfNeeded() {
       throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
     } finally {
       if (!reuse) {
-        closeOrThrow();
+        close(partitionName);
       }
     }
   }
 
-  private void openReadersIfNeededOrThrow() {
-    try {
-      openReadersIfNeeded();
-    } catch (IOException e) {
-      throw new HoodieIOException("Error opening readers to the Metadata 
Table: ", e);
-    }
-  }
-
   /**
    * Returns a new pair of readers to the base and log files.
    */
-  private void openReadersIfNeeded() throws IOException {
-    if (reuse && (baseFileReader != null || logRecordScanner != null)) {
-      // quickly exit out without synchronizing if reusing and readers are 
already open
-      return;
-    }
-
-    // we always force synchronization, if reuse=false, to handle concurrent 
close() calls as well.
-    synchronized (this) {
-      if (baseFileReader != null || logRecordScanner != null) {
-        return;
-      }
-
-      final long baseFileOpenMs;
-      final long logScannerOpenMs;
-
-      // Metadata is in sync till the latest completed instant on the dataset
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      String latestInstantTime = getLatestDatasetInstantTime();
-      ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 
1, "must be at-least one valid metadata file slice");
-
-      // If the base file is present then create a reader
-      Option<HoodieBaseFile> basefile = 
latestFileSystemMetadataSlices.get(0).getBaseFile();
-      if (basefile.isPresent()) {
-        String basefilePath = basefile.get().getPath();
-        baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
-        baseFileOpenMs = timer.endTimer();
-        LOG.info(String.format("Opened metadata base file from %s at instant 
%s in %d ms", basefilePath,
-            basefile.get().getCommitTime(), baseFileOpenMs));
-      } else {
-        baseFileOpenMs = 0;
-        timer.endTimer();
-      }
-
-      // Open the log record scanner using the log files from the latest file 
slice
-      timer.startTimer();
-      List<String> logFilePaths = 
latestFileSystemMetadataSlices.get(0).getLogFiles()
-          .sorted(HoodieLogFile.getLogFileComparator())
-          .map(o -> o.getPath().toString())
-          .collect(Collectors.toList());
-      Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      String latestMetaInstantTimestamp = 
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-      // Load the schema
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-      HoodieCommonConfig commonConfig = 
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-      logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
-          .withFileSystem(metaClient.getFs())
-          .withBasePath(metadataBasePath)
-          .withLogFilePaths(logFilePaths)
-          .withReaderSchema(schema)
-          .withLatestInstantTime(latestMetaInstantTimestamp)
-          .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
-          .withBufferSize(BUFFER_SIZE)
-          .withSpillableMapBasePath(spillableMapDirectory)
-          .withDiskMapType(commonConfig.getSpillableDiskMapType())
-          
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
-          .build();
-
-      logScannerOpenMs = timer.endTimer();
-      LOG.info(String.format("Opened metadata log files from %s at instant 
(dataset instant=%s, metadata instant=%s) in %d ms",
-          logFilePaths, latestInstantTime, latestMetaInstantTimestamp, 
logScannerOpenMs));
-
-      metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
-    }
-  }
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> 
openReadersIfNeeded(String key, String partitionName) throws IOException {

Review comment:
       Can you help me understand something here? I assume we reuse the readers 
and hence won't be reinitializing them repeatedly. But validInstants from the 
dataset is computed only during initialization of the reader 
(MergedLogRecordScanner) and passed in through its constructor. So it may never 
get updated unless we re-initialize the log scanner. 
   Shouldn't we be updating the validInstants at a regular cadence? 
   But I also notice that logs are scanned only during initialization of 
HoodieMetadataMergedLogRecordScanner. So it is not clear how we ensure that new 
log blocks/files get picked up. Maybe I need to dig into the code base more to 
understand this.  
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to