This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 78fc62554ce [HUDI-7633] Use try with resources for AutoCloseable (#11045)
78fc62554ce is described below

commit 78fc62554ce3798eb803332b42f84f9cfa74e526
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Wed Apr 17 21:31:44 2024 -0700

    [HUDI-7633] Use try with resources for AutoCloseable (#11045)
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  | 104 ++++++++--------
 .../apache/hudi/cli/commands/ExportCommand.java    |  93 +++++++-------
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 104 ++++++++--------
 .../org/apache/hudi/cli/commands/TableCommand.java |   6 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   8 +-
 .../hudi/common/model/HoodiePartitionMetadata.java |   8 +-
 .../hudi/common/table/log/LogReaderUtils.java      |  22 ++--
 .../table/log/block/HoodieAvroDataBlock.java       | 135 ++++++++++-----------
 .../hudi/common/util/SerializationUtils.java       |   6 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  24 ++--
 .../java/HoodieJavaWriteClientExample.java         |  70 +++++------
 .../examples/spark/HoodieWriteClientExample.java   |  90 +++++++-------
 .../org/apache/hudi/common/util/FileIOUtils.java   |  14 +--
 .../hudi/utilities/HoodieCompactionAdminTool.java  |   9 +-
 .../utilities/streamer/SchedulerConfGenerator.java |   6 +-
 15 files changed, 344 insertions(+), 355 deletions(-)
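Every hunk in this patch applies the same mechanical transformation: a resource that was closed with an explicit close() call, and therefore leaked whenever an exception escaped before that call, moves into a try-with-resources header so the runtime closes it on every exit path. A minimal before/after sketch of the shape, with fs, logFile, schema, and a process() helper as illustrative stand-ins rather than code from this patch:

    // Before: reader leaks if hasNext(), next(), or process() throws
    Reader reader = HoodieLogFormat.newReader(fs, logFile, schema);
    while (reader.hasNext()) {
      process(reader.next());
    }
    reader.close();

    // After: reader.close() runs automatically, on success or failure
    try (Reader reader = HoodieLogFormat.newReader(fs, logFile, schema)) {
      while (reader.hasNext()) {
        process(reader.next());
      }
    }

Any type implementing AutoCloseable qualifies, which is why the same shape covers log readers and writers, output streams, Kryo Output, ClosableIterator, and the write clients below.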
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 075a57d541c..5c57c8f5288 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -114,47 +114,46 @@ public class ArchivedCommitsCommand {
     List<Comparable[]> allStats = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       // read the archived file
-      Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      List<IndexedRecord> readRecords = new ArrayList<>();
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
+      try (Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
+          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+        List<IndexedRecord> readRecords = new ArrayList<>();
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
+        }
+        List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
+            .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
+                || r.get("actionType").toString().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
+            .flatMap(r -> {
+              HoodieCommitMetadata metadata = (HoodieCommitMetadata) SpecificData.get()
+                  .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
+              final String instantTime = r.get("commitTime").toString();
+              final String action = r.get("actionType").toString();
+              return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
+                List<Comparable> row = new ArrayList<>();
+                row.add(action);
+                row.add(instantTime);
+                row.add(hoodieWriteStat.getPartitionPath());
+                row.add(hoodieWriteStat.getFileId());
+                row.add(hoodieWriteStat.getPrevCommit());
+                row.add(hoodieWriteStat.getNumWrites());
+                row.add(hoodieWriteStat.getNumInserts());
+                row.add(hoodieWriteStat.getNumDeletes());
+                row.add(hoodieWriteStat.getNumUpdateWrites());
+                row.add(hoodieWriteStat.getTotalLogFiles());
+                row.add(hoodieWriteStat.getTotalLogBlocks());
+                row.add(hoodieWriteStat.getTotalCorruptLogBlock());
+                row.add(hoodieWriteStat.getTotalRollbackBlocks());
+                row.add(hoodieWriteStat.getTotalLogRecords());
+                row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
+                row.add(hoodieWriteStat.getTotalWriteBytes());
+                row.add(hoodieWriteStat.getTotalWriteErrors());
+                return row;
+              })).map(rowList -> rowList.toArray(new Comparable[0]));
+            }).collect(Collectors.toList());
+        allStats.addAll(readCommits);
       }
-      List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
-          .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
-              || r.get("actionType").toString().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
-          .flatMap(r -> {
-            HoodieCommitMetadata metadata = (HoodieCommitMetadata) SpecificData.get()
-                .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
-            final String instantTime = r.get("commitTime").toString();
-            final String action = r.get("actionType").toString();
-            return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
-              List<Comparable> row = new ArrayList<>();
-              row.add(action);
-              row.add(instantTime);
-              row.add(hoodieWriteStat.getPartitionPath());
-              row.add(hoodieWriteStat.getFileId());
-              row.add(hoodieWriteStat.getPrevCommit());
-              row.add(hoodieWriteStat.getNumWrites());
-              row.add(hoodieWriteStat.getNumInserts());
-              row.add(hoodieWriteStat.getNumDeletes());
-              row.add(hoodieWriteStat.getNumUpdateWrites());
-              row.add(hoodieWriteStat.getTotalLogFiles());
-              row.add(hoodieWriteStat.getTotalLogBlocks());
-              row.add(hoodieWriteStat.getTotalCorruptLogBlock());
-              row.add(hoodieWriteStat.getTotalRollbackBlocks());
-              row.add(hoodieWriteStat.getTotalLogRecords());
-              row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
-              row.add(hoodieWriteStat.getTotalWriteBytes());
-              row.add(hoodieWriteStat.getTotalWriteErrors());
-              return row;
-            })).map(rowList -> rowList.toArray(new Comparable[0]));
-          }).collect(Collectors.toList());
-      allStats.addAll(readCommits);
-      reader.close();
     }
     TableHeader header = new TableHeader().addTableHeaderField("action").addTableHeaderField("instant")
         .addTableHeaderField("partition").addTableHeaderField("file_id").addTableHeaderField("prev_instant")
@@ -188,21 +187,20 @@ public class ArchivedCommitsCommand {
     List<Comparable[]> allCommits = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       // read the archived file
-      HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      List<IndexedRecord> readRecords = new ArrayList<>();
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
-          recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
+      try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
+          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+        List<IndexedRecord> readRecords = new ArrayList<>();
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+            recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
+          }
        }
+        List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
+            .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());
+        allCommits.addAll(readCommits);
      }
-      List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
-          .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());
-      allCommits.addAll(readCommits);
-      reader.close();
     }
 
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("CommitType");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index effa096bfa9..eda0d0de219 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -125,57 +125,56 @@ public class ExportCommand {
     for (FileStatus fs : statuses) {
       // read the archived file
-      Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      // read the avro blocks
-      while (reader.hasNext() && copyCount++ < limit) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
-          while (recordItr.hasNext()) {
-            IndexedRecord ir = recordItr.next().getData();
-            // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
-            // metadata record from the entry and convert it to json.
-            HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
-                .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
-            final String action = archiveEntryRecord.get("actionType").toString();
-            if (!actionSet.contains(action)) {
-              continue;
+      try (Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+
+        // read the avro blocks
+        while (reader.hasNext() && copyCount++ < limit) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+            while (recordItr.hasNext()) {
+              IndexedRecord ir = recordItr.next().getData();
+              // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
+              // metadata record from the entry and convert it to json.
+              HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
+                  .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
+              final String action = archiveEntryRecord.get("actionType").toString();
+              if (!actionSet.contains(action)) {
+                continue;
+              }
+
+              GenericRecord metadata = null;
+              switch (action) {
+                case HoodieTimeline.CLEAN_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCleanMetadata();
+                  break;
+                case HoodieTimeline.COMMIT_ACTION:
+                case HoodieTimeline.DELTA_COMMIT_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCommitMetadata();
+                  break;
+                case HoodieTimeline.ROLLBACK_ACTION:
+                  metadata = archiveEntryRecord.getHoodieRollbackMetadata();
+                  break;
+                case HoodieTimeline.SAVEPOINT_ACTION:
+                  metadata = archiveEntryRecord.getHoodieSavePointMetadata();
+                  break;
+                case HoodieTimeline.COMPACTION_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCompactionMetadata();
+                  break;
+                default:
+                  throw new HoodieException("Unknown type of action " + action);
+              }
+
+              final String instantTime = archiveEntryRecord.get("commitTime").toString();
+              if (metadata == null) {
+                LOG.error("Could not load metadata for action " + action + " at instant time " + instantTime);
+                continue;
+              }
+              final String outPath = localFolder + StoragePath.SEPARATOR + instantTime + "." + action;
+              writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
             }
-
-            GenericRecord metadata = null;
-            switch (action) {
-              case HoodieTimeline.CLEAN_ACTION:
-                metadata = archiveEntryRecord.getHoodieCleanMetadata();
-                break;
-              case HoodieTimeline.COMMIT_ACTION:
-              case HoodieTimeline.DELTA_COMMIT_ACTION:
-                metadata = archiveEntryRecord.getHoodieCommitMetadata();
-                break;
-              case HoodieTimeline.ROLLBACK_ACTION:
-                metadata = archiveEntryRecord.getHoodieRollbackMetadata();
-                break;
-              case HoodieTimeline.SAVEPOINT_ACTION:
-                metadata = archiveEntryRecord.getHoodieSavePointMetadata();
-                break;
-              case HoodieTimeline.COMPACTION_ACTION:
-                metadata = archiveEntryRecord.getHoodieCompactionMetadata();
-                break;
-              default:
-                throw new HoodieException("Unknown type of action " + action);
-            }
-
-            final String instantTime = archiveEntryRecord.get("commitTime").toString();
-            if (metadata == null) {
-              LOG.error("Could not load metadata for action " + action + " at instant time " + instantTime);
-              continue;
-            }
-            final String outPath = localFolder + StoragePath.SEPARATOR + instantTime + "." + action;
-            writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
           }
         }
       }
-
-      reader.close();
     }
 
     return copyCount;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 138a3e376b2..fc1a4b4ec42 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -114,52 +114,52 @@ public class HoodieLogFileCommand {
         MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, path);
         Schema writerSchema = schema != null ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
-        Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
+        try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
 
-        // read the avro blocks
-        while (reader.hasNext()) {
-          HoodieLogBlock n = reader.next();
-          String instantTime;
-          AtomicInteger recordCount = new AtomicInteger(0);
-          if (n instanceof HoodieCorruptBlock) {
-            try {
+          // read the avro blocks
+          while (reader.hasNext()) {
+            HoodieLogBlock n = reader.next();
+            String instantTime;
+            AtomicInteger recordCount = new AtomicInteger(0);
+            if (n instanceof HoodieCorruptBlock) {
+              try {
+                instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+                if (instantTime == null) {
+                  throw new Exception("Invalid instant time " + instantTime);
+                }
+              } catch (Exception e) {
+                numCorruptBlocks++;
+                instantTime = "corrupt_block_" + numCorruptBlocks;
+                // could not read metadata for corrupt block
+              }
+            } else {
               instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
               if (instantTime == null) {
-                throw new Exception("Invalid instant time " + instantTime);
+                // This can happen when reading archived commit files since they were written without any instant time
+                dummyInstantTimeCount++;
+                instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
               }
-            } catch (Exception e) {
-              numCorruptBlocks++;
-              instantTime = "corrupt_block_" + numCorruptBlocks;
-              // could not read metadata for corrupt block
-            }
-          } else {
-            instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
-            if (instantTime == null) {
-              // This can happen when reading archived commit files since they were written without any instant time
-              dummyInstantTimeCount++;
-              instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
-            }
-            if (n instanceof HoodieDataBlock) {
-              try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
-                recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+              if (n instanceof HoodieDataBlock) {
+                try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
+                  recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+                }
              }
            }
-          }
-          if (commitCountAndMetadata.containsKey(instantTime)) {
-            commitCountAndMetadata.get(instantTime).add(
-                new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
-                    new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
-          } else {
-            List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
-                Map<HeaderMetadataType, String>>, Integer>> list =
-                new ArrayList<>();
-            list.add(
-                new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
-                    new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
-            commitCountAndMetadata.put(instantTime, list);
+            if (commitCountAndMetadata.containsKey(instantTime)) {
+              commitCountAndMetadata.get(instantTime).add(
+                  new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                      new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
+            } else {
+              List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
+                  Map<HeaderMetadataType, String>>, Integer>> list =
+                  new ArrayList<>();
+              list.add(
+                  new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                      new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
+              commitCountAndMetadata.put(instantTime, list);
+            }
          }
        }
-        reader.close();
      }
      List<Comparable[]> rows = new ArrayList<>();
      ObjectMapper objectMapper = new ObjectMapper();
@@ -260,23 +260,23 @@ public class HoodieLogFileCommand {
       MessageType schema = TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile));
       Schema writerSchema = schema != null ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
-      HoodieLogFormat.Reader reader =
-          HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema);
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieLogBlock n = reader.next();
-        if (n instanceof HoodieDataBlock) {
-          HoodieDataBlock blk = (HoodieDataBlock) n;
-          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
-            recordItr.forEachRemaining(record -> {
-              if (allRecords.size() < limit) {
-                allRecords.add(record.getData());
-              }
-            });
+      try (HoodieLogFormat.Reader reader =
+          HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema)) {
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieLogBlock n = reader.next();
+          if (n instanceof HoodieDataBlock) {
+            HoodieDataBlock blk = (HoodieDataBlock) n;
+            try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+              recordItr.forEachRemaining(record -> {
+                if (allRecords.size() < limit) {
+                  allRecords.add(record.getData());
+                }
+              });
+            }
          }
        }
      }
-      reader.close();
       if (allRecords.size() >= limit) {
         break;
       }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index e3c3e810c5c..b6256f74006 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -403,12 +403,8 @@ public class TableCommand {
     if (outFile.exists()) {
       outFile.delete();
     }
-    OutputStream os = null;
-    try {
-      os = new FileOutputStream(outFile);
+    try (OutputStream os = new FileOutputStream(outFile)) {
       os.write(getUTF8Bytes(data), 0, data.length());
-    } finally {
-      os.close();
     }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a541de03cb3..7b175db53a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -781,7 +781,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
 
-      HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+      try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
           .onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), partitionName))
           .withFileId(fileGroupFileId)
           .withDeltaCommit(instantTime)
@@ -791,9 +791,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
           .withFs(dataMetaClient.getFs())
           .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
           .withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-      writer.appendBlock(block);
-      writer.close();
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build()) {
+        writer.appendBlock(block);
+      }
     } catch (InterruptedException e) {
       throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, partitionName), e);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index bbf505c8670..d84a529a084 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -141,10 +141,10 @@ public class HoodiePartitionMetadata {
         BaseFileUtils.getInstance(format.get()).writeMetaFile(fs, filePath, props);
       } else {
         // Backwards compatible properties file format
-        OutputStream os = fs.create(filePath, true);
-        props.store(os, "partition metadata");
-        os.flush();
-        os.close();
+        try (OutputStream os = fs.create(filePath, true)) {
+          props.store(os, "partition metadata");
+          os.flush();
+        }
       }
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 653c8211556..203c38b93f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -53,21 +53,21 @@ public class LogReaderUtils {
   private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile)
       throws IOException {
     // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled
-    Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true);
     Schema writerSchema = null;
-    HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    while (reader.hasPrev()) {
-      HoodieLogBlock block = reader.prev();
-      if (block instanceof HoodieDataBlock) {
-        HoodieDataBlock lastBlock = (HoodieDataBlock) block;
-        if (completedTimeline
-            .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
-          writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-          break;
+    try (Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true)) {
+      HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+      while (reader.hasPrev()) {
+        HoodieLogBlock block = reader.prev();
+        if (block instanceof HoodieDataBlock) {
+          HoodieDataBlock lastBlock = (HoodieDataBlock) block;
+          if (completedTimeline
+              .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
+            writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+            break;
+          }
        }
      }
    }
-    reader.close();
     return writerSchema;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 590d9a17a0d..5ba9e1906b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -104,38 +104,37 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
     GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(baos);
-
-    // 1. Write out the log block version
-    output.writeInt(HoodieLogBlock.version);
-
-    // 2. Write total number of records
-    output.writeInt(records.size());
-
-    // 3. Write the records
-    for (HoodieRecord<?> s : records) {
-      ByteArrayOutputStream temp = new ByteArrayOutputStream();
-      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
-      encoderCache.set(encoder);
-      try {
-        // Encode the record into bytes
-        // Spark Record not support write avro log
-        IndexedRecord data = s.toIndexedRecord(schema, new Properties()).get().getData();
-        writer.write(data, encoder);
-        encoder.flush();
-
-        // Get the size of the bytes
-        int size = temp.toByteArray().length;
-        // Write the record size
-        output.writeInt(size);
-        // Write the content
-        output.write(temp.toByteArray());
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+    try (DataOutputStream output = new DataOutputStream(baos)) {
+      // 1. Write out the log block version
+      output.writeInt(HoodieLogBlock.version);
+
+      // 2. Write total number of records
+      output.writeInt(records.size());
+
+      // 3. Write the records
+      for (HoodieRecord<?> s : records) {
+        ByteArrayOutputStream temp = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
+        encoderCache.set(encoder);
+        try {
+          // Encode the record into bytes
+          // Spark Record not support write avro log
+          IndexedRecord data = s.toIndexedRecord(schema, new Properties()).get().getData();
+          writer.write(data, encoder);
+          encoder.flush();
+
+          // Get the size of the bytes
+          int size = temp.toByteArray().length;
+          // Write the record size
+          output.writeInt(size);
+          // Write the content
+          output.write(temp.toByteArray());
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+        }
      }
+      encoderCache.remove();
    }
-    encoderCache.remove();
-    output.close();
     return baos.toByteArray();
   }
@@ -287,9 +286,9 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   private static byte[] compress(String text) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
-      OutputStream out = new DeflaterOutputStream(baos);
-      out.write(getUTF8Bytes(text));
-      out.close();
+      try (OutputStream out = new DeflaterOutputStream(baos)) {
+        out.write(getUTF8Bytes(text));
+      }
     } catch (IOException e) {
       throw new HoodieIOException("IOException while compressing text " + text, e);
     }
@@ -316,45 +315,43 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(baos);
-
-    // 1. Compress and Write schema out
-    byte[] schemaContent = compress(schema.toString());
-    output.writeInt(schemaContent.length);
-    output.write(schemaContent);
-
-    List<HoodieRecord<?>> records = new ArrayList<>();
-    try (ClosableIterator<HoodieRecord<Object>> recordItr = getRecordIterator(HoodieRecordType.AVRO)) {
-      recordItr.forEachRemaining(records::add);
-    }
-
-    // 2. Write total number of records
-    output.writeInt(records.size());
+    try (DataOutputStream output = new DataOutputStream(baos)) {
+      // 1. Compress and Write schema out
+      byte[] schemaContent = compress(schema.toString());
+      output.writeInt(schemaContent.length);
+      output.write(schemaContent);
+
+      List<HoodieRecord<?>> records = new ArrayList<>();
+      try (ClosableIterator<HoodieRecord<Object>> recordItr = getRecordIterator(HoodieRecordType.AVRO)) {
+        recordItr.forEachRemaining(records::add);
+      }
 
-    // 3. Write the records
-    Iterator<HoodieRecord<?>> itr = records.iterator();
-    while (itr.hasNext()) {
-      IndexedRecord s = itr.next().toIndexedRecord(schema, new Properties()).get().getData();
-      ByteArrayOutputStream temp = new ByteArrayOutputStream();
-      Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
-      try {
-        // Encode the record into bytes
-        writer.write(s, encoder);
-        encoder.flush();
-
-        // Get the size of the bytes
-        int size = temp.toByteArray().length;
-        // Write the record size
-        output.writeInt(size);
-        // Write the content
-        output.write(temp.toByteArray());
-        itr.remove();
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+      // 2. Write total number of records
+      output.writeInt(records.size());
+
+      // 3. Write the records
+      Iterator<HoodieRecord<?>> itr = records.iterator();
+      while (itr.hasNext()) {
+        IndexedRecord s = itr.next().toIndexedRecord(schema, new Properties()).get().getData();
+        ByteArrayOutputStream temp = new ByteArrayOutputStream();
+        Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
+        try {
+          // Encode the record into bytes
+          writer.write(s, encoder);
+          encoder.flush();
+
+          // Get the size of the bytes
+          int size = temp.toByteArray().length;
+          // Write the record size
+          output.writeInt(size);
+          // Write the content
+          output.write(temp.toByteArray());
+          itr.remove();
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+        }
      }
    }
-
-    output.close();
     return baos.toByteArray();
   }
 }
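One detail worth noting in the HoodieAvroDataBlock hunks above: the refactored methods call baos.toByteArray() only after the try block has ended. That ordering works because closing the DataOutputStream flushes it and closes the wrapped stream, while ByteArrayOutputStream.close() is documented as a no-op, so the accumulated bytes stay readable. A condensed sketch of that shape (the serializeBlock name is illustrative, not from the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    static byte[] serializeBlock(int version, byte[] payload) throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      try (DataOutputStream output = new DataOutputStream(baos)) {
        output.writeInt(version);
        output.write(payload);
      } // close() flushes output; ByteArrayOutputStream.close() is a no-op
      return baos.toByteArray(); // safe: all bytes are already in baos
    }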
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 6b1069847f3..de5df5c73b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -92,9 +92,9 @@ public class SerializationUtils {
     byte[] serialize(Object obj) {
       kryo.reset();
       baos.reset();
-      Output output = new Output(baos);
-      this.kryo.writeClassAndObject(output, obj);
-      output.close();
+      try (Output output = new Output(baos)) {
+        this.kryo.writeClassAndObject(output, obj);
+      }
       return baos.toByteArray();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cd16294da72..5f31cc6aa95 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -419,19 +419,19 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
                                                              List<String> sortedKeys,
                                                              boolean fullKeys,
                                                              String partitionName) throws IOException {
-    ClosableIterator<HoodieRecord<?>> records = fullKeys
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+    try (ClosableIterator<HoodieRecord<?>> records = fullKeys
         ? reader.getRecordsByKeysIterator(sortedKeys)
-        : reader.getRecordsByKeyPrefixIterator(sortedKeys);
-
-    Map<String, HoodieRecord<HoodieMetadataPayload>> result = toStream(records)
-        .map(record -> {
-          GenericRecord data = (GenericRecord) record.getData();
-          return Pair.of(
-              (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
-              composeRecord(data, partitionName));
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-    records.close();
+        : reader.getRecordsByKeyPrefixIterator(sortedKeys)) {
+      result = toStream(records)
+          .map(record -> {
+            GenericRecord data = (GenericRecord) record.getData();
+            return Pair.of(
+                (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
+                composeRecord(data, partitionName));
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    }
     return result;
   }
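The HoodieBackedTableMetadata hunk above shows the variant needed when the computed value must outlive the resource: the variable is declared before the try header and assigned inside it. A self-contained sketch of the same pattern against plain java.io (the method name and file-based reader are illustrative assumptions, not code from the patch):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.List;
    import java.util.stream.Collectors;

    static List<String> readAllLines(String path) throws IOException {
      List<String> result;
      // Declared before the try so it stays in scope after the reader closes.
      try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
        result = reader.lines().collect(Collectors.toList());
      }
      return result;
    }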
diff --git a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index fe6dd497b2f..352444faa34 100644
--- a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -75,8 +75,8 @@ public class HoodieJavaWriteClientExample {
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setTableName(tableName)
-        .setPayloadClassName(HoodieAvroPayload.class.getName())
-        .initTable(hadoopConf, tablePath);
+          .setPayloadClassName(HoodieAvroPayload.class.getName())
+          .initTable(hadoopConf, tablePath);
     }
@@ -85,38 +85,38 @@ public class HoodieJavaWriteClientExample {
         .withDeleteParallelism(2).forTable(tableName)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
-    HoodieJavaWriteClient<HoodieAvroPayload> client =
-        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
-
-    // inserts
-    String newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-
-    List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
-    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
-    List<HoodieRecord<HoodieAvroPayload>> writeRecords =
-        recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
-    client.insert(writeRecords, newCommitTime);
-
-    // updates
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
-    records.addAll(toBeUpdated);
-    recordsSoFar.addAll(toBeUpdated);
-    writeRecords =
-        recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
-    client.upsert(writeRecords, newCommitTime);
-
-    // Delete
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    // just delete half of the records
-    int numToDelete = recordsSoFar.size() / 2;
-    List<HoodieKey> toBeDeleted =
-        recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
-    client.delete(toBeDeleted, newCommitTime);
-
-    client.close();
+
+    try (HoodieJavaWriteClient<HoodieAvroPayload> client =
+        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)) {
+
+      // inserts
+      String newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+
+      List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+      List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+          recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.insert(writeRecords, newCommitTime);
+
+      // updates
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+      records.addAll(toBeUpdated);
+      recordsSoFar.addAll(toBeUpdated);
+      writeRecords =
+          recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.upsert(writeRecords, newCommitTime);
+
+      // Delete
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      // just delete half of the records
+      int numToDelete = recordsSoFar.size() / 2;
+      List<HoodieKey> toBeDeleted =
+          recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+      client.delete(toBeDeleted, newCommitTime);
+    }
   }
 }
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index cbe505b7012..b57ce25671c 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -99,52 +99,52 @@ public class HoodieWriteClientExample {
           .withDeleteParallelism(2).forTable(tableName)
           .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
           .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
-      SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
-
-      // inserts
-      String newCommitTime = client.startCommit();
-      LOG.info("Starting commit " + newCommitTime);
-
-      List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
-      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
-      JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
-      client.insert(writeRecords, newCommitTime);
-
-      // updates
-      newCommitTime = client.startCommit();
-      LOG.info("Starting commit " + newCommitTime);
-      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
-      records.addAll(toBeUpdated);
-      recordsSoFar.addAll(toBeUpdated);
-      writeRecords = jsc.parallelize(records, 1);
-      client.upsert(writeRecords, newCommitTime);
-
-      // Delete
-      newCommitTime = client.startCommit();
-      LOG.info("Starting commit " + newCommitTime);
-      // just delete half of the records
-      int numToDelete = recordsSoFar.size() / 2;
-      List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
-      JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
-      client.delete(deleteRecords, newCommitTime);
-
-      // Delete by partition
-      newCommitTime = client.startCommit();
-      client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      LOG.info("Starting commit " + newCommitTime);
-      // The partition where the data needs to be deleted
-      List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
-      List<String> deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath()))
-          .map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
-      client.deletePartitions(deleteList, newCommitTime);
-
-      // compaction
-      if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
-        Option<String> instant = client.scheduleCompaction(Option.empty());
-        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instant.get());
-        client.commitCompaction(instant.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+      try (SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
+
+        // inserts
+        String newCommitTime = client.startCommit();
+        LOG.info("Starting commit " + newCommitTime);
+
+        List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+        List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+        JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+
+        // updates
+        newCommitTime = client.startCommit();
+        LOG.info("Starting commit " + newCommitTime);
+        List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+        records.addAll(toBeUpdated);
+        recordsSoFar.addAll(toBeUpdated);
+        writeRecords = jsc.parallelize(records, 1);
+        client.upsert(writeRecords, newCommitTime);
+
+        // Delete
+        newCommitTime = client.startCommit();
+        LOG.info("Starting commit " + newCommitTime);
+        // just delete half of the records
+        int numToDelete = recordsSoFar.size() / 2;
+        List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
+        client.delete(deleteRecords, newCommitTime);
+
+        // Delete by partition
+        newCommitTime = client.startCommit();
+        client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+        LOG.info("Starting commit " + newCommitTime);
+        // The partition where the data needs to be deleted
+        List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
+        List<String> deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath()))
+            .map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
+        client.deletePartitions(deleteList, newCommitTime);
+
+        // compaction
+        if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
+          Option<String> instant = client.scheduleCompaction(Option.empty());
+          HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instant.get());
+          client.commitCompaction(instant.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+        }
       }
-      client.close();
     }
   }
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index 37c573a173c..5bc91ebed14 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -140,10 +140,10 @@ public class FileIOUtils {
   }
 
   public static void writeStringToFile(String str, String filePath) throws IOException {
-    PrintStream out = new PrintStream(new FileOutputStream(filePath));
-    out.println(str);
-    out.flush();
-    out.close();
+    try (PrintStream out = new PrintStream(new FileOutputStream(filePath))) {
+      out.println(str);
+      out.flush();
+    }
   }
 
   /**
@@ -174,9 +174,9 @@ public class FileIOUtils {
       }
 
       if (content.isPresent()) {
-        OutputStream out = fileSystem.create(fullPath, true);
-        out.write(content.get());
-        out.close();
+        try (OutputStream out = fileSystem.create(fullPath, true)) {
+          out.write(content.get());
+        }
       }
     } catch (IOException e) {
       LOG.warn("Failed to create file " + fullPath, e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
index 8806ce46ea3..4194547894d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
@@ -107,11 +107,10 @@ public class HoodieCompactionAdminTool {
   private <T> void serializeOperationResult(FileSystem fs, T result) throws Exception {
     if ((cfg.outputPath != null) && (result != null)) {
       Path outputPath = new Path(cfg.outputPath);
-      OutputStream stream = fs.create(outputPath, true);
-      ObjectOutputStream out = new ObjectOutputStream(stream);
-      out.writeObject(result);
-      out.close();
-      stream.close();
+      try (OutputStream stream = fs.create(outputPath, true);
+           ObjectOutputStream out = new ObjectOutputStream(stream)) {
+        out.writeObject(result);
+      }
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
index 66b4382d784..669af8dca9f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
@@ -131,9 +131,9 @@ public class SchedulerConfGenerator {
   private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
                                                Integer compactionMinShare, Integer clusteringWeight, Integer clusteringMinShare) throws IOException {
     File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
-    BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
-    bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
-    bw.close();
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile))) {
+      bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
+    }
     // SPARK-35083 introduces remote scheduler pool files, so the file must include scheme since Spark 3.2
     String path = HoodieSparkUtils.gteqSpark3_2() ? tempConfigFile.toURI().toString() : tempConfigFile.getAbsolutePath();
     LOG.info("Configs written to file " + path);
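The HoodieCompactionAdminTool hunk is the one place where two chained resources share a single try header. Declared resources are closed in reverse order, so the ObjectOutputStream is flushed and closed before the underlying stream, preserving the ordering of the old hand-written out.close(); stream.close() sequence. A standalone sketch of the same shape against plain java.io, with the file path and method name as illustrative assumptions:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.OutputStream;

    static void serializeResult(Object result) throws IOException {
      // Resources close in reverse declaration order: out first, then stream.
      try (OutputStream stream = new FileOutputStream("/tmp/result.bin");
           ObjectOutputStream out = new ObjectOutputStream(stream)) {
        out.writeObject(result);
      }
      // If the body and a close() both throw, the close() exception is
      // recorded as a suppressed exception on the primary one.
    }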