This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78fc62554ce [HUDI-7633] Use try with resources for AutoCloseable 
(#11045)
78fc62554ce is described below

commit 78fc62554ce3798eb803332b42f84f9cfa74e526
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Wed Apr 17 21:31:44 2024 -0700

    [HUDI-7633] Use try with resources for AutoCloseable (#11045)
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  | 104 ++++++++--------
 .../apache/hudi/cli/commands/ExportCommand.java    |  93 +++++++-------
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 104 ++++++++--------
 .../org/apache/hudi/cli/commands/TableCommand.java |   6 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   8 +-
 .../hudi/common/model/HoodiePartitionMetadata.java |   8 +-
 .../hudi/common/table/log/LogReaderUtils.java      |  22 ++--
 .../table/log/block/HoodieAvroDataBlock.java       | 135 ++++++++++-----------
 .../hudi/common/util/SerializationUtils.java       |   6 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  24 ++--
 .../java/HoodieJavaWriteClientExample.java         |  70 +++++------
 .../examples/spark/HoodieWriteClientExample.java   |  90 +++++++-------
 .../org/apache/hudi/common/util/FileIOUtils.java   |  14 +--
 .../hudi/utilities/HoodieCompactionAdminTool.java  |   9 +-
 .../utilities/streamer/SchedulerConfGenerator.java |   6 +-
 15 files changed, 344 insertions(+), 355 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 075a57d541c..5c57c8f5288 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -114,47 +114,46 @@ public class ArchivedCommitsCommand {
     List<Comparable[]> allStats = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       // read the archived file
-      Reader reader = HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, 
HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), 
HoodieArchivedMetaEntry.getClassSchema());
-
-      List<IndexedRecord> readRecords = new ArrayList<>();
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> 
readRecords.add((IndexedRecord) r.getData()));
+      try (Reader reader = 
HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
+          new HoodieLogFile(fs.getPath()), 
HoodieArchivedMetaEntry.getClassSchema())) {
+        List<IndexedRecord> readRecords = new ArrayList<>();
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> 
readRecords.add((IndexedRecord) r.getData()));
+        }
+        List<Comparable[]> readCommits = readRecords.stream().map(r -> 
(GenericRecord) r)
+            .filter(r -> 
r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
+                || 
r.get("actionType").toString().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
+            .flatMap(r -> {
+              HoodieCommitMetadata metadata = (HoodieCommitMetadata) 
SpecificData.get()
+                  .deepCopy(HoodieCommitMetadata.SCHEMA$, 
r.get("hoodieCommitMetadata"));
+              final String instantTime = r.get("commitTime").toString();
+              final String action = r.get("actionType").toString();
+              return 
metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats 
-> hoodieWriteStats.stream().map(hoodieWriteStat -> {
+                List<Comparable> row = new ArrayList<>();
+                row.add(action);
+                row.add(instantTime);
+                row.add(hoodieWriteStat.getPartitionPath());
+                row.add(hoodieWriteStat.getFileId());
+                row.add(hoodieWriteStat.getPrevCommit());
+                row.add(hoodieWriteStat.getNumWrites());
+                row.add(hoodieWriteStat.getNumInserts());
+                row.add(hoodieWriteStat.getNumDeletes());
+                row.add(hoodieWriteStat.getNumUpdateWrites());
+                row.add(hoodieWriteStat.getTotalLogFiles());
+                row.add(hoodieWriteStat.getTotalLogBlocks());
+                row.add(hoodieWriteStat.getTotalCorruptLogBlock());
+                row.add(hoodieWriteStat.getTotalRollbackBlocks());
+                row.add(hoodieWriteStat.getTotalLogRecords());
+                row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
+                row.add(hoodieWriteStat.getTotalWriteBytes());
+                row.add(hoodieWriteStat.getTotalWriteErrors());
+                return row;
+              })).map(rowList -> rowList.toArray(new Comparable[0]));
+            }).collect(Collectors.toList());
+        allStats.addAll(readCommits);
       }
