nsivabalan commented on a change in pull request #2494:
URL: https://github.com/apache/hudi/pull/2494#discussion_r577544617



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -188,41 +238,42 @@ private synchronized void openFileSliceIfNeeded() throws 
IOException {
 
     // Load the schema
     Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-    logRecordScanner = new 
HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
-            logFilePaths, schema, latestMetaInstantTimestamp, 
MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
+    HoodieMetadataMergedLogRecordScanner logRecordScanner = new 
HoodieMetadataMergedLogRecordScanner(metaClient.getFs(),
+            metadataBasePath, logFilePaths, schema, 
latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
             spillableMapDirectory, null);
 
-    LOG.info("Opened metadata log files from " + logFilePaths + " at instant " 
+ latestInstantTime
-        + "(dataset instant=" + latestInstantTime + ", metadata instant=" + 
latestMetaInstantTimestamp + ")");
+    timings[1] = timer.endTimer();
+    LOG.info(String.format("Opened metadata log files from %s at instant 
(dataset instant=%s, metadata instant=%s) in %d ms",
+        logFilePaths, latestInstantTime, latestMetaInstantTimestamp, 
timings[1]));
 
-    metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
+    metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timings[0] + timings[1]));
+    return Pair.of(baseFileReader, logRecordScanner);
   }
 
-  private void closeIfNeeded() {
+  private void close(HoodieFileReader localFileReader, 
HoodieMetadataMergedLogRecordScanner localLogScanner) {
     try {
-      if (!metadataConfig.enableReuse()) {
-        close();
+      if (localFileReader != null) {
+        localFileReader.close();
+      }
+      if (localLogScanner != null) {
+        localLogScanner.close();
       }
     } catch (Exception e) {
       throw new HoodieException("Error closing resources during metadata table 
merge", e);
     }
   }
 
   @Override
-  public void close() throws Exception {
-    if (baseFileReader != null) {
-      baseFileReader.close();
-      baseFileReader = null;
-    }
-    if (logRecordScanner != null) {
-      logRecordScanner.close();
-      logRecordScanner = null;
-    }
+  public synchronized void close() throws Exception {
+    close(baseFileReader, logRecordScanner);
+    baseFileReader = null;

Review comment:
       Can you help me understand why we don't set these fields to null inside 
```close(HoodieFileReader localFileReader, HoodieMetadataMergedLogRecordScanner 
localLogScanner)```? That might be cleaner IMO.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -112,13 +113,59 @@ private void initIfNeeded() {
 
   @Override
   protected Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKeyFromMetadata(String key) {
+    // This function can be called in parallel through multiple threads. For 
each thread, we determine the thread-local
+    // versions of the baseFile and logRecord readers to use.
+    // - If reuse is enabled, we use the same readers and dont close them
+    // - if reuse is disabled, we open new readers in each thread and close 
them
+    HoodieFileReader localFileReader = null;
+    HoodieMetadataMergedLogRecordScanner localLogRecordScanner = null;
+    synchronized (this) {
+      if (!metadataConfig.enableReuse()) {
+        // reuse is disabled so always open new readers
+        try {
+          Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers 
= openReaders();
+          localFileReader = readers.getKey();
+          localLogRecordScanner = readers.getValue();
+        } catch (IOException e) {
+          throw new HoodieIOException("Error opening readers", e);
+        }
+      } else if (baseFileReader == null && logRecordScanner == null) {
+        // reuse is enabled but we haven't opened the readers yet
+        try {
+          Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers 
= openReaders();
+          localFileReader = readers.getKey();
+          localLogRecordScanner = readers.getValue();
+          // cache the readers
+          baseFileReader = localFileReader;
+          logRecordScanner = localLogRecordScanner;
+        } catch (IOException e) {

Review comment:
       Instead of having two try-catch blocks, can we have just one for the entire 
synchronized block? What do you think?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -147,23 +184,21 @@ private void initIfNeeded() {
         }
       }
       timings.add(timer.endTimer());
-      LOG.info(String.format("Metadata read for key %s took [open, 
baseFileRead, logMerge] %s ms", key, timings));
+      LOG.info(String.format("Metadata read for key %s took [baseFileRead, 
logMerge] %s ms", key, timings));
       return Option.ofNullable(hoodieRecord);
     } catch (IOException ioe) {
       throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
-    } finally {
-      closeIfNeeded();
     }
   }
 
   /**
-   * Open readers to the base and log files.
+   * Returns the readers to the base and log files.
+   *
+   * If reuse is allowed then cached readers are returned. Otherwise new 
readers are opened.

Review comment:
       Have these comments been addressed? Can you please respond?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -112,13 +113,59 @@ private void initIfNeeded() {
 
   @Override
   protected Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKeyFromMetadata(String key) {
+    // This function can be called in parallel through multiple threads. For 
each thread, we determine the thread-local
+    // versions of the baseFile and logRecord readers to use.
+    // - If reuse is enabled, we use the same readers and dont close them
+    // - if reuse is disabled, we open new readers in each thread and close 
them
+    HoodieFileReader localFileReader = null;
+    HoodieMetadataMergedLogRecordScanner localLogRecordScanner = null;
+    synchronized (this) {
+      if (!metadataConfig.enableReuse()) {
+        // reuse is disabled so always open new readers
+        try {
+          Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers 
= openReaders();
+          localFileReader = readers.getKey();
+          localLogRecordScanner = readers.getValue();
+        } catch (IOException e) {
+          throw new HoodieIOException("Error opening readers", e);
+        }
+      } else if (baseFileReader == null && logRecordScanner == null) {
+        // reuse is enabled but we haven't opened the readers yet
+        try {
+          Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers 
= openReaders();
+          localFileReader = readers.getKey();
+          localLogRecordScanner = readers.getValue();
+          // cache the readers
+          baseFileReader = localFileReader;
+          logRecordScanner = localLogRecordScanner;
+        } catch (IOException e) {
+          throw new HoodieIOException("Error opening readers", e);
+        }
+      } else {
+        // reuse the already open readers
+        ValidationUtils.checkState((baseFileReader != null || logRecordScanner 
!= null), "Readers should already be open");

Review comment:
       I'm not sure this check serves any purpose, because the previous else-if 
checks whether both readers are null — so when execution reaches this else block, 
at least one of them is definitely non-null. Or are we trying to catch the 
scenario where exactly one of them is null?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to