This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 78fc62554ce [HUDI-7633] Use try with resources for AutoCloseable (#11045)
78fc62554ce is described below
commit 78fc62554ce3798eb803332b42f84f9cfa74e526
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Apr 17 21:31:44 2024 -0700
[HUDI-7633] Use try with resources for AutoCloseable (#11045)
---
.../hudi/cli/commands/ArchivedCommitsCommand.java | 104 ++++++++--------
.../apache/hudi/cli/commands/ExportCommand.java | 93 +++++++-------
.../hudi/cli/commands/HoodieLogFileCommand.java | 104 ++++++++--------
.../org/apache/hudi/cli/commands/TableCommand.java | 6 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 8 +-
.../hudi/common/model/HoodiePartitionMetadata.java | 8 +-
.../hudi/common/table/log/LogReaderUtils.java | 22 ++--
.../table/log/block/HoodieAvroDataBlock.java | 135 ++++++++++-----------
.../hudi/common/util/SerializationUtils.java | 6 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 24 ++--
.../java/HoodieJavaWriteClientExample.java | 70 +++++------
.../examples/spark/HoodieWriteClientExample.java | 90 +++++++-------
.../org/apache/hudi/common/util/FileIOUtils.java | 14 +--
.../hudi/utilities/HoodieCompactionAdminTool.java | 9 +-
.../utilities/streamer/SchedulerConfGenerator.java | 6 +-
15 files changed, 344 insertions(+), 355 deletions(-)
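For context, every change below applies the same pattern: a resource that was previously closed with an explicit close() call (which is skipped if an earlier statement throws) is now declared in a try-with-resources statement, so the JVM closes the AutoCloseable automatically on both the normal and the exceptional path. A minimal, self-contained before/after sketch using plain java.io classes (illustrative only, not Hudi code):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class TryWithResourcesSketch {

      // Before: if write() throws, close() never runs and the stream leaks.
      static void writeManualClose(byte[] data, String path) throws IOException {
        OutputStream os = new FileOutputStream(path);
        os.write(data);
        os.close();
      }

      // After: the stream declared in try (...) is closed automatically when
      // the block exits, whether normally or via an exception.
      static void writeTryWithResources(byte[] data, String path) throws IOException {
        try (OutputStream os = new FileOutputStream(path)) {
          os.write(data);
        }
      }

      public static void main(String[] args) throws IOException {
        writeTryWithResources("hello".getBytes(), "example.txt");
      }
    }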
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 075a57d541c..5c57c8f5288 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -114,47 +114,46 @@ public class ArchivedCommitsCommand {
List<Comparable[]> allStats = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
// read the archived file
-      Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      List<IndexedRecord> readRecords = new ArrayList<>();
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
+      try (Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
+          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+        List<IndexedRecord> readRecords = new ArrayList<>();
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
+        }
+        List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
+            .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
+                || r.get("actionType").toString().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
+            .flatMap(r -> {
+              HoodieCommitMetadata metadata = (HoodieCommitMetadata) SpecificData.get()
+                  .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
+              final String instantTime = r.get("commitTime").toString();
+              final String action = r.get("actionType").toString();
+              return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
+                List<Comparable> row = new ArrayList<>();
+                row.add(action);
+                row.add(instantTime);
+                row.add(hoodieWriteStat.getPartitionPath());
+                row.add(hoodieWriteStat.getFileId());
+                row.add(hoodieWriteStat.getPrevCommit());
+                row.add(hoodieWriteStat.getNumWrites());
+                row.add(hoodieWriteStat.getNumInserts());
+                row.add(hoodieWriteStat.getNumDeletes());
+                row.add(hoodieWriteStat.getNumUpdateWrites());
+                row.add(hoodieWriteStat.getTotalLogFiles());
+                row.add(hoodieWriteStat.getTotalLogBlocks());
+                row.add(hoodieWriteStat.getTotalCorruptLogBlock());
+                row.add(hoodieWriteStat.getTotalRollbackBlocks());
+                row.add(hoodieWriteStat.getTotalLogRecords());
+                row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
+                row.add(hoodieWriteStat.getTotalWriteBytes());
+                row.add(hoodieWriteStat.getTotalWriteErrors());
+                return row;
+              })).map(rowList -> rowList.toArray(new Comparable[0]));
+            }).collect(Collectors.toList());
+        allStats.addAll(readCommits);
       }
-      List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
-          .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
-              || r.get("actionType").toString().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
-          .flatMap(r -> {
-            HoodieCommitMetadata metadata = (HoodieCommitMetadata) SpecificData.get()
-                .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
-            final String instantTime = r.get("commitTime").toString();
-            final String action = r.get("actionType").toString();
-            return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
-              List<Comparable> row = new ArrayList<>();
-              row.add(action);
-              row.add(instantTime);
-              row.add(hoodieWriteStat.getPartitionPath());
-              row.add(hoodieWriteStat.getFileId());
-              row.add(hoodieWriteStat.getPrevCommit());
-              row.add(hoodieWriteStat.getNumWrites());
-              row.add(hoodieWriteStat.getNumInserts());
-              row.add(hoodieWriteStat.getNumDeletes());
-              row.add(hoodieWriteStat.getNumUpdateWrites());
-              row.add(hoodieWriteStat.getTotalLogFiles());
-              row.add(hoodieWriteStat.getTotalLogBlocks());
-              row.add(hoodieWriteStat.getTotalCorruptLogBlock());
-              row.add(hoodieWriteStat.getTotalRollbackBlocks());
-              row.add(hoodieWriteStat.getTotalLogRecords());
-              row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
-              row.add(hoodieWriteStat.getTotalWriteBytes());
-              row.add(hoodieWriteStat.getTotalWriteErrors());
-              return row;
-            })).map(rowList -> rowList.toArray(new Comparable[0]));
-          }).collect(Collectors.toList());
-      allStats.addAll(readCommits);
-      reader.close();
     }
     TableHeader header = new TableHeader().addTableHeaderField("action").addTableHeaderField("instant")
.addTableHeaderField("partition").addTableHeaderField("file_id").addTableHeaderField("prev_instant")
@@ -188,21 +187,20 @@ public class ArchivedCommitsCommand {
List<Comparable[]> allCommits = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
// read the archived file
-      HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      List<IndexedRecord> readRecords = new ArrayList<>();
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
-          recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
+      try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
+          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+        List<IndexedRecord> readRecords = new ArrayList<>();
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+            recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
+          }
         }
+        List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
+            .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());
+        allCommits.addAll(readCommits);
       }
-      List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
-          .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());
-      allCommits.addAll(readCommits);
-      reader.close();
     }
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("CommitType");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index effa096bfa9..eda0d0de219 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -125,57 +125,56 @@ public class ExportCommand {
for (FileStatus fs : statuses) {
// read the archived file
-      Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      // read the avro blocks
-      while (reader.hasNext() && copyCount++ < limit) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
-          while (recordItr.hasNext()) {
-            IndexedRecord ir = recordItr.next().getData();
-            // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
-            // metadata record from the entry and convert it to json.
-            HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
-                .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
-            final String action = archiveEntryRecord.get("actionType").toString();
-            if (!actionSet.contains(action)) {
-              continue;
+      try (Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+
+        // read the avro blocks
+        while (reader.hasNext() && copyCount++ < limit) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+            while (recordItr.hasNext()) {
+              IndexedRecord ir = recordItr.next().getData();
+              // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
+              // metadata record from the entry and convert it to json.
+              HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
+                  .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
+              final String action = archiveEntryRecord.get("actionType").toString();
+              if (!actionSet.contains(action)) {
+                continue;
+              }
+
+              GenericRecord metadata = null;
+              switch (action) {
+                case HoodieTimeline.CLEAN_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCleanMetadata();
+                  break;
+                case HoodieTimeline.COMMIT_ACTION:
+                case HoodieTimeline.DELTA_COMMIT_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCommitMetadata();
+                  break;
+                case HoodieTimeline.ROLLBACK_ACTION:
+                  metadata = archiveEntryRecord.getHoodieRollbackMetadata();
+                  break;
+                case HoodieTimeline.SAVEPOINT_ACTION:
+                  metadata = archiveEntryRecord.getHoodieSavePointMetadata();
+                  break;
+                case HoodieTimeline.COMPACTION_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCompactionMetadata();
+                  break;
+                default:
+                  throw new HoodieException("Unknown type of action " + action);
+              }
+
+              final String instantTime = archiveEntryRecord.get("commitTime").toString();
+              if (metadata == null) {
+                LOG.error("Could not load metadata for action " + action + " at instant time " + instantTime);
+                continue;
+              }
+              final String outPath = localFolder + StoragePath.SEPARATOR + instantTime + "." + action;
+              writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
             }
-
-            GenericRecord metadata = null;
-            switch (action) {
-              case HoodieTimeline.CLEAN_ACTION:
-                metadata = archiveEntryRecord.getHoodieCleanMetadata();
-                break;
-              case HoodieTimeline.COMMIT_ACTION:
-              case HoodieTimeline.DELTA_COMMIT_ACTION:
-                metadata = archiveEntryRecord.getHoodieCommitMetadata();
-                break;
-              case HoodieTimeline.ROLLBACK_ACTION:
-                metadata = archiveEntryRecord.getHoodieRollbackMetadata();
-                break;
-              case HoodieTimeline.SAVEPOINT_ACTION:
-                metadata = archiveEntryRecord.getHoodieSavePointMetadata();
-                break;
-              case HoodieTimeline.COMPACTION_ACTION:
-                metadata = archiveEntryRecord.getHoodieCompactionMetadata();
-                break;
-              default:
-                throw new HoodieException("Unknown type of action " + action);
-            }
-
-            final String instantTime = archiveEntryRecord.get("commitTime").toString();
-            if (metadata == null) {
-              LOG.error("Could not load metadata for action " + action + " at instant time " + instantTime);
-              continue;
-            }
-            final String outPath = localFolder + StoragePath.SEPARATOR + instantTime + "." + action;
-            writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
           }
         }
       }
-
-      reader.close();
     }
return copyCount;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 138a3e376b2..fc1a4b4ec42 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -114,52 +114,52 @@ public class HoodieLogFileCommand {
MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, path);
Schema writerSchema = schema != null
          ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
-      Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
+      try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieLogBlock n = reader.next();
-        String instantTime;
-        AtomicInteger recordCount = new AtomicInteger(0);
-        if (n instanceof HoodieCorruptBlock) {
-          try {
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieLogBlock n = reader.next();
+          String instantTime;
+          AtomicInteger recordCount = new AtomicInteger(0);
+          if (n instanceof HoodieCorruptBlock) {
+            try {
+              instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+              if (instantTime == null) {
+                throw new Exception("Invalid instant time " + instantTime);
+              }
+            } catch (Exception e) {
+              numCorruptBlocks++;
+              instantTime = "corrupt_block_" + numCorruptBlocks;
+              // could not read metadata for corrupt block
+            }
+          } else {
            instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
            if (instantTime == null) {
-              throw new Exception("Invalid instant time " + instantTime);
+              // This can happen when reading archived commit files since they were written without any instant time
+              dummyInstantTimeCount++;
+              instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
            }
-          } catch (Exception e) {
-            numCorruptBlocks++;
-            instantTime = "corrupt_block_" + numCorruptBlocks;
-            // could not read metadata for corrupt block
-          }
-        } else {
-          instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
-          if (instantTime == null) {
-            // This can happen when reading archived commit files since they were written without any instant time
-            dummyInstantTimeCount++;
-            instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
-          }
-          if (n instanceof HoodieDataBlock) {
-            try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
-              recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+            if (n instanceof HoodieDataBlock) {
+              try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
+                recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+              }
            }
          }
-        }
-        if (commitCountAndMetadata.containsKey(instantTime)) {
-          commitCountAndMetadata.get(instantTime).add(
-              new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
-                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
-        } else {
-          List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
-              Map<HeaderMetadataType, String>>, Integer>> list =
-              new ArrayList<>();
-          list.add(
-              new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
-                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
-          commitCountAndMetadata.put(instantTime, list);
+          if (commitCountAndMetadata.containsKey(instantTime)) {
+            commitCountAndMetadata.get(instantTime).add(
+                new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                    new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
+          } else {
+            List<Tuple3<Tuple2<String, HoodieLogBlockType>, Tuple2<Map<HeaderMetadataType, String>,
+                Map<HeaderMetadataType, String>>, Integer>> list =
+                new ArrayList<>();
+            list.add(
+                new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                    new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
+            commitCountAndMetadata.put(instantTime, list);
+          }
        }
      }
-      reader.close();
    }
List<Comparable[]> rows = new ArrayList<>();
ObjectMapper objectMapper = new ObjectMapper();
@@ -260,23 +260,23 @@ public class HoodieLogFileCommand {
      MessageType schema = TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile));
      Schema writerSchema = schema != null
          ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
-      HoodieLogFormat.Reader reader =
-          HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema);
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieLogBlock n = reader.next();
-        if (n instanceof HoodieDataBlock) {
-          HoodieDataBlock blk = (HoodieDataBlock) n;
-          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
-            recordItr.forEachRemaining(record -> {
-              if (allRecords.size() < limit) {
-                allRecords.add(record.getData());
-              }
-            });
+      try (HoodieLogFormat.Reader reader =
+          HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema)) {
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieLogBlock n = reader.next();
+          if (n instanceof HoodieDataBlock) {
+            HoodieDataBlock blk = (HoodieDataBlock) n;
+            try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
+              recordItr.forEachRemaining(record -> {
+                if (allRecords.size() < limit) {
+                  allRecords.add(record.getData());
+                }
+              });
+            }
          }
        }
      }
-      reader.close();
if (allRecords.size() >= limit) {
break;
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index e3c3e810c5c..b6256f74006 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -403,12 +403,8 @@ public class TableCommand {
if (outFile.exists()) {
outFile.delete();
}
- OutputStream os = null;
- try {
- os = new FileOutputStream(outFile);
+ try (OutputStream os = new FileOutputStream(outFile)) {
os.write(getUTF8Bytes(data), 0, data.length());
- } finally {
- os.close();
}
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a541de03cb3..7b175db53a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -781,7 +781,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
      final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
- HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+ try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
          .onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), partitionName))
.withFileId(fileGroupFileId)
.withDeltaCommit(instantTime)
@@ -791,9 +791,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
.withFs(dataMetaClient.getFs())
.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
- writer.appendBlock(block);
- writer.close();
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build()) {
+ writer.appendBlock(block);
+ }
} catch (InterruptedException e) {
      throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, partitionName), e);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index bbf505c8670..d84a529a084 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -141,10 +141,10 @@ public class HoodiePartitionMetadata {
        BaseFileUtils.getInstance(format.get()).writeMetaFile(fs, filePath, props);
} else {
// Backwards compatible properties file format
- OutputStream os = fs.create(filePath, true);
- props.store(os, "partition metadata");
- os.flush();
- os.close();
+ try (OutputStream os = fs.create(filePath, true)) {
+ props.store(os, "partition metadata");
+ os.flush();
+ }
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 653c8211556..203c38b93f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -53,21 +53,21 @@ public class LogReaderUtils {
  private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile)
      throws IOException {
    // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled
-    Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true);
    Schema writerSchema = null;
-    HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    while (reader.hasPrev()) {
-      HoodieLogBlock block = reader.prev();
-      if (block instanceof HoodieDataBlock) {
-        HoodieDataBlock lastBlock = (HoodieDataBlock) block;
-        if (completedTimeline
-            .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
-          writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-          break;
+    try (Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true)) {
+      HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+      while (reader.hasPrev()) {
+        HoodieLogBlock block = reader.prev();
+        if (block instanceof HoodieDataBlock) {
+          HoodieDataBlock lastBlock = (HoodieDataBlock) block;
+          if (completedTimeline
+              .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) {
+            writerSchema = new Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+            break;
+          }
        }
      }
    }
-    reader.close();
return writerSchema;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 590d9a17a0d..5ba9e1906b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -104,38 +104,37 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
    GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(baos);
-
-    // 1. Write out the log block version
-    output.writeInt(HoodieLogBlock.version);
-
-    // 2. Write total number of records
-    output.writeInt(records.size());
-
-    // 3. Write the records
-    for (HoodieRecord<?> s : records) {
-      ByteArrayOutputStream temp = new ByteArrayOutputStream();
-      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
-      encoderCache.set(encoder);
-      try {
-        // Encode the record into bytes
-        // Spark Record not support write avro log
-        IndexedRecord data = s.toIndexedRecord(schema, new Properties()).get().getData();
-        writer.write(data, encoder);
-        encoder.flush();
-
-        // Get the size of the bytes
-        int size = temp.toByteArray().length;
-        // Write the record size
-        output.writeInt(size);
-        // Write the content
-        output.write(temp.toByteArray());
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+    try (DataOutputStream output = new DataOutputStream(baos)) {
+      // 1. Write out the log block version
+      output.writeInt(HoodieLogBlock.version);
+
+      // 2. Write total number of records
+      output.writeInt(records.size());
+
+      // 3. Write the records
+      for (HoodieRecord<?> s : records) {
+        ByteArrayOutputStream temp = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
+        encoderCache.set(encoder);
+        try {
+          // Encode the record into bytes
+          // Spark Record not support write avro log
+          IndexedRecord data = s.toIndexedRecord(schema, new Properties()).get().getData();
+          writer.write(data, encoder);
+          encoder.flush();
+
+          // Get the size of the bytes
+          int size = temp.toByteArray().length;
+          // Write the record size
+          output.writeInt(size);
+          // Write the content
+          output.write(temp.toByteArray());
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+        }
      }
+      encoderCache.remove();
    }
-    encoderCache.remove();
-    output.close();
return baos.toByteArray();
}
@@ -287,9 +286,9 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
private static byte[] compress(String text) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- OutputStream out = new DeflaterOutputStream(baos);
- out.write(getUTF8Bytes(text));
- out.close();
+ try (OutputStream out = new DeflaterOutputStream(baos)) {
+ out.write(getUTF8Bytes(text));
+ }
} catch (IOException e) {
      throw new HoodieIOException("IOException while compressing text " + text, e);
}
@@ -316,45 +315,43 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(baos);
-
-    // 1. Compress and Write schema out
-    byte[] schemaContent = compress(schema.toString());
-    output.writeInt(schemaContent.length);
-    output.write(schemaContent);
-
-    List<HoodieRecord<?>> records = new ArrayList<>();
-    try (ClosableIterator<HoodieRecord<Object>> recordItr = getRecordIterator(HoodieRecordType.AVRO)) {
-      recordItr.forEachRemaining(records::add);
-    }
-
-    // 2. Write total number of records
-    output.writeInt(records.size());
+    try (DataOutputStream output = new DataOutputStream(baos)) {
+      // 1. Compress and Write schema out
+      byte[] schemaContent = compress(schema.toString());
+      output.writeInt(schemaContent.length);
+      output.write(schemaContent);
+
+      List<HoodieRecord<?>> records = new ArrayList<>();
+      try (ClosableIterator<HoodieRecord<Object>> recordItr = getRecordIterator(HoodieRecordType.AVRO)) {
+        recordItr.forEachRemaining(records::add);
+      }
-    // 3. Write the records
-    Iterator<HoodieRecord<?>> itr = records.iterator();
-    while (itr.hasNext()) {
-      IndexedRecord s = itr.next().toIndexedRecord(schema, new Properties()).get().getData();
-      ByteArrayOutputStream temp = new ByteArrayOutputStream();
-      Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
-      try {
-        // Encode the record into bytes
-        writer.write(s, encoder);
-        encoder.flush();
-
-        // Get the size of the bytes
-        int size = temp.toByteArray().length;
-        // Write the record size
-        output.writeInt(size);
-        // Write the content
-        output.write(temp.toByteArray());
-        itr.remove();
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+      // 2. Write total number of records
+      output.writeInt(records.size());
+
+      // 3. Write the records
+      Iterator<HoodieRecord<?>> itr = records.iterator();
+      while (itr.hasNext()) {
+        IndexedRecord s = itr.next().toIndexedRecord(schema, new Properties()).get().getData();
+        ByteArrayOutputStream temp = new ByteArrayOutputStream();
+        Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
+        try {
+          // Encode the record into bytes
+          writer.write(s, encoder);
+          encoder.flush();
+
+          // Get the size of the bytes
+          int size = temp.toByteArray().length;
+          // Write the record size
+          output.writeInt(size);
+          // Write the content
+          output.write(temp.toByteArray());
+          itr.remove();
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
+        }
      }
    }
-
-    output.close();
return baos.toByteArray();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 6b1069847f3..de5df5c73b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -92,9 +92,9 @@ public class SerializationUtils {
byte[] serialize(Object obj) {
kryo.reset();
baos.reset();
- Output output = new Output(baos);
- this.kryo.writeClassAndObject(output, obj);
- output.close();
+ try (Output output = new Output(baos)) {
+ this.kryo.writeClassAndObject(output, obj);
+ }
return baos.toByteArray();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cd16294da72..5f31cc6aa95 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -419,19 +419,19 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
List<String> sortedKeys,
boolean fullKeys,
String partitionName) throws IOException {
- ClosableIterator<HoodieRecord<?>> records = fullKeys
+ Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+ try (ClosableIterator<HoodieRecord<?>> records = fullKeys
? reader.getRecordsByKeysIterator(sortedKeys)
- : reader.getRecordsByKeyPrefixIterator(sortedKeys);
-
-    Map<String, HoodieRecord<HoodieMetadataPayload>> result = toStream(records)
- GenericRecord data = (GenericRecord) record.getData();
- return Pair.of(
- (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
- composeRecord(data, partitionName));
- })
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
- records.close();
+ : reader.getRecordsByKeyPrefixIterator(sortedKeys)) {
+ result = toStream(records)
+ .map(record -> {
+ GenericRecord data = (GenericRecord) record.getData();
+ return Pair.of(
+ (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
+ composeRecord(data, partitionName));
+ })
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ }
return result;
}
diff --git a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index fe6dd497b2f..352444faa34 100644
--- a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -75,8 +75,8 @@ public class HoodieJavaWriteClientExample {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableType)
.setTableName(tableName)
- .setPayloadClassName(HoodieAvroPayload.class.getName())
- .initTable(hadoopConf, tablePath);
+ .setPayloadClassName(HoodieAvroPayload.class.getName())
+ .initTable(hadoopConf, tablePath);
}
// Create the write client to write some records in
@@ -85,38 +85,38 @@ public class HoodieJavaWriteClientExample {
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
-    HoodieJavaWriteClient<HoodieAvroPayload> client =
-        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
-
-    // inserts
-    String newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-
-    List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
-    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
-    List<HoodieRecord<HoodieAvroPayload>> writeRecords =
-        recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
-    client.insert(writeRecords, newCommitTime);
-
-    // updates
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
-    records.addAll(toBeUpdated);
-    recordsSoFar.addAll(toBeUpdated);
-    writeRecords =
-        recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
-    client.upsert(writeRecords, newCommitTime);
-
-    // Delete
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    // just delete half of the records
-    int numToDelete = recordsSoFar.size() / 2;
-    List<HoodieKey> toBeDeleted =
-        recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
-    client.delete(toBeDeleted, newCommitTime);
-
-    client.close();
+
+    try (HoodieJavaWriteClient<HoodieAvroPayload> client =
+        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)) {
+
+      // inserts
+      String newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+
+      List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+      List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+          recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.insert(writeRecords, newCommitTime);
+
+      // updates
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+      records.addAll(toBeUpdated);
+      recordsSoFar.addAll(toBeUpdated);
+      writeRecords =
+          recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.upsert(writeRecords, newCommitTime);
+
+      // Delete
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      // just delete half of the records
+      int numToDelete = recordsSoFar.size() / 2;
+      List<HoodieKey> toBeDeleted =
+          recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+      client.delete(toBeDeleted, newCommitTime);
+    }
}
}
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index cbe505b7012..b57ce25671c 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -99,52 +99,52 @@ public class HoodieWriteClientExample {
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
-    SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
-
-    // inserts
-    String newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-
-    List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
-    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
-    JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
-    client.insert(writeRecords, newCommitTime);
-
-    // updates
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
-    records.addAll(toBeUpdated);
-    recordsSoFar.addAll(toBeUpdated);
-    writeRecords = jsc.parallelize(records, 1);
-    client.upsert(writeRecords, newCommitTime);
-
-    // Delete
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    // just delete half of the records
-    int numToDelete = recordsSoFar.size() / 2;
-    List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
-    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
-    client.delete(deleteRecords, newCommitTime);
-
-    // Delete by partition
-    newCommitTime = client.startCommit();
-    client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-    LOG.info("Starting commit " + newCommitTime);
-    // The partition where the data needs to be deleted
-    List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
-    List<String> deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath()))
-        .map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
-    client.deletePartitions(deleteList, newCommitTime);
-
-    // compaction
-    if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
-      Option<String> instant = client.scheduleCompaction(Option.empty());
-      HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instant.get());
-      client.commitCompaction(instant.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+    try (SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
+
+      // inserts
+      String newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+
+      List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+      JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
+      client.insert(writeRecords, newCommitTime);
+
+      // updates
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+      records.addAll(toBeUpdated);
+      recordsSoFar.addAll(toBeUpdated);
+      writeRecords = jsc.parallelize(records, 1);
+      client.upsert(writeRecords, newCommitTime);
+
+      // Delete
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      // just delete half of the records
+      int numToDelete = recordsSoFar.size() / 2;
+      List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+      JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
+      client.delete(deleteRecords, newCommitTime);
+
+      // Delete by partition
+      newCommitTime = client.startCommit();
+      client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+      LOG.info("Starting commit " + newCommitTime);
+      // The partition where the data needs to be deleted
+      List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
+      List<String> deleteList = recordsSoFar.stream().filter(f -> !partitionList.contains(f.getPartitionPath()))
+          .map(m -> m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
+      client.deletePartitions(deleteList, newCommitTime);
+
+      // compaction
+      if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
+        Option<String> instant = client.scheduleCompaction(Option.empty());
+        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instant.get());
+        client.commitCompaction(instant.get(), compactionMetadata.getCommitMetadata().get(), Option.empty());
+      }
     }
-    client.close();
}
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index 37c573a173c..5bc91ebed14 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -140,10 +140,10 @@ public class FileIOUtils {
}
  public static void writeStringToFile(String str, String filePath) throws IOException {
- PrintStream out = new PrintStream(new FileOutputStream(filePath));
- out.println(str);
- out.flush();
- out.close();
+ try (PrintStream out = new PrintStream(new FileOutputStream(filePath))) {
+ out.println(str);
+ out.flush();
+ }
}
/**
@@ -174,9 +174,9 @@ public class FileIOUtils {
}
if (content.isPresent()) {
- OutputStream out = fileSystem.create(fullPath, true);
- out.write(content.get());
- out.close();
+ try (OutputStream out = fileSystem.create(fullPath, true)) {
+ out.write(content.get());
+ }
}
} catch (IOException e) {
LOG.warn("Failed to create file " + fullPath, e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
index 8806ce46ea3..4194547894d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
@@ -107,11 +107,10 @@ public class HoodieCompactionAdminTool {
  private <T> void serializeOperationResult(FileSystem fs, T result) throws Exception {
if ((cfg.outputPath != null) && (result != null)) {
Path outputPath = new Path(cfg.outputPath);
- OutputStream stream = fs.create(outputPath, true);
- ObjectOutputStream out = new ObjectOutputStream(stream);
- out.writeObject(result);
- out.close();
- stream.close();
+ try (OutputStream stream = fs.create(outputPath, true);
+ ObjectOutputStream out = new ObjectOutputStream(stream)) {
+ out.writeObject(result);
+ }
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
index 66b4382d784..669af8dca9f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
@@ -131,9 +131,9 @@ public class SchedulerConfGenerator {
  private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
      Integer deltaSyncMinShare, Integer compactionMinShare, Integer clusteringWeight, Integer clusteringMinShare) throws IOException {
    File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
-    BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
-    bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
-    bw.close();
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile))) {
+      bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
+    }
    // SPARK-35083 introduces remote scheduler pool files, so the file must include scheme since Spark 3.2
    String path = HoodieSparkUtils.gteqSpark3_2() ? tempConfigFile.toURI().toString() : tempConfigFile.getAbsolutePath();
LOG.info("Configs written to file " + path);