This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2009b0f44660f1d1753685a3ea64494d591aebf2 Author: Rajesh Mahindra <[email protected]> AuthorDate: Mon Aug 28 23:56:52 2023 -0700 [HUDI-6726] Fix connection leaks related to file reader and iterator close (#9539) --------- Co-authored-by: rmahindra123 <[email protected]> --- .../table/action/commit/HoodieMergeHelper.java | 5 +- .../io/storage/TestHoodieHFileReaderWriter.java | 10 +- .../bootstrap/index/HFileBootstrapIndex.java | 8 +- .../hudi/common/table/TableSchemaResolver.java | 5 +- .../table/log/block/HoodieHFileDataBlock.java | 23 +-- .../hudi/common/util/queue/SimpleExecutor.java | 6 +- .../hudi/io/storage/HoodieAvroHFileReader.java | 173 +++++++++++++++------ .../apache/hudi/io/storage/HoodieHFileUtils.java | 24 ++- .../hudi/metadata/HoodieBackedTableMetadata.java | 4 +- .../hudi/hadoop/HoodieHFileRecordReader.java | 8 +- 10 files changed, 185 insertions(+), 81 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 4df767b5e41..c1523d564e4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -123,7 +123,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper { // In case writer's schema is simply a projection of the reader's one we can read // the records in the projected schema directly recordSchema = isPureProjection ? writerSchema : readerSchema; - recordIterator = baseFileReader.getRecordIterator(recordSchema); + recordIterator = (ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(recordSchema); } boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig); @@ -155,6 +155,9 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper { executor.awaitTermination(); } else { baseFileReader.close(); + if (bootstrapFileReader != null) { + bootstrapFileReader.close(); + } mergeHandle.close(); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java index 90ad0fe1a74..0d2eefa0863 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java @@ -214,8 +214,9 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { byte[] content = FileIOUtils.readAsByteArray( fs.open(getFilePath()), (int) fs.getFileStatus(getFilePath()).getLen()); // Reading byte array in HFile format, without actual file path + Configuration hadoopConf = fs.getConf(); HoodieAvroHFileReader hfileReader = - new HoodieAvroHFileReader(fs, new Path(DUMMY_BASE_PATH), content, Option.empty()); + new HoodieAvroHFileReader(hadoopConf, new Path(DUMMY_BASE_PATH), new CacheConfig(hadoopConf), fs, content, Option.empty()); Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc"); assertEquals(NUM_RECORDS, hfileReader.getTotalRecords()); verifySimpleRecords(hfileReader.getRecordIterator(avroSchema)); @@ -420,8 +421,10 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { verifyHFileReader( HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content), hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE); + + Configuration hadoopConf = fs.getConf(); HoodieAvroHFileReader hfileReader = - new HoodieAvroHFileReader(fs, new Path(DUMMY_BASE_PATH), content, Option.empty()); + new HoodieAvroHFileReader(hadoopConf, new Path(DUMMY_BASE_PATH), new CacheConfig(hadoopConf), fs, content, Option.empty()); Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc"); assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords()); verifySimpleRecords(hfileReader.getRecordIterator(avroSchema)); @@ -429,7 +432,8 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { content = readHFileFromResources(complexHFile); verifyHFileReader(HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content), hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE); - hfileReader = new HoodieAvroHFileReader(fs, new Path(DUMMY_BASE_PATH), content, Option.empty()); + hfileReader = + new HoodieAvroHFileReader(hadoopConf, new Path(DUMMY_BASE_PATH), new CacheConfig(hadoopConf), fs, content, Option.empty()); avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchemaWithUDT.avsc"); assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords()); verifySimpleRecords(hfileReader.getRecordIterator(avroSchema)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java index ee4eeec68d6..9b5e323e4f7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java @@ -182,12 +182,8 @@ public class HFileBootstrapIndex extends BootstrapIndex { * @param fileSystem File System */ private static HFile.Reader createReader(String hFilePath, Configuration conf, FileSystem fileSystem) { - try { - LOG.info("Opening HFile for reading :" + hFilePath); - return HoodieHFileUtils.createHFileReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), conf); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } + LOG.info("Opening HFile for reading :" + hFilePath); + return HoodieHFileUtils.createHFileReader(fileSystem, new HFilePathForReader(hFilePath), new CacheConfig(conf), conf); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 0e7e2cd4bf2..e757affe4bd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -327,8 +327,9 @@ public class TableSchemaResolver { FileSystem fs = metaClient.getRawFs(); CacheConfig cacheConfig = new CacheConfig(fs.getConf()); - HoodieAvroHFileReader hFileReader = new HoodieAvroHFileReader(fs.getConf(), hFilePath, cacheConfig); - return convertAvroSchemaToParquet(hFileReader.getSchema()); + try (HoodieAvroHFileReader hFileReader = new HoodieAvroHFileReader(fs.getConf(), hFilePath, cacheConfig)) { + return convertAvroSchemaToParquet(hFileReader.getSchema()); + } } private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 96436811429..a0f9d43ba39 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -172,10 +172,13 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException { checkState(readerSchema != null, "Reader's schema has to be non-null"); - FileSystem fs = FSUtils.getFs(pathForReader.toString(), FSUtils.buildInlineConf(getBlockContentLocation().get().getHadoopConf())); + Configuration hadoopConf = FSUtils.buildInlineConf(getBlockContentLocation().get().getHadoopConf()); + FileSystem fs = FSUtils.getFs(pathForReader.toString(), hadoopConf); // Read the content - HoodieAvroHFileReader reader = new HoodieAvroHFileReader(fs, pathForReader, content, Option.of(getSchemaFromHeader())); - return unsafeCast(reader.getRecordIterator(readerSchema)); + try (HoodieAvroHFileReader reader = new HoodieAvroHFileReader(hadoopConf, pathForReader, new CacheConfig(hadoopConf), + fs, content, Option.of(getSchemaFromHeader()))) { + return unsafeCast(reader.getRecordIterator(readerSchema)); + } } // TODO abstract this w/in HoodieDataBlock @@ -193,15 +196,15 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { blockContentLoc.getContentPositionInLogFile(), blockContentLoc.getBlockSize()); - final HoodieAvroHFileReader reader = + try (final HoodieAvroHFileReader reader = new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf), - Option.of(getSchemaFromHeader())); - - // Get writer's schema from the header - final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator = - fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema); + Option.of(getSchemaFromHeader()))) { + // Get writer's schema from the header + final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator = + fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema); - return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data); + return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data); + } } private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java index 10cb5240899..86512333ec4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util.queue; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieException; import org.slf4j.Logger; @@ -77,7 +78,10 @@ public class SimpleExecutor<I, O, E> implements HoodieExecutor<E> { @Override public void shutdownNow() { - // no-op + // Consumer is already closed when the execution completes + if (itr instanceof ClosableIterator) { + ((ClosableIterator<I>) itr).close(); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java index 3d6533a3429..c26ac6d1a48 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java @@ -84,54 +84,68 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H private static final Logger LOG = LoggerFactory.getLogger(HoodieAvroHFileReader.class); private final Path path; - + private final FileSystem fs; + private final Configuration hadoopConf; + private final CacheConfig config; + private final Option<byte[]> content; private final Lazy<Schema> schema; // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in Positional Read ("pread") // mode (ie created w/ "pread = true") - private final HFile.Reader reader; + // Common reader is not used for the iterators since they can be closed independently. + // Use {@link getSharedReader()} instead of accessing directly. + private Option<HFile.Reader> sharedReader; // NOTE: Scanner caches read blocks, therefore it's important to re-use scanner // wherever possible - private final HFileScanner sharedScanner; + private Option<HFileScanner> sharedScanner; - private final Object sharedScannerLock = new Object(); + private final Object sharedLock = new Object(); public HoodieAvroHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig) throws IOException { - this(path, - HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), hadoopConf), path, cacheConfig, hadoopConf), - Option.empty()); + this(path, FSUtils.getFs(path.toString(), hadoopConf), hadoopConf, cacheConfig, Option.empty()); } public HoodieAvroHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs, Option<Schema> schemaOpt) throws IOException { - this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), schemaOpt); + this(path, fs, hadoopConf, cacheConfig, schemaOpt); + } + + public HoodieAvroHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs, byte[] content, Option<Schema> schemaOpt) throws IOException { + this(path, fs, hadoopConf, cacheConfig, schemaOpt, Option.of(content)); } - public HoodieAvroHFileReader(FileSystem fs, Path dummyPath, byte[] content, Option<Schema> schemaOpt) throws IOException { - this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), schemaOpt); + public HoodieAvroHFileReader(Path path, FileSystem fs, Configuration hadoopConf, CacheConfig config, Option<Schema> schemaOpt) throws IOException { + this(path, fs, hadoopConf, config, schemaOpt, Option.empty()); } - public HoodieAvroHFileReader(Path path, HFile.Reader reader, Option<Schema> schemaOpt) throws IOException { + public HoodieAvroHFileReader(Path path, FileSystem fs, Configuration hadoopConf, CacheConfig config, Option<Schema> schemaOpt, Option<byte[]> content) throws IOException { this.path = path; - this.reader = reader; - // For shared scanner, which is primarily used for point-lookups, we're caching blocks - // by default, to minimize amount of traffic to the underlying storage - this.sharedScanner = getHFileScanner(reader, true); + this.fs = fs; + this.hadoopConf = hadoopConf; + this.config = config; + this.content = content; + + // Shared reader is instantiated lazily. + this.sharedReader = Option.empty(); + this.sharedScanner = Option.empty(); this.schema = schemaOpt.map(Lazy::eagerly) - .orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader))); + .orElseGet(() -> Lazy.lazily(() -> fetchSchema(getSharedHFileReader()))); } @Override public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> sortedKeys, Schema schema) throws IOException { + // Iterators do not use the shared reader or scanner // We're caching blocks for this scanner to minimize amount of traffic // to the underlying storage as we fetched (potentially) sparsely distributed // keys + HFile.Reader reader = getHFileReader(); HFileScanner scanner = getHFileScanner(reader, true); - ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(scanner, sortedKeys, getSchema(), schema); + ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(reader, scanner, sortedKeys, getSchema(), schema); return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); } @Override public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema schema) throws IOException { + // Iterators do not use the shared reader or scanner ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(sortedKeyPrefixes, schema); return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); } @@ -139,7 +153,7 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H @Override public String[] readMinMaxRecordKeys() { // NOTE: This access to reader is thread-safe - HFileInfo fileInfo = reader.getHFileInfo(); + HFileInfo fileInfo = getSharedHFileReader().getHFileInfo(); return new String[]{new String(fileInfo.get(KEY_MIN_RECORD.getBytes())), new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))}; } @@ -148,8 +162,8 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H public BloomFilter readBloomFilter() { try { // NOTE: This access to reader is thread-safe - HFileInfo fileInfo = reader.getHFileInfo(); - ByteBuff buf = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader(); + HFileInfo fileInfo = getSharedHFileReader().getHFileInfo(); + ByteBuff buf = getSharedHFileReader().getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader(); // We have to copy bytes here, since we can't reuse buffer's underlying // array as is, since it contains additional metadata (header) byte[] bytes = new byte[buf.remaining()]; @@ -179,10 +193,15 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H checkState(candidateRowKeys instanceof TreeSet, String.format("HFile reader expects a TreeSet as iterating over ordered keys is more performant, got (%s)", candidateRowKeys.getClass().getSimpleName())); - synchronized (sharedScannerLock) { + synchronized (sharedLock) { + if (!sharedScanner.isPresent()) { + // For shared scanner, which is primarily used for point-lookups, we're caching blocks + // by default, to minimize amount of traffic to the underlying storage + sharedScanner = Option.of(getHFileScanner(getSharedHFileReader(), true)); + } return candidateRowKeys.stream().filter(k -> { try { - return isKeyAvailable(k, sharedScanner); + return isKeyAvailable(k, sharedScanner.get()); } catch (IOException e) { LOG.error("Failed to check key availability: " + k); return false; @@ -197,14 +216,10 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H throw new UnsupportedOperationException("Schema projections are not supported in HFile reader"); } + HFile.Reader reader = getHFileReader(); // TODO eval whether seeking scanner would be faster than pread - HFileScanner scanner = null; - try { - scanner = getHFileScanner(reader, false, false); - } catch (IOException e) { - throw new HoodieIOException("Instantiation HfileScanner failed for " + reader.getHFileInfo().toString()); - } - return new RecordIterator(scanner, getSchema(), readerSchema); + HFileScanner scanner = getHFileScanner(reader, false, false); + return new RecordIterator(reader, scanner, getSchema(), readerSchema); } @VisibleForTesting @@ -212,8 +227,9 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H // We're caching blocks for this scanner to minimize amount of traffic // to the underlying storage as we fetched (potentially) sparsely distributed // keys + HFile.Reader reader = getHFileReader(); HFileScanner scanner = getHFileScanner(reader, true); - return new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema); + return new RecordByKeyIterator(reader, scanner, keys, getSchema(), readerSchema); } @VisibleForTesting @@ -221,27 +237,59 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H // We're caching blocks for this scanner to minimize amount of traffic // to the underlying storage as we fetched (potentially) sparsely distributed // keys + HFile.Reader reader = getHFileReader(); HFileScanner scanner = getHFileScanner(reader, true); - return new RecordByKeyPrefixIterator(scanner, sortedKeyPrefixes, getSchema(), readerSchema); + return new RecordByKeyPrefixIterator(reader, scanner, sortedKeyPrefixes, getSchema(), readerSchema); } @Override public long getTotalRecords() { // NOTE: This access to reader is thread-safe - return reader.getEntries(); + return getSharedHFileReader().getEntries(); } @Override public void close() { try { synchronized (this) { - reader.close(); + if (sharedScanner.isPresent()) { + sharedScanner.get().close(); + } + if (sharedReader.isPresent()) { + sharedReader.get().close(); + } } } catch (IOException e) { throw new HoodieIOException("Error closing the hfile reader", e); } } + /** + * Instantiates the shared HFile reader if not instantiated + * @return the shared HFile reader + */ + private HFile.Reader getSharedHFileReader() { + if (!sharedReader.isPresent()) { + synchronized (sharedLock) { + if (!sharedReader.isPresent()) { + sharedReader = Option.of(getHFileReader()); + } + } + } + return sharedReader.get(); + } + + /** + * Instantiate a new reader for HFile files. + * @return an instance of {@link HFile.Reader} + */ + private HFile.Reader getHFileReader() { + if (content.isPresent()) { + return HoodieHFileUtils.createHFileReader(fs, path, content.get()); + } + return HoodieHFileUtils.createHFileReader(fs, path, config, hadoopConf); + } + private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException { final KeyValue kv = new KeyValue(key.getBytes(), null, null, null); return keyScanner.seekTo(kv) == 0; @@ -437,18 +485,22 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H .collect(Collectors.toList()); } - private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) throws IOException { + private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) { return getHFileScanner(reader, cacheBlocks, true); } - private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks, boolean doSeek) throws IOException { + private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks, boolean doSeek) { // NOTE: Only scanners created in Positional Read ("pread") mode could share the same reader, // since scanners in default mode will be seeking w/in the underlying stream - HFileScanner scanner = reader.getScanner(cacheBlocks, true); - if (doSeek) { - scanner.seekTo(); // places the cursor at the beginning of the first data block. + try { + HFileScanner scanner = reader.getScanner(cacheBlocks, true); + if (doSeek) { + scanner.seekTo(); // places the cursor at the beginning of the first data block. + } + return scanner; + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize HFile scanner for " + reader.getPath(), e); } - return scanner; } private static Option<Schema.Field> getKeySchema(Schema schema) { @@ -459,6 +511,7 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H private final Iterator<String> sortedKeyPrefixesIterator; private Iterator<IndexedRecord> recordsIterator; + private final HFile.Reader reader; private final HFileScanner scanner; private final Schema writerSchema; @@ -466,9 +519,9 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H private IndexedRecord next = null; - RecordByKeyPrefixIterator(HFileScanner scanner, List<String> sortedKeyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException { + RecordByKeyPrefixIterator(HFile.Reader reader, HFileScanner scanner, List<String> sortedKeyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException { this.sortedKeyPrefixesIterator = sortedKeyPrefixes.iterator(); - + this.reader = reader; this.scanner = scanner; this.scanner.seekTo(); // position at the beginning of the file @@ -508,13 +561,19 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H @Override public void close() { - scanner.close(); + try { + scanner.close(); + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Error closing the hfile reader and scanner", e); + } } } private static class RecordByKeyIterator implements ClosableIterator<IndexedRecord> { private final Iterator<String> sortedKeyIterator; + private final HFile.Reader reader; private final HFileScanner scanner; private final Schema readerSchema; @@ -522,9 +581,9 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H private IndexedRecord next = null; - RecordByKeyIterator(HFileScanner scanner, List<String> sortedKeys, Schema writerSchema, Schema readerSchema) throws IOException { + RecordByKeyIterator(HFile.Reader reader, HFileScanner scanner, List<String> sortedKeys, Schema writerSchema, Schema readerSchema) throws IOException { this.sortedKeyIterator = sortedKeys.iterator(); - + this.reader = reader; this.scanner = scanner; this.scanner.seekTo(); // position at the beginning of the file @@ -562,12 +621,18 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H @Override public void close() { - scanner.close(); + try { + scanner.close(); + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Error closing the hfile reader and scanner", e); + } } } @Override public ClosableIterator<String> getRecordKeyIterator() { + HFile.Reader reader = getHFileReader(); final HFileScanner scanner = reader.getScanner(false, false); return new ClosableIterator<String>() { @Override @@ -588,12 +653,18 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H @Override public void close() { - scanner.close(); + try { + scanner.close(); + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Error closing the hfile reader and scanner", e); + } } }; } private static class RecordIterator implements ClosableIterator<IndexedRecord> { + private final HFile.Reader reader; private final HFileScanner scanner; private final Schema writerSchema; @@ -601,7 +672,8 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H private IndexedRecord next = null; - RecordIterator(HFileScanner scanner, Schema writerSchema, Schema readerSchema) { + RecordIterator(HFile.Reader reader, HFileScanner scanner, Schema writerSchema, Schema readerSchema) { + this.reader = reader; this.scanner = scanner; this.writerSchema = writerSchema; this.readerSchema = readerSchema; @@ -642,7 +714,12 @@ public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements H @Override public void close() { - scanner.close(); + try { + scanner.close(); + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Error closing the hfile reader and scanner", e); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java index 7e888842e66..3dc60fc84a7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java @@ -19,6 +19,8 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.exception.HoodieIOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -52,8 +54,12 @@ public class HoodieHFileUtils { * @throws IOException Upon error. */ public static HFile.Reader createHFileReader( - FileSystem fs, Path path, CacheConfig cacheConfig, Configuration configuration) throws IOException { - return HFile.createReader(fs, path, cacheConfig, USE_PRIMARY_REPLICA_READER, configuration); + FileSystem fs, Path path, CacheConfig cacheConfig, Configuration configuration) { + try { + return HFile.createReader(fs, path, cacheConfig, USE_PRIMARY_REPLICA_READER, configuration); + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize HFile reader for " + path, e); + } } /** @@ -66,7 +72,7 @@ public class HoodieHFileUtils { * @throws IOException Upon error. */ public static HFile.Reader createHFileReader( - FileSystem fs, Path dummyPath, byte[] content) throws IOException { + FileSystem fs, Path dummyPath, byte[] content) { // Avoid loading default configs, from the FS, since this configuration is mostly // used as a stub to initialize HFile reader Configuration conf = new Configuration(false); @@ -81,9 +87,13 @@ public class HoodieHFileUtils { .withPrimaryReplicaReader(USE_PRIMARY_REPLICA_READER) .withReaderType(ReaderContext.ReaderType.STREAM) .build(); - HFileInfo fileInfo = new HFileInfo(context, conf); - HFile.Reader reader = HFile.createReader(context, fileInfo, new CacheConfig(conf), conf); - fileInfo.initMetaAndIndex(reader); - return reader; + try { + HFileInfo fileInfo = new HFileInfo(context, conf); + HFile.Reader reader = HFile.createReader(context, fileInfo, new CacheConfig(conf), conf); + fileInfo.initMetaAndIndex(reader); + return reader; + } catch (IOException e) { + throw new HoodieIOException("Failed to initialize HFile reader for " + dummyPath, e); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 295f7159b78..373945975be 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -375,7 +375,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { ? reader.getRecordsByKeysIterator(sortedKeys) : reader.getRecordsByKeyPrefixIterator(sortedKeys); - return toStream(records) + Map<String, HoodieRecord<HoodieMetadataPayload>> result = toStream(records) .map(record -> { GenericRecord data = (GenericRecord) record.getData(); return Pair.of( @@ -383,6 +383,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { composeRecord(data, partitionName)); }) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + records.close(); + return result; } private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java index a3b4a6c1660..2fda963f8de 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java @@ -31,18 +31,18 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.hudi.io.storage.HoodieAvroHFileReader; import java.io.IOException; -import java.util.Iterator; public class HoodieHFileRecordReader implements RecordReader<NullWritable, ArrayWritable> { private long count = 0; private ArrayWritable valueObj; private HoodieAvroHFileReader reader; - private Iterator<HoodieRecord<IndexedRecord>> recordIterator; + private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator; private Schema schema; public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException { @@ -93,6 +93,10 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array reader.close(); reader = null; } + if (recordIterator != null) { + recordIterator.close(); + recordIterator = null; + } } @Override
