This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 51d122b Close Hoodie clients that were opened, to properly shut down the
embedded timeline service
51d122b is described below
commit 51d122b5c39525720ce946755f57326ce24f6e38
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue Jun 11 12:58:58 2019 -0700
Close Hoodie clients that were opened, to properly shut down the embedded
timeline service
---
.../main/java/com/uber/hoodie/DataSourceUtils.java | 7 +-
.../utilities/HoodieCompactionAdminTool.java | 87 ++++++++++++----------
2 files changed, 52 insertions(+), 42 deletions(-)
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 0d70f8a..3401266 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -183,14 +183,19 @@ public class DataSourceUtils {
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig) throws Exception {
+ HoodieReadClient client = null;
try {
- HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
+ client = new HoodieReadClient<>(jssc, writeConfig);
return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>)
r).isCurrentLocationKnown());
} catch (DatasetNotFoundException e) {
// this will be executed when there is no hoodie dataset yet
// so no dups to drop
return incomingHoodieRecords;
+ } finally {
+ if (null != client) {
+ client.close();
+ }
}
}
diff --git
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
index 4f5c95d..0c640f6 100644
---
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
+++
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
@@ -58,47 +58,52 @@ public class HoodieCompactionAdminTool {
*/
public void run(JavaSparkContext jsc) throws Exception {
HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
- CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
- final FileSystem fs = FSUtils.getFs(cfg.basePath,
jsc.hadoopConfiguration());
- if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
- throw new IllegalStateException("Output File Path already exists");
- }
- switch (cfg.operation) {
- case VALIDATE:
- List<ValidationOpResult> res =
- admin.validateCompactionPlan(metaClient,
cfg.compactionInstantTime, cfg.parallelism);
- if (cfg.printOutput) {
- printOperationResult("Result of Validation Operation :", res);
- }
- serializeOperationResult(fs, res);
- break;
- case UNSCHEDULE_FILE:
- List<RenameOpResult> r =
- admin.unscheduleCompactionFileId(new
HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
- cfg.skipValidation, cfg.dryRun);
- if (cfg.printOutput) {
- System.out.println(r);
- }
- serializeOperationResult(fs, r);
- break;
- case UNSCHEDULE_PLAN:
- List<RenameOpResult> r2 =
- admin.unscheduleCompactionPlan(cfg.compactionInstantTime,
cfg.skipValidation, cfg.parallelism, cfg.dryRun);
- if (cfg.printOutput) {
- printOperationResult("Result of Unscheduling Compaction Plan :", r2);
- }
- serializeOperationResult(fs, r2);
- break;
- case REPAIR:
- List<RenameOpResult> r3 =
- admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism,
cfg.dryRun);
- if (cfg.printOutput) {
- printOperationResult("Result of Repair Operation :", r3);
- }
- serializeOperationResult(fs, r3);
- break;
- default:
- throw new IllegalStateException("Not yet implemented !!");
+ final CompactionAdminClient admin = new CompactionAdminClient(jsc,
cfg.basePath);
+ try {
+ final FileSystem fs = FSUtils.getFs(cfg.basePath,
jsc.hadoopConfiguration());
+ if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
+ throw new IllegalStateException("Output File Path already exists");
+ }
+ switch (cfg.operation) {
+ case VALIDATE:
+ List<ValidationOpResult> res =
+ admin.validateCompactionPlan(metaClient,
cfg.compactionInstantTime, cfg.parallelism);
+ if (cfg.printOutput) {
+ printOperationResult("Result of Validation Operation :", res);
+ }
+ serializeOperationResult(fs, res);
+ break;
+ case UNSCHEDULE_FILE:
+ List<RenameOpResult> r =
+ admin.unscheduleCompactionFileId(new
HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
+ cfg.skipValidation, cfg.dryRun);
+ if (cfg.printOutput) {
+ System.out.println(r);
+ }
+ serializeOperationResult(fs, r);
+ break;
+ case UNSCHEDULE_PLAN:
+ List<RenameOpResult> r2 =
+ admin
+ .unscheduleCompactionPlan(cfg.compactionInstantTime,
cfg.skipValidation, cfg.parallelism, cfg.dryRun);
+ if (cfg.printOutput) {
+ printOperationResult("Result of Unscheduling Compaction Plan :",
r2);
+ }
+ serializeOperationResult(fs, r2);
+ break;
+ case REPAIR:
+ List<RenameOpResult> r3 =
+ admin.repairCompaction(cfg.compactionInstantTime,
cfg.parallelism, cfg.dryRun);
+ if (cfg.printOutput) {
+ printOperationResult("Result of Repair Operation :", r3);
+ }
+ serializeOperationResult(fs, r3);
+ break;
+ default:
+ throw new IllegalStateException("Not yet implemented !!");
+ }
+ } finally {
+ admin.close();
}
}