This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2009b0f44660f1d1753685a3ea64494d591aebf2
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Mon Aug 28 23:56:52 2023 -0700

    [HUDI-6726] Fix connection leaks related to file reader and iterator close (#9539)
    
    
    
    ---------
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../table/action/commit/HoodieMergeHelper.java     |   5 +-
 .../io/storage/TestHoodieHFileReaderWriter.java    |  10 +-
 .../bootstrap/index/HFileBootstrapIndex.java       |   8 +-
 .../hudi/common/table/TableSchemaResolver.java     |   5 +-
 .../table/log/block/HoodieHFileDataBlock.java      |  23 +--
 .../hudi/common/util/queue/SimpleExecutor.java     |   6 +-
 .../hudi/io/storage/HoodieAvroHFileReader.java     | 173 +++++++++++++++------
 .../apache/hudi/io/storage/HoodieHFileUtils.java   |  24 ++-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   4 +-
 .../hudi/hadoop/HoodieHFileRecordReader.java       |   8 +-
 10 files changed, 185 insertions(+), 81 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 4df767b5e41..c1523d564e4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -123,7 +123,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
         // In case writer's schema is simply a projection of the reader's one 
we can read
         // the records in the projected schema directly
         recordSchema = isPureProjection ? writerSchema : readerSchema;
-        recordIterator = baseFileReader.getRecordIterator(recordSchema);
+        recordIterator = (ClosableIterator<HoodieRecord>) 
baseFileReader.getRecordIterator(recordSchema);
       }
 
       boolean isBufferingRecords = 