-      List<Comparable[]> readCommits = readRecords.stream().map(r -> 
(GenericRecord) r)
-          .filter(r -> 
r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
-              || 
r.get("actionType").toString().equals(HoodieTimeline.DELTA_COMMIT_ACTION))
-          .flatMap(r -> {
-            HoodieCommitMetadata metadata = (HoodieCommitMetadata) 
SpecificData.get()
-                .deepCopy(HoodieCommitMetadata.SCHEMA$, 
r.get("hoodieCommitMetadata"));
-            final String instantTime = r.get("commitTime").toString();
-            final String action = r.get("actionType").toString();
-            return 
metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats 
-> hoodieWriteStats.stream().map(hoodieWriteStat -> {
-              List<Comparable> row = new ArrayList<>();
-              row.add(action);
-              row.add(instantTime);
-              row.add(hoodieWriteStat.getPartitionPath());
-              row.add(hoodieWriteStat.getFileId());
-              row.add(hoodieWriteStat.getPrevCommit());
-              row.add(hoodieWriteStat.getNumWrites());
-              row.add(hoodieWriteStat.getNumInserts());
-              row.add(hoodieWriteStat.getNumDeletes());
-              row.add(hoodieWriteStat.getNumUpdateWrites());
-              row.add(hoodieWriteStat.getTotalLogFiles());
-              row.add(hoodieWriteStat.getTotalLogBlocks());
-              row.add(hoodieWriteStat.getTotalCorruptLogBlock());
-              row.add(hoodieWriteStat.getTotalRollbackBlocks());
-              row.add(hoodieWriteStat.getTotalLogRecords());
-              row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
-              row.add(hoodieWriteStat.getTotalWriteBytes());
-              row.add(hoodieWriteStat.getTotalWriteErrors());
-              return row;
-            })).map(rowList -> rowList.toArray(new Comparable[0]));
-          }).collect(Collectors.toList());
-      allStats.addAll(readCommits);
-      reader.close();
     }
     TableHeader header = new 
TableHeader().addTableHeaderField("action").addTableHeaderField("instant")
         
.addTableHeaderField("partition").addTableHeaderField("file_id").addTableHeaderField("prev_instant")
@@ -188,21 +187,20 @@ public class ArchivedCommitsCommand {
     List<Comparable[]> allCommits = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       // read the archived file
-      HoodieLogFormat.Reader reader = 
HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), 
HoodieArchivedMetaEntry.getClassSchema());
-
-      List<IndexedRecord> readRecords = new ArrayList<>();
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
blk.getRecordIterator(HoodieRecordType.AVRO)) {
-          recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
+      try (HoodieLogFormat.Reader reader = 
HoodieLogFormat.newReader(HadoopFSUtils.getFs(basePath, HoodieCLI.conf),
+          new HoodieLogFile(fs.getPath()), 
HoodieArchivedMetaEntry.getClassSchema())) {
+        List<IndexedRecord> readRecords = new ArrayList<>();
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
blk.getRecordIterator(HoodieRecordType.AVRO)) {
+            recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
+          }
         }
+        List<Comparable[]> readCommits = readRecords.stream().map(r -> 
(GenericRecord) r)
+            .map(r -> readCommit(r, 
skipMetadata)).collect(Collectors.toList());
+        allCommits.addAll(readCommits);
       }
-      List<Comparable[]> readCommits = readRecords.stream().map(r -> 
(GenericRecord) r)
-          .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());
-      allCommits.addAll(readCommits);
-      reader.close();
     }
 
     TableHeader header = new 
TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("CommitType");
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index effa096bfa9..eda0d0de219 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -125,57 +125,56 @@ public class ExportCommand {
 
     for (FileStatus fs : statuses) {
       // read the archived file
-      Reader reader = HoodieLogFormat.newReader(fileSystem, new 
HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
-
-      // read the avro blocks
-      while (reader.hasNext() && copyCount++ < limit) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
blk.getRecordIterator(HoodieRecordType.AVRO)) {
-          while (recordItr.hasNext()) {
-            IndexedRecord ir = recordItr.next().getData();
-            // Archived instants are saved as arvo encoded 
HoodieArchivedMetaEntry records. We need to get the
-            // metadata record from the entry and convert it to json.
-            HoodieArchivedMetaEntry archiveEntryRecord = 
(HoodieArchivedMetaEntry) SpecificData.get()
-                .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
-            final String action = 
archiveEntryRecord.get("actionType").toString();
-            if (!actionSet.contains(action)) {
-              continue;
+      try (Reader reader = HoodieLogFormat.newReader(fileSystem, new 
HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+
+        // read the avro blocks
+        while (reader.hasNext() && copyCount++ < limit) {
+          HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
blk.getRecordIterator(HoodieRecordType.AVRO)) {
+            while (recordItr.hasNext()) {
+              IndexedRecord ir = recordItr.next().getData();
+              // Archived instants are saved as arvo encoded 
HoodieArchivedMetaEntry records. We need to get the
+              // metadata record from the entry and convert it to json.
+              HoodieArchivedMetaEntry archiveEntryRecord = 
(HoodieArchivedMetaEntry) SpecificData.get()
+                  .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
+              final String action = 
archiveEntryRecord.get("actionType").toString();
+              if (!actionSet.contains(action)) {
+                continue;
+              }
+
+              GenericRecord metadata = null;
+              switch (action) {
+                case HoodieTimeline.CLEAN_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCleanMetadata();
+                  break;
+                case HoodieTimeline.COMMIT_ACTION:
+                case HoodieTimeline.DELTA_COMMIT_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCommitMetadata();
+                  break;
+                case HoodieTimeline.ROLLBACK_ACTION:
+                  metadata = archiveEntryRecord.getHoodieRollbackMetadata();
+                  break;
+                case HoodieTimeline.SAVEPOINT_ACTION:
+                  metadata = archiveEntryRecord.getHoodieSavePointMetadata();
+                  break;
+                case HoodieTimeline.COMPACTION_ACTION:
+                  metadata = archiveEntryRecord.getHoodieCompactionMetadata();
+                  break;
+                default:
+                  throw new HoodieException("Unknown type of action " + 
action);
+              }
+
+              final String instantTime = 
archiveEntryRecord.get("commitTime").toString();
+              if (metadata == null) {
+                LOG.error("Could not load metadata for action " + action + " 
at instant time " + instantTime);
+                continue;
+              }
+              final String outPath = localFolder + StoragePath.SEPARATOR + 
instantTime + "." + action;
+              writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
             }
-
-            GenericRecord metadata = null;
-            switch (action) {
-              case HoodieTimeline.CLEAN_ACTION:
-                metadata = archiveEntryRecord.getHoodieCleanMetadata();
-                break;
-              case HoodieTimeline.COMMIT_ACTION:
-              case HoodieTimeline.DELTA_COMMIT_ACTION:
-                metadata = archiveEntryRecord.getHoodieCommitMetadata();
-                break;
-              case HoodieTimeline.ROLLBACK_ACTION:
-                metadata = archiveEntryRecord.getHoodieRollbackMetadata();
-                break;
-              case HoodieTimeline.SAVEPOINT_ACTION:
-                metadata = archiveEntryRecord.getHoodieSavePointMetadata();
-                break;
-              case HoodieTimeline.COMPACTION_ACTION:
-                metadata = archiveEntryRecord.getHoodieCompactionMetadata();
-                break;
-              default:
-                throw new HoodieException("Unknown type of action " + action);
-            }
-
-            final String instantTime = 
archiveEntryRecord.get("commitTime").toString();
-            if (metadata == null) {
-              LOG.error("Could not load metadata for action " + action + " at 
instant time " + instantTime);
-              continue;
-            }
-            final String outPath = localFolder + StoragePath.SEPARATOR + 
instantTime + "." + action;
-            writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
           }
         }
       }
-
-      reader.close();
     }
 
     return copyCount;
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 138a3e376b2..fc1a4b4ec42 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -114,52 +114,52 @@ public class HoodieLogFileCommand {
       MessageType schema = TableSchemaResolver.readSchemaFromLogFile(fs, path);
       Schema writerSchema = schema != null
           ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) 
: null;
-      Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(fsStatus[0].getPath()), writerSchema);
+      try (Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
 
-      // read the avro blocks
-      while (reader.hasNext()) {
-        HoodieLogBlock n = reader.next();
-        String instantTime;
-        AtomicInteger recordCount = new AtomicInteger(0);
-        if (n instanceof HoodieCorruptBlock) {
-          try {
+        // read the avro blocks
+        while (reader.hasNext()) {
+          HoodieLogBlock n = reader.next();
+          String instantTime;
+          AtomicInteger recordCount = new AtomicInteger(0);
+          if (n instanceof HoodieCorruptBlock) {
+            try {
+              instantTime = 
n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
+              if (instantTime == null) {
+                throw new Exception("Invalid instant time " + instantTime);
+              }
+            } catch (Exception e) {
+              numCorruptBlocks++;
+              instantTime = "corrupt_block_" + numCorruptBlocks;
+              // could not read metadata for corrupt block
+            }
+          } else {
             instantTime = 
n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
             if (instantTime == null) {
-              throw new Exception("Invalid instant time " + instantTime);
+              // This can happen when reading archived commit files since they 
were written without any instant time
+              dummyInstantTimeCount++;
+              instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
             }
-          } catch (Exception e) {
-            numCorruptBlocks++;
-            instantTime = "corrupt_block_" + numCorruptBlocks;
-            // could not read metadata for corrupt block
-          }
-        } else {
-          instantTime = 
n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
-          if (instantTime == null) {
-            // This can happen when reading archived commit files since they 
were written without any instant time
-            dummyInstantTimeCount++;
-            instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
-          }
-          if (n instanceof HoodieDataBlock) {
-            try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
-              recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+            if (n instanceof HoodieDataBlock) {
+              try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
+                recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+              }
             }
           }
-        }
-        if (commitCountAndMetadata.containsKey(instantTime)) {
-          commitCountAndMetadata.get(instantTime).add(
-              new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
-                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), 
recordCount.get()));
-        } else {
-          List<Tuple3<Tuple2<String, HoodieLogBlockType>, 
Tuple2<Map<HeaderMetadataType, String>,
-              Map<HeaderMetadataType, String>>, Integer>> list =
-              new ArrayList<>();
-          list.add(
-              new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
-                  new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), 
recordCount.get()));
-          commitCountAndMetadata.put(instantTime, list);
+          if (commitCountAndMetadata.containsKey(instantTime)) {
+            commitCountAndMetadata.get(instantTime).add(
+                new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                    new Tuple2<>(n.getLogBlockHeader(), 
n.getLogBlockFooter()), recordCount.get()));
+          } else {
+            List<Tuple3<Tuple2<String, HoodieLogBlockType>, 
Tuple2<Map<HeaderMetadataType, String>,
+                Map<HeaderMetadataType, String>>, Integer>> list =
+                new ArrayList<>();
+            list.add(
+                new Tuple3<>(new Tuple2<>(fileName, n.getBlockType()),
+                    new Tuple2<>(n.getLogBlockHeader(), 
n.getLogBlockFooter()), recordCount.get()));
+            commitCountAndMetadata.put(instantTime, list);
+          }
         }
       }
-      reader.close();
     }
     List<Comparable[]> rows = new ArrayList<>();
     ObjectMapper objectMapper = new ObjectMapper();
@@ -260,23 +260,23 @@ public class HoodieLogFileCommand {
         MessageType schema = 
TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new 
CachingPath(logFile));
         Schema writerSchema = schema != null
             ? new 
AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
-        HoodieLogFormat.Reader reader =
-            HoodieLogFormat.newReader(fs, new HoodieLogFile(new 
CachingPath(logFile)), writerSchema);
-        // read the avro blocks
-        while (reader.hasNext()) {
-          HoodieLogBlock n = reader.next();
-          if (n instanceof HoodieDataBlock) {
-            HoodieDataBlock blk = (HoodieDataBlock) n;
-            try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
blk.getRecordIterator(HoodieRecordType.AVRO)) {
-              recordItr.forEachRemaining(record -> {
-                if (allRecords.size() < limit) {
-                  allRecords.add(record.getData());
-                }
-              });
+        try (HoodieLogFormat.Reader reader =
+                 HoodieLogFormat.newReader(fs, new HoodieLogFile(new 
CachingPath(logFile)), writerSchema)) {
+          // read the avro blocks
+          while (reader.hasNext()) {
+            HoodieLogBlock n = reader.next();
+            if (n instanceof HoodieDataBlock) {
+              HoodieDataBlock blk = (HoodieDataBlock) n;
+              try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = 
blk.getRecordIterator(HoodieRecordType.AVRO)) {
+                recordItr.forEachRemaining(record -> {
+                  if (allRecords.size() < limit) {
+                    allRecords.add(record.getData());
+                  }
+                });
+              }
             }
           }
         }
-        reader.close();
         if (allRecords.size() >= limit) {
           break;
         }
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index e3c3e810c5c..b6256f74006 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -403,12 +403,8 @@ public class TableCommand {
     if (outFile.exists()) {
       outFile.delete();
     }
-    OutputStream os = null;
-    try {
-      os = new FileOutputStream(outFile);
+    try (OutputStream os = new FileOutputStream(outFile)) {
       os.write(getUTF8Bytes(data), 0, data.length());
-    } finally {
-      os.close();
     }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a541de03cb3..7b175db53a5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -781,7 +781,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
         final HoodieDeleteBlock block = new 
HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
 
-        HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
             
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), 
partitionName))
             .withFileId(fileGroupFileId)
             .withDeltaCommit(instantTime)
@@ -791,9 +791,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
             .withFs(dataMetaClient.getFs())
             .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
             .withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-        writer.appendBlock(block);
-        writer.close();
+            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build()) {
+          writer.appendBlock(block);
+        }
       } catch (InterruptedException e) {
         throw new HoodieException(String.format("Failed to created fileGroup 
%s for partition %s", fileGroupFileId, partitionName), e);
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index bbf505c8670..d84a529a084 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -141,10 +141,10 @@ public class HoodiePartitionMetadata {
       BaseFileUtils.getInstance(format.get()).writeMetaFile(fs, filePath, 
props);
     } else {
       // Backwards compatible properties file format
-      OutputStream os = fs.create(filePath, true);
-      props.store(os, "partition metadata");
-      os.flush();
-      os.close();
+      try (OutputStream os = fs.create(filePath, true)) {
+        props.store(os, "partition metadata");
+        os.flush();
+      }
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 653c8211556..203c38b93f9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -53,21 +53,21 @@ public class LogReaderUtils {
   private static Schema readSchemaFromLogFileInReverse(FileSystem fs, 
HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile)
       throws IOException {
     // set length for the HoodieLogFile as it will be leveraged by 
HoodieLogFormat.Reader with reverseReading enabled
-    Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true);
     Schema writerSchema = null;
-    HoodieTimeline completedTimeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    while (reader.hasPrev()) {
-      HoodieLogBlock block = reader.prev();
-      if (block instanceof HoodieDataBlock) {
-        HoodieDataBlock lastBlock = (HoodieDataBlock) block;
-        if (completedTimeline
-            
.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME)))
 {
-          writerSchema = new 
Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-          break;
+    try (Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, 
true)) {
+      HoodieTimeline completedTimeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+      while (reader.hasPrev()) {
+        HoodieLogBlock block = reader.prev();
+        if (block instanceof HoodieDataBlock) {
+          HoodieDataBlock lastBlock = (HoodieDataBlock) block;
+          if (completedTimeline
+              
.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME)))
 {
+            writerSchema = new 
Schema.Parser().parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+            break;
+          }
         }
       }
     }
-    reader.close();
     return writerSchema;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 590d9a17a0d..5ba9e1906b8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -104,38 +104,37 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     Schema schema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
     GenericDatumWriter<IndexedRecord> writer = new 
GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(baos);
-
-    // 1. Write out the log block version
-    output.writeInt(HoodieLogBlock.version);
-
-    // 2. Write total number of records
-    output.writeInt(records.size());
-
-    // 3. Write the records
-    for (HoodieRecord<?> s : records) {
-      ByteArrayOutputStream temp = new ByteArrayOutputStream();
-      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, 
encoderCache.get());
-      encoderCache.set(encoder);
-      try {
-        // Encode the record into bytes
-        // Spark Record not support write avro log
-        IndexedRecord data = s.toIndexedRecord(schema, new 
Properties()).get().getData();
-        writer.write(data, encoder);
-        encoder.flush();
-
-        // Get the size of the bytes
-        int size = temp.toByteArray().length;
-        // Write the record size
-        output.writeInt(size);
-        // Write the content
-        output.write(temp.toByteArray());
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException converting 
HoodieAvroDataBlock to bytes", e);
+    try (DataOutputStream output = new DataOutputStream(baos)) {
+      // 1. Write out the log block version
+      output.writeInt(HoodieLogBlock.version);
+
+      // 2. Write total number of records
+      output.writeInt(records.size());
+
+      // 3. Write the records
+      for (HoodieRecord<?> s : records) {
+        ByteArrayOutputStream temp = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, 
encoderCache.get());
+        encoderCache.set(encoder);
+        try {
+          // Encode the record into bytes
+          // Spark Record not support write avro log
+          IndexedRecord data = s.toIndexedRecord(schema, new 
Properties()).get().getData();
+          writer.write(data, encoder);
+          encoder.flush();
+
+          // Get the size of the bytes
+          int size = temp.toByteArray().length;
+          // Write the record size
+          output.writeInt(size);
+          // Write the content
+          output.write(temp.toByteArray());
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException converting 
HoodieAvroDataBlock to bytes", e);
+        }
       }
+      encoderCache.remove();
     }
-    encoderCache.remove();
-    output.close();
     return baos.toByteArray();
   }
 
@@ -287,9 +286,9 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   private static byte[] compress(String text) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
-      OutputStream out = new DeflaterOutputStream(baos);
-      out.write(getUTF8Bytes(text));
-      out.close();
+      try (OutputStream out = new DeflaterOutputStream(baos)) {
+        out.write(getUTF8Bytes(text));
+      }
     } catch (IOException e) {
       throw new HoodieIOException("IOException while compressing text " + 
text, e);
     }
@@ -316,45 +315,43 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
 
     GenericDatumWriter<IndexedRecord> writer = new 
GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(baos);
-
-    // 1. Compress and Write schema out
-    byte[] schemaContent = compress(schema.toString());
-    output.writeInt(schemaContent.length);
-    output.write(schemaContent);
-
-    List<HoodieRecord<?>> records = new ArrayList<>();
-    try (ClosableIterator<HoodieRecord<Object>> recordItr = 
getRecordIterator(HoodieRecordType.AVRO)) {
-      recordItr.forEachRemaining(records::add);
-    }
-
-    // 2. Write total number of records
-    output.writeInt(records.size());
+    try (DataOutputStream output = new DataOutputStream(baos)) {
+      // 1. Compress and Write schema out
+      byte[] schemaContent = compress(schema.toString());
+      output.writeInt(schemaContent.length);
+      output.write(schemaContent);
+
+      List<HoodieRecord<?>> records = new ArrayList<>();
+      try (ClosableIterator<HoodieRecord<Object>> recordItr = 
getRecordIterator(HoodieRecordType.AVRO)) {
+        recordItr.forEachRemaining(records::add);
+      }
 
-    // 3. Write the records
-    Iterator<HoodieRecord<?>> itr = records.iterator();
-    while (itr.hasNext()) {
-      IndexedRecord s = itr.next().toIndexedRecord(schema, new 
Properties()).get().getData();
-      ByteArrayOutputStream temp = new ByteArrayOutputStream();
-      Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
-      try {
-        // Encode the record into bytes
-        writer.write(s, encoder);
-        encoder.flush();
-
-        // Get the size of the bytes
-        int size = temp.toByteArray().length;
-        // Write the record size
-        output.writeInt(size);
-        // Write the content
-        output.write(temp.toByteArray());
-        itr.remove();
-      } catch (IOException e) {
-        throw new HoodieIOException("IOException converting 
HoodieAvroDataBlock to bytes", e);
+      // 2. Write total number of records
+      output.writeInt(records.size());
+
+      // 3. Write the records
+      Iterator<HoodieRecord<?>> itr = records.iterator();
+      while (itr.hasNext()) {
+        IndexedRecord s = itr.next().toIndexedRecord(schema, new 
Properties()).get().getData();
+        ByteArrayOutputStream temp = new ByteArrayOutputStream();
+        Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
+        try {
+          // Encode the record into bytes
+          writer.write(s, encoder);
+          encoder.flush();
+
+          // Get the size of the bytes
+          int size = temp.toByteArray().length;
+          // Write the record size
+          output.writeInt(size);
+          // Write the content
+          output.write(temp.toByteArray());
+          itr.remove();
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException converting 
HoodieAvroDataBlock to bytes", e);
+        }
       }
     }
-
-    output.close();
     return baos.toByteArray();
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 6b1069847f3..de5df5c73b7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -92,9 +92,9 @@ public class SerializationUtils {
     byte[] serialize(Object obj) {
       kryo.reset();
       baos.reset();
-      Output output = new Output(baos);
-      this.kryo.writeClassAndObject(output, obj);
-      output.close();
+      try (Output output = new Output(baos)) {
+        this.kryo.writeClassAndObject(output, obj);
+      }
       return baos.toByteArray();
     }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cd16294da72..5f31cc6aa95 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -419,19 +419,19 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
                                                                                
       List<String> sortedKeys,
                                                                                
       boolean fullKeys,
                                                                                
       String partitionName) throws IOException {
-    ClosableIterator<HoodieRecord<?>> records = fullKeys
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+    try (ClosableIterator<HoodieRecord<?>> records = fullKeys
         ? reader.getRecordsByKeysIterator(sortedKeys)
-        : reader.getRecordsByKeyPrefixIterator(sortedKeys);
-
-    Map<String, HoodieRecord<HoodieMetadataPayload>> result = toStream(records)
-        .map(record -> {
-          GenericRecord data = (GenericRecord) record.getData();
-          return Pair.of(
-              (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
-              composeRecord(data, partitionName));
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-    records.close();
+        : reader.getRecordsByKeyPrefixIterator(sortedKeys)) {
+      result = toStream(records)
+          .map(record -> {
+            GenericRecord data = (GenericRecord) record.getData();
+            return Pair.of(
+                (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
+                composeRecord(data, partitionName));
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+    }
     return result;
   }
 
diff --git 
a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
 
b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index fe6dd497b2f..352444faa34 100644
--- 
a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ 
b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -75,8 +75,8 @@ public class HoodieJavaWriteClientExample {
       HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(tableType)
         .setTableName(tableName)
-        .setPayloadClassName(HoodieAvroPayload.class.getName())
-        .initTable(hadoopConf, tablePath);
+          .setPayloadClassName(HoodieAvroPayload.class.getName())
+          .initTable(hadoopConf, tablePath);
     }
 
     // Create the write client to write some records in
@@ -85,38 +85,38 @@ public class HoodieJavaWriteClientExample {
         .withDeleteParallelism(2).forTable(tableName)
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
         
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 
30).build()).build();
-    HoodieJavaWriteClient<HoodieAvroPayload> client =
-        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), 
cfg);
-
-    // inserts
-    String newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-
-    List<HoodieRecord<HoodieAvroPayload>> records = 
dataGen.generateInserts(newCommitTime, 10);
-    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new 
ArrayList<>(records);
-    List<HoodieRecord<HoodieAvroPayload>> writeRecords =
-        recordsSoFar.stream().map(r -> new 
HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
-    client.insert(writeRecords, newCommitTime);
-
-    // updates
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = 
dataGen.generateUpdates(newCommitTime, 2);
-    records.addAll(toBeUpdated);
-    recordsSoFar.addAll(toBeUpdated);
-    writeRecords =
-        recordsSoFar.stream().map(r -> new 
HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
-    client.upsert(writeRecords, newCommitTime);
-
-    // Delete
-    newCommitTime = client.startCommit();
-    LOG.info("Starting commit " + newCommitTime);
-    // just delete half of the records
-    int numToDelete = recordsSoFar.size() / 2;
-    List<HoodieKey> toBeDeleted =
-        
recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
-    client.delete(toBeDeleted, newCommitTime);
-
-    client.close();
+
+    try (HoodieJavaWriteClient<HoodieAvroPayload> client =
+             new HoodieJavaWriteClient<>(new 
HoodieJavaEngineContext(hadoopConf), cfg)) {
+
+      // inserts
+      String newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+
+      List<HoodieRecord<HoodieAvroPayload>> records = 
dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new 
ArrayList<>(records);
+      List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+          recordsSoFar.stream().map(r -> new 
HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.insert(writeRecords, newCommitTime);
+
+      // updates
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = 
dataGen.generateUpdates(newCommitTime, 2);
+      records.addAll(toBeUpdated);
+      recordsSoFar.addAll(toBeUpdated);
+      writeRecords =
+          recordsSoFar.stream().map(r -> new 
HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+      client.upsert(writeRecords, newCommitTime);
+
+      // Delete
+      newCommitTime = client.startCommit();
+      LOG.info("Starting commit " + newCommitTime);
+      // just delete half of the records
+      int numToDelete = recordsSoFar.size() / 2;
+      List<HoodieKey> toBeDeleted =
+          
recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+      client.delete(toBeDeleted, newCommitTime);
+    }
   }
 }
diff --git 
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index cbe505b7012..b57ce25671c 100644
--- 
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -99,52 +99,52 @@ public class HoodieWriteClientExample {
               .withDeleteParallelism(2).forTable(tableName)
               
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
               
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 
30).build()).build();
-      SparkRDDWriteClient<HoodieAvroPayload> client = new 
SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
-
-      // inserts
-      String newCommitTime = client.startCommit();
-      LOG.info("Starting commit " + newCommitTime);
-
-      List<HoodieRecord<HoodieAvroPayload>> records = 
dataGen.generateInserts(newCommitTime, 10);
-      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new 
ArrayList<>(records);
-      JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = 
jsc.parallelize(records, 1);
-      client.insert(writeRecords, newCommitTime);
-
-      // updates
-      newCommitTime = client.startCommit();
-      LOG.info("Starting commit " + newCommitTime);
-      List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = 
dataGen.generateUpdates(newCommitTime, 2);
-      records.addAll(toBeUpdated);
-      recordsSoFar.addAll(toBeUpdated);
-      writeRecords = jsc.parallelize(records, 1);
-      client.upsert(writeRecords, newCommitTime);
-
-      // Delete
-      newCommitTime = client.startCommit();
-      LOG.info("Starting commit " + newCommitTime);
-      // just delete half of the records
-      int numToDelete = recordsSoFar.size() / 2;
-      List<HoodieKey> toBeDeleted = 
recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
-      JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
-      client.delete(deleteRecords, newCommitTime);
-
-      // Delete by partition
-      newCommitTime = client.startCommit();
-      client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      LOG.info("Starting commit " + newCommitTime);
-      // The partition where the data needs to be deleted
-      List<String> partitionList = toBeDeleted.stream().map(s -> 
s.getPartitionPath()).distinct().collect(Collectors.toList());
-      List<String> deleteList = recordsSoFar.stream().filter(f -> 
!partitionList.contains(f.getPartitionPath()))
-          .map(m -> 
m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
-      client.deletePartitions(deleteList, newCommitTime);
-
-      // compaction
-      if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) 
{
-        Option<String> instant = client.scheduleCompaction(Option.empty());
-        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = 
client.compact(instant.get());
-        client.commitCompaction(instant.get(), 
compactionMetadata.getCommitMetadata().get(), Option.empty());
+      try (SparkRDDWriteClient<HoodieAvroPayload> client = new 
SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
+
+        // inserts
+        String newCommitTime = client.startCommit();
+        LOG.info("Starting commit " + newCommitTime);
+
+        List<HoodieRecord<HoodieAvroPayload>> records = 
dataGen.generateInserts(newCommitTime, 10);
+        List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new 
ArrayList<>(records);
+        JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = 
jsc.parallelize(records, 1);
+        client.insert(writeRecords, newCommitTime);
+
+        // updates
+        newCommitTime = client.startCommit();
+        LOG.info("Starting commit " + newCommitTime);
+        List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = 
dataGen.generateUpdates(newCommitTime, 2);
+        records.addAll(toBeUpdated);
+        recordsSoFar.addAll(toBeUpdated);
+        writeRecords = jsc.parallelize(records, 1);
+        client.upsert(writeRecords, newCommitTime);
+
+        // Delete
+        newCommitTime = client.startCommit();
+        LOG.info("Starting commit " + newCommitTime);
+        // just delete half of the records
+        int numToDelete = recordsSoFar.size() / 2;
+        List<HoodieKey> toBeDeleted = 
recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
+        client.delete(deleteRecords, newCommitTime);
+
+        // Delete by partition
+        newCommitTime = client.startCommit();
+        client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+        LOG.info("Starting commit " + newCommitTime);
+        // The partition where the data needs to be deleted
+        List<String> partitionList = toBeDeleted.stream().map(s -> 
s.getPartitionPath()).distinct().collect(Collectors.toList());
+        List<String> deleteList = recordsSoFar.stream().filter(f -> 
!partitionList.contains(f.getPartitionPath()))
+            .map(m -> 
m.getKey().getPartitionPath()).distinct().collect(Collectors.toList());
+        client.deletePartitions(deleteList, newCommitTime);
+
+        // compaction
+        if (HoodieTableType.valueOf(tableType) == 
HoodieTableType.MERGE_ON_READ) {
+          Option<String> instant = client.scheduleCompaction(Option.empty());
+          HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = 
client.compact(instant.get());
+          client.commitCompaction(instant.get(), 
compactionMetadata.getCommitMetadata().get(), Option.empty());
+        }
       }
-      client.close();
     }
   }
 
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index 37c573a173c..5bc91ebed14 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -140,10 +140,10 @@ public class FileIOUtils {
   }
 
   public static void writeStringToFile(String str, String filePath) throws 
IOException {
-    PrintStream out = new PrintStream(new FileOutputStream(filePath));
-    out.println(str);
-    out.flush();
-    out.close();
+    try (PrintStream out = new PrintStream(new FileOutputStream(filePath))) {
+      out.println(str);
+      out.flush();
+    }
   }
 
   /**
@@ -174,9 +174,9 @@ public class FileIOUtils {
       }
 
       if (content.isPresent()) {
-        OutputStream out = fileSystem.create(fullPath, true);
-        out.write(content.get());
-        out.close();
+        try (OutputStream out = fileSystem.create(fullPath, true)) {
+          out.write(content.get());
+        }
       }
     } catch (IOException e) {
       LOG.warn("Failed to create file " + fullPath, e);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
index 8806ce46ea3..4194547894d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
@@ -107,11 +107,10 @@ public class HoodieCompactionAdminTool {
   private <T> void serializeOperationResult(FileSystem fs, T result) throws 
Exception {
     if ((cfg.outputPath != null) && (result != null)) {
       Path outputPath = new Path(cfg.outputPath);
-      OutputStream stream = fs.create(outputPath, true);
-      ObjectOutputStream out = new ObjectOutputStream(stream);
-      out.writeObject(result);
-      out.close();
-      stream.close();
+      try (OutputStream stream = fs.create(outputPath, true);
+           ObjectOutputStream out = new ObjectOutputStream(stream)) {
+        out.writeObject(result);
+      }
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
index 66b4382d784..669af8dca9f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SchedulerConfGenerator.java
@@ -131,9 +131,9 @@ public class SchedulerConfGenerator {
   private static String generateAndStoreConfig(Integer deltaSyncWeight, 
Integer compactionWeight,
       Integer deltaSyncMinShare, Integer compactionMinShare, Integer 
clusteringWeight, Integer clusteringMinShare) throws IOException {
     File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), 
".xml");
-    BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
-    bw.write(generateConfig(deltaSyncWeight, compactionWeight, 
deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
-    bw.close();
+    try (BufferedWriter bw = new BufferedWriter(new 
FileWriter(tempConfigFile))) {
+      bw.write(generateConfig(deltaSyncWeight, compactionWeight, 
deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
+    }
     // SPARK-35083 introduces remote scheduler pool files, so the file must 
include scheme since Spark 3.2
     String path = HoodieSparkUtils.gteqSpark3_2() ? 
tempConfigFile.toURI().toString() : tempConfigFile.getAbsolutePath();
     LOG.info("Configs written to file " + path);

Reply via email to