ExecutorFactory.isBufferingRecords(writeConfig);
@@ -155,6 +155,9 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
         executor.awaitTermination();
       } else {
         baseFileReader.close();
+        if (bootstrapFileReader != null) {
+          bootstrapFileReader.close();
+        }
         mergeHandle.close();
       }
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index 90ad0fe1a74..0d2eefa0863 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -214,8 +214,9 @@ public class TestHoodieHFileReaderWriter extends 
TestHoodieReaderWriterBase {
     byte[] content = FileIOUtils.readAsByteArray(
         fs.open(getFilePath()), (int) 
fs.getFileStatus(getFilePath()).getLen());
     // Reading byte array in HFile format, without actual file path
+    Configuration hadoopConf = fs.getConf();
     HoodieAvroHFileReader hfileReader =
-        new HoodieAvroHFileReader(fs, new Path(DUMMY_BASE_PATH), content, 
Option.empty());
+        new HoodieAvroHFileReader(hadoopConf, new Path(DUMMY_BASE_PATH), new 
CacheConfig(hadoopConf), fs, content, Option.empty());
     Schema avroSchema = 
getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
     assertEquals(NUM_RECORDS, hfileReader.getTotalRecords());
     verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -420,8 +421,10 @@ public class TestHoodieHFileReaderWriter extends 
TestHoodieReaderWriterBase {
     verifyHFileReader(
         HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), 
content),
         hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
+
+    Configuration hadoopConf = fs.getConf();
     HoodieAvroHFileReader hfileReader =
-        new HoodieAvroHFileReader(fs, new Path(DUMMY_BASE_PATH), content, 
Option.empty());
+        new HoodieAvroHFileReader(hadoopConf, new Path(DUMMY_BASE_PATH), new 
CacheConfig(hadoopConf), fs, content, Option.empty());
     Schema avroSchema = 
getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
     assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
     verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -429,7 +432,8 @@ public class TestHoodieHFileReaderWriter extends 
TestHoodieReaderWriterBase {
     content = readHFileFromResources(complexHFile);
     verifyHFileReader(HoodieHFileUtils.createHFileReader(fs, new 
Path(DUMMY_BASE_PATH), content),
         hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
-    hfileReader = new HoodieAvroHFileReader(fs, new Path(DUMMY_BASE_PATH), 
content, Option.empty());
+    hfileReader =
+        new HoodieAvroHFileReader(hadoopConf, new Path(DUMMY_BASE_PATH), new 
CacheConfig(hadoopConf), fs, content, Option.empty());
     avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, 
"/exampleSchemaWithUDT.avsc");
     assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
     verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
index ee4eeec68d6..9b5e323e4f7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
@@ -182,12 +182,8 @@ public class HFileBootstrapIndex extends BootstrapIndex {
    * @param fileSystem File System
    */
   private static HFile.Reader createReader(String hFilePath, Configuration 
conf, FileSystem fileSystem) {
-    try {
-      LOG.info("Opening HFile for reading :" + hFilePath);
-      return HoodieHFileUtils.createHFileReader(fileSystem, new 
HFilePathForReader(hFilePath), new CacheConfig(conf), conf);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
+    LOG.info("Opening HFile for reading :" + hFilePath);
+    return HoodieHFileUtils.createHFileReader(fileSystem, new 
HFilePathForReader(hFilePath), new CacheConfig(conf), conf);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0e7e2cd4bf2..e757affe4bd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -327,8 +327,9 @@ public class TableSchemaResolver {
 
     FileSystem fs = metaClient.getRawFs();
     CacheConfig cacheConfig = new CacheConfig(fs.getConf());
-    HoodieAvroHFileReader hFileReader = new 
HoodieAvroHFileReader(fs.getConf(), hFilePath, cacheConfig);
-    return convertAvroSchemaToParquet(hFileReader.getSchema());
+    try (HoodieAvroHFileReader hFileReader = new 
HoodieAvroHFileReader(fs.getConf(), hFilePath, cacheConfig)) {
+      return convertAvroSchemaToParquet(hFileReader.getSchema());
+    }
   }
 
   private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws 
IOException {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 96436811429..a0f9d43ba39 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -172,10 +172,13 @@ public class HoodieHFileDataBlock extends HoodieDataBlock 
{
   protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] 
content, HoodieRecordType type) throws IOException {
     checkState(readerSchema != null, "Reader's schema has to be non-null");
 
-    FileSystem fs = FSUtils.getFs(pathForReader.toString(), 
FSUtils.buildInlineConf(getBlockContentLocation().get().getHadoopConf()));
+    Configuration hadoopConf = 
FSUtils.buildInlineConf(getBlockContentLocation().get().getHadoopConf());
+    FileSystem fs = FSUtils.getFs(pathForReader.toString(), hadoopConf);
     // Read the content
-    HoodieAvroHFileReader reader = new HoodieAvroHFileReader(fs, 
pathForReader, content, Option.of(getSchemaFromHeader()));
-    return unsafeCast(reader.getRecordIterator(readerSchema));
+    try (HoodieAvroHFileReader reader = new HoodieAvroHFileReader(hadoopConf, 
pathForReader, new CacheConfig(hadoopConf),
+                                                             fs, content, 
Option.of(getSchemaFromHeader()))) {
+      return unsafeCast(reader.getRecordIterator(readerSchema));
+    }
   }
 
   // TODO abstract this w/in HoodieDataBlock
@@ -193,15 +196,15 @@ public class HoodieHFileDataBlock extends HoodieDataBlock 
{
         blockContentLoc.getContentPositionInLogFile(),
         blockContentLoc.getBlockSize());
 
-    final HoodieAvroHFileReader reader =
+    try (final HoodieAvroHFileReader reader =
              new HoodieAvroHFileReader(inlineConf, inlinePath, new 
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf),
-             Option.of(getSchemaFromHeader()));
-
-    // Get writer's schema from the header
-    final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
-        fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : 
reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
+             Option.of(getSchemaFromHeader()))) {
+      // Get writer's schema from the header
+      final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
+          fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) 
: reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
 
-    return new CloseableMappingIterator<>(recordIterator, data -> 
(HoodieRecord<T>) data);
+      return new CloseableMappingIterator<>(recordIterator, data -> 
(HoodieRecord<T>) data);
+    }
   }
 
   private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws 
IOException {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
index 10cb5240899..86512333ec4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.util.queue;
 
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
 
 import org.slf4j.Logger;
@@ -77,7 +78,10 @@ public class SimpleExecutor<I, O, E> implements 
HoodieExecutor<E> {
 
   @Override
   public void shutdownNow() {
-    // no-op
+    // Consumer is already closed when the execution completes
+    if (itr instanceof ClosableIterator) {
+      ((ClosableIterator<I>) itr).close();
+    }
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index 3d6533a3429..c26ac6d1a48 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -84,54 +84,68 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieAvroHFileReader.class);
 
   private final Path path;
-
+  private final FileSystem fs;
+  private final Configuration hadoopConf;
+  private final CacheConfig config;
+  private final Option<byte[]> content;
   private final Lazy<Schema> schema;
 
   // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in 
Positional Read ("pread")
   //       mode (ie created w/ "pread = true")
-  private final HFile.Reader reader;
+  // Common reader is not used for the iterators since they can be closed 
independently.
+  // Use {@link getSharedReader()} instead of accessing directly.
+  private Option<HFile.Reader> sharedReader;
   // NOTE: Scanner caches read blocks, therefore it's important to re-use 
scanner
   //       wherever possible
-  private final HFileScanner sharedScanner;
+  private Option<HFileScanner> sharedScanner;
 
-  private final Object sharedScannerLock = new Object();
+  private final Object sharedLock = new Object();
 
   public HoodieAvroHFileReader(Configuration hadoopConf, Path path, 
CacheConfig cacheConfig) throws IOException {
-    this(path,
-        HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), 
hadoopConf), path, cacheConfig, hadoopConf),
-        Option.empty());
+    this(path, FSUtils.getFs(path.toString(), hadoopConf), hadoopConf, 
cacheConfig, Option.empty());
   }
 
   public HoodieAvroHFileReader(Configuration hadoopConf, Path path, 
CacheConfig cacheConfig, FileSystem fs, Option<Schema> schemaOpt) throws 
IOException {
-    this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, 
hadoopConf), schemaOpt);
+    this(path, fs, hadoopConf, cacheConfig, schemaOpt);
+  }
+
+  public HoodieAvroHFileReader(Configuration hadoopConf, Path path, 
CacheConfig cacheConfig, FileSystem fs, byte[] content, Option<Schema> 
schemaOpt) throws IOException {
+    this(path, fs, hadoopConf, cacheConfig, schemaOpt, Option.of(content));
   }
 
-  public HoodieAvroHFileReader(FileSystem fs, Path dummyPath, byte[] content, 
Option<Schema> schemaOpt) throws IOException {
-    this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), 
schemaOpt);
+  public HoodieAvroHFileReader(Path path, FileSystem fs, Configuration 
hadoopConf, CacheConfig config, Option<Schema> schemaOpt) throws IOException {
+    this(path, fs, hadoopConf, config, schemaOpt, Option.empty());
   }
 
-  public HoodieAvroHFileReader(Path path, HFile.Reader reader, Option<Schema> 
schemaOpt) throws IOException {
+  public HoodieAvroHFileReader(Path path, FileSystem fs, Configuration 
hadoopConf, CacheConfig config, Option<Schema> schemaOpt, Option<byte[]> 
content) throws IOException {
     this.path = path;
-    this.reader = reader;
-    // For shared scanner, which is primarily used for point-lookups, we're 
caching blocks
-    // by default, to minimize amount of traffic to the underlying storage
-    this.sharedScanner = getHFileScanner(reader, true);
+    this.fs = fs;
+    this.hadoopConf = hadoopConf;
+    this.config = config;
+    this.content = content;
+
+    // Shared reader is instantiated lazily.
+    this.sharedReader = Option.empty();
+    this.sharedScanner = Option.empty();
     this.schema = schemaOpt.map(Lazy::eagerly)
-        .orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader)));
+        .orElseGet(() -> Lazy.lazily(() -> 
fetchSchema(getSharedHFileReader())));
   }
 
   @Override
   public ClosableIterator<HoodieRecord<IndexedRecord>> 
getRecordsByKeysIterator(List<String> sortedKeys, Schema schema) throws 
IOException {
+    // Iterators do not use the shared reader or scanner
     // We're caching blocks for this scanner to minimize amount of traffic
     // to the underlying storage as we fetched (potentially) sparsely 
distributed
     // keys
+    HFile.Reader reader = getHFileReader();
     HFileScanner scanner = getHFileScanner(reader, true);
-    ClosableIterator<IndexedRecord> iterator = new 
RecordByKeyIterator(scanner, sortedKeys, getSchema(), schema);
+    ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(reader, 
scanner, sortedKeys, getSchema(), schema);
     return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new 
HoodieAvroIndexedRecord(data)));
   }
 
   @Override
   public ClosableIterator<HoodieRecord<IndexedRecord>> 
getRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema schema) 
throws IOException {
+    // Iterators do not use the shared reader or scanner
     ClosableIterator<IndexedRecord> iterator = 
getIndexedRecordsByKeyPrefixIterator(sortedKeyPrefixes, schema);
     return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new 
HoodieAvroIndexedRecord(data)));
   }
@@ -139,7 +153,7 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
   @Override
   public String[] readMinMaxRecordKeys() {
     // NOTE: This access to reader is thread-safe
-    HFileInfo fileInfo = reader.getHFileInfo();
+    HFileInfo fileInfo = getSharedHFileReader().getHFileInfo();
     return new String[]{new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
         new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
   }
@@ -148,8 +162,8 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
   public BloomFilter readBloomFilter() {
     try {
       // NOTE: This access to reader is thread-safe
-      HFileInfo fileInfo = reader.getHFileInfo();
-      ByteBuff buf = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, 
false).getBufferWithoutHeader();
+      HFileInfo fileInfo = getSharedHFileReader().getHFileInfo();
+      ByteBuff buf = 
getSharedHFileReader().getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, 
false).getBufferWithoutHeader();
       // We have to copy bytes here, since we can't reuse buffer's underlying
       // array as is, since it contains additional metadata (header)
       byte[] bytes = new byte[buf.remaining()];
@@ -179,10 +193,15 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
     checkState(candidateRowKeys instanceof TreeSet,
         String.format("HFile reader expects a TreeSet as iterating over 
ordered keys is more performant, got (%s)", 
candidateRowKeys.getClass().getSimpleName()));
 
-    synchronized (sharedScannerLock) {
+    synchronized (sharedLock) {
+      if (!sharedScanner.isPresent()) {
+        // For shared scanner, which is primarily used for point-lookups, 
we're caching blocks
+        // by default, to minimize amount of traffic to the underlying storage
+        sharedScanner = Option.of(getHFileScanner(getSharedHFileReader(), 
true));
+      }
       return candidateRowKeys.stream().filter(k -> {
         try {
-          return isKeyAvailable(k, sharedScanner);
+          return isKeyAvailable(k, sharedScanner.get());
         } catch (IOException e) {
           LOG.error("Failed to check key availability: " + k);
           return false;
@@ -197,14 +216,10 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
       throw new UnsupportedOperationException("Schema projections are not 
supported in HFile reader");
     }
 
+    HFile.Reader reader = getHFileReader();
     // TODO eval whether seeking scanner would be faster than pread
-    HFileScanner scanner = null;
-    try {
-      scanner = getHFileScanner(reader, false, false);
-    } catch (IOException e) {
-      throw new HoodieIOException("Instantiation HfileScanner failed for " + 
reader.getHFileInfo().toString());
-    }
-    return new RecordIterator(scanner, getSchema(), readerSchema);
+    HFileScanner scanner = getHFileScanner(reader, false, false);
+    return new RecordIterator(reader, scanner, getSchema(), readerSchema);
   }
 
   @VisibleForTesting
@@ -212,8 +227,9 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
     // We're caching blocks for this scanner to minimize amount of traffic
     // to the underlying storage as we fetched (potentially) sparsely 
distributed
     // keys
+    HFile.Reader reader = getHFileReader();
     HFileScanner scanner = getHFileScanner(reader, true);
-    return new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
+    return new RecordByKeyIterator(reader, scanner, keys, getSchema(), 
readerSchema);
   }
 
   @VisibleForTesting
@@ -221,27 +237,59 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
     // We're caching blocks for this scanner to minimize amount of traffic
     // to the underlying storage as we fetched (potentially) sparsely 
distributed
     // keys
+    HFile.Reader reader = getHFileReader();
     HFileScanner scanner = getHFileScanner(reader, true);
-    return new RecordByKeyPrefixIterator(scanner, sortedKeyPrefixes, 
getSchema(), readerSchema);
+    return new RecordByKeyPrefixIterator(reader, scanner, sortedKeyPrefixes, 
getSchema(), readerSchema);
   }
 
   @Override
   public long getTotalRecords() {
     // NOTE: This access to reader is thread-safe
-    return reader.getEntries();
+    return getSharedHFileReader().getEntries();
   }
 
   @Override
   public void close() {
     try {
       synchronized (this) {
-        reader.close();
+        if (sharedScanner.isPresent()) {
+          sharedScanner.get().close();
+        }
+        if (sharedReader.isPresent()) {
+          sharedReader.get().close();
+        }
       }
     } catch (IOException e) {
       throw new HoodieIOException("Error closing the hfile reader", e);
     }
   }
 
+  /**
+   * Instantiates the shared HFile reader if not instantiated
+   * @return the shared HFile reader
+   */
+  private HFile.Reader getSharedHFileReader() {
+    if (!sharedReader.isPresent()) {
+      synchronized (sharedLock) {
+        if (!sharedReader.isPresent()) {
+          sharedReader = Option.of(getHFileReader());
+        }
+      }
+    }
+    return sharedReader.get();
+  }
+
+  /**
+   * Instantiate a new reader for HFile files.
+   * @return an instance of {@link HFile.Reader}
+   */
+  private HFile.Reader getHFileReader() {
+    if (content.isPresent()) {
+      return HoodieHFileUtils.createHFileReader(fs, path, content.get());
+    }
+    return HoodieHFileUtils.createHFileReader(fs, path, config, hadoopConf);
+  }
+
   private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws 
IOException {
     final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
     return keyScanner.seekTo(kv) == 0;
@@ -437,18 +485,22 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
         .collect(Collectors.toList());
   }
 
-  private static HFileScanner getHFileScanner(HFile.Reader reader, boolean 
cacheBlocks) throws IOException {
+  private static HFileScanner getHFileScanner(HFile.Reader reader, boolean 
cacheBlocks) {
     return getHFileScanner(reader, cacheBlocks, true);
   }
 
-  private static HFileScanner getHFileScanner(HFile.Reader reader, boolean 
cacheBlocks, boolean doSeek) throws IOException {
+  private static HFileScanner getHFileScanner(HFile.Reader reader, boolean 
cacheBlocks, boolean doSeek) {
     // NOTE: Only scanners created in Positional Read ("pread") mode could 
share the same reader,
     //       since scanners in default mode will be seeking w/in the 
underlying stream
-    HFileScanner scanner = reader.getScanner(cacheBlocks, true);
-    if (doSeek) {
-      scanner.seekTo(); // places the cursor at the beginning of the first 
data block.
+    try {
+      HFileScanner scanner = reader.getScanner(cacheBlocks, true);
+      if (doSeek) {
+        scanner.seekTo(); // places the cursor at the beginning of the first 
data block.
+      }
+      return scanner;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to initialize HFile scanner for  " + 
reader.getPath(), e);
     }
-    return scanner;
   }
 
   private static Option<Schema.Field> getKeySchema(Schema schema) {
@@ -459,6 +511,7 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
     private final Iterator<String> sortedKeyPrefixesIterator;
     private Iterator<IndexedRecord> recordsIterator;
 
+    private final HFile.Reader reader;
     private final HFileScanner scanner;
 
     private final Schema writerSchema;
@@ -466,9 +519,9 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
     private IndexedRecord next = null;
 
-    RecordByKeyPrefixIterator(HFileScanner scanner, List<String> 
sortedKeyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException 
{
+    RecordByKeyPrefixIterator(HFile.Reader reader, HFileScanner scanner, 
List<String> sortedKeyPrefixes, Schema writerSchema, Schema readerSchema) 
throws IOException {
       this.sortedKeyPrefixesIterator = sortedKeyPrefixes.iterator();
-
+      this.reader = reader;
       this.scanner = scanner;
       this.scanner.seekTo(); // position at the beginning of the file
 
@@ -508,13 +561,19 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
     @Override
     public void close() {
-      scanner.close();
+      try {
+        scanner.close();
+        reader.close();
+      } catch (IOException e) {
+        throw new HoodieIOException("Error closing the hfile reader and 
scanner", e);
+      }
     }
   }
 
   private static class RecordByKeyIterator implements 
ClosableIterator<IndexedRecord> {
     private final Iterator<String> sortedKeyIterator;
 
+    private final HFile.Reader reader;
     private final HFileScanner scanner;
 
     private final Schema readerSchema;
@@ -522,9 +581,9 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
     private IndexedRecord next = null;
 
-    RecordByKeyIterator(HFileScanner scanner, List<String> sortedKeys, Schema 
writerSchema, Schema readerSchema) throws IOException {
+    RecordByKeyIterator(HFile.Reader reader, HFileScanner scanner, 
List<String> sortedKeys, Schema writerSchema, Schema readerSchema) throws 
IOException {
       this.sortedKeyIterator = sortedKeys.iterator();
-
+      this.reader = reader;
       this.scanner = scanner;
       this.scanner.seekTo(); // position at the beginning of the file
 
@@ -562,12 +621,18 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
     @Override
     public void close() {
-      scanner.close();
+      try {
+        scanner.close();
+        reader.close();
+      } catch (IOException e) {
+        throw new HoodieIOException("Error closing the hfile reader and 
scanner", e);
+      }
     }
   }
 
   @Override
   public ClosableIterator<String> getRecordKeyIterator() {
+    HFile.Reader reader = getHFileReader();
     final HFileScanner scanner = reader.getScanner(false, false);
     return new ClosableIterator<String>() {
       @Override
@@ -588,12 +653,18 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
       @Override
       public void close() {
-        scanner.close();
+        try {
+          scanner.close();
+          reader.close();
+        } catch (IOException e) {
+          throw new HoodieIOException("Error closing the hfile reader and 
scanner", e);
+        }
       }
     };
   }
 
   private static class RecordIterator implements 
ClosableIterator<IndexedRecord> {
+    private final HFile.Reader reader;
     private final HFileScanner scanner;
 
     private final Schema writerSchema;
@@ -601,7 +672,8 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
     private IndexedRecord next = null;
 
-    RecordIterator(HFileScanner scanner, Schema writerSchema, Schema 
readerSchema) {
+    RecordIterator(HFile.Reader reader, HFileScanner scanner, Schema 
writerSchema, Schema readerSchema) {
+      this.reader = reader;
       this.scanner = scanner;
       this.writerSchema = writerSchema;
       this.readerSchema = readerSchema;
@@ -642,7 +714,12 @@ public class HoodieAvroHFileReader extends 
HoodieAvroFileReaderBase implements H
 
     @Override
     public void close() {
-      scanner.close();
+      try {
+        scanner.close();
+        reader.close();
+      } catch (IOException e) {
+        throw new HoodieIOException("Error closing the hfile reader and 
scanner", e);
+      }
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
index 7e888842e66..3dc60fc84a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.exception.HoodieIOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,8 +54,12 @@ public class HoodieHFileUtils {
    * @throws IOException Upon error.
    */
   public static HFile.Reader createHFileReader(
-      FileSystem fs, Path path, CacheConfig cacheConfig, Configuration 
configuration) throws IOException {
-    return HFile.createReader(fs, path, cacheConfig, 
USE_PRIMARY_REPLICA_READER, configuration);
+      FileSystem fs, Path path, CacheConfig cacheConfig, Configuration 
configuration) {
+    try {
+      return HFile.createReader(fs, path, cacheConfig, 
USE_PRIMARY_REPLICA_READER, configuration);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to initialize HFile reader for  " + 
path, e);
+    }
   }
 
   /**
@@ -66,7 +72,7 @@ public class HoodieHFileUtils {
    * @throws IOException Upon error.
    */
   public static HFile.Reader createHFileReader(
-      FileSystem fs, Path dummyPath, byte[] content) throws IOException {
+      FileSystem fs, Path dummyPath, byte[] content) {
     // Avoid loading default configs, from the FS, since this configuration is 
mostly
     // used as a stub to initialize HFile reader
     Configuration conf = new Configuration(false);
@@ -81,9 +87,13 @@ public class HoodieHFileUtils {
         .withPrimaryReplicaReader(USE_PRIMARY_REPLICA_READER)
         .withReaderType(ReaderContext.ReaderType.STREAM)
         .build();
-    HFileInfo fileInfo = new HFileInfo(context, conf);
-    HFile.Reader reader = HFile.createReader(context, fileInfo, new 
CacheConfig(conf), conf);
-    fileInfo.initMetaAndIndex(reader);
-    return reader;
+    try {
+      HFileInfo fileInfo = new HFileInfo(context, conf);
+      HFile.Reader reader = HFile.createReader(context, fileInfo, new 
CacheConfig(conf), conf);
+      fileInfo.initMetaAndIndex(reader);
+      return reader;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to initialize HFile reader for  " + 
dummyPath, e);
+    }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 295f7159b78..373945975be 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -375,7 +375,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         ? reader.getRecordsByKeysIterator(sortedKeys)
         : reader.getRecordsByKeyPrefixIterator(sortedKeys);
 
-    return toStream(records)
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result = toStream(records)
         .map(record -> {
           GenericRecord data = (GenericRecord) record.getData();
           return Pair.of(
@@ -383,6 +383,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
               composeRecord(data, partitionName));
         })
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    records.close();
+    return result;
   }
 
   private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord 
avroRecord, String partitionName) {
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index a3b4a6c1660..2fda963f8de 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -31,18 +31,18 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieAvroHFileReader;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 public class HoodieHFileRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
 
   private long count = 0;
   private ArrayWritable valueObj;
   private HoodieAvroHFileReader reader;
-  private Iterator<HoodieRecord<IndexedRecord>> recordIterator;
+  private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
   private Schema schema;
 
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
@@ -93,6 +93,10 @@ public class HoodieHFileRecordReader implements 
RecordReader<NullWritable, Array
       reader.close();
       reader = null;
     }
+    if (recordIterator != null) {
+      recordIterator.close();
+      recordIterator = null;
+    }
   }
 
   @Override

Reply via email to