This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 51d122b  Close Hoodie Clients which are opened to properly shutdown embedded timeline service
51d122b is described below

commit 51d122b5c39525720ce946755f57326ce24f6e38
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Tue Jun 11 12:58:58 2019 -0700

    Close Hoodie Clients which are opened to properly shutdown embedded timeline service
---
 .../main/java/com/uber/hoodie/DataSourceUtils.java |  7 +-
 .../utilities/HoodieCompactionAdminTool.java       | 87 ++++++++++++----------
 2 files changed, 52 insertions(+), 42 deletions(-)

diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 0d70f8a..3401266 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -183,14 +183,19 @@ public class DataSourceUtils {
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
       JavaRDD<HoodieRecord> incomingHoodieRecords,
       HoodieWriteConfig writeConfig) throws Exception {
+    HoodieReadClient client = null;
     try {
-      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
+      client = new HoodieReadClient<>(jssc, writeConfig);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (DatasetNotFoundException e) {
       // this will be executed when there is no hoodie dataset yet
       // so no dups to drop
       return incomingHoodieRecords;
+    } finally {
+      if (null != client) {
+        client.close();
+      }
     }
   }
 
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
index 4f5c95d..0c640f6 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactionAdminTool.java
@@ -58,47 +58,52 @@ public class HoodieCompactionAdminTool {
    */
   public void run(JavaSparkContext jsc) throws Exception {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
-    CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
-    final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
-    if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
-      throw new IllegalStateException("Output File Path already exists");
-    }
-    switch (cfg.operation) {
-      case VALIDATE:
-        List<ValidationOpResult> res =
-            admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
-        if (cfg.printOutput) {
-          printOperationResult("Result of Validation Operation :", res);
-        }
-        serializeOperationResult(fs, res);
-        break;
-      case UNSCHEDULE_FILE:
-        List<RenameOpResult> r =
-            admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
-                cfg.skipValidation, cfg.dryRun);
-        if (cfg.printOutput) {
-          System.out.println(r);
-        }
-        serializeOperationResult(fs, r);
-        break;
-      case UNSCHEDULE_PLAN:
-        List<RenameOpResult> r2 =
-            admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
-        if (cfg.printOutput) {
-          printOperationResult("Result of Unscheduling Compaction Plan :", r2);
-        }
-        serializeOperationResult(fs, r2);
-        break;
-      case REPAIR:
-        List<RenameOpResult> r3 =
-            admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
-        if (cfg.printOutput) {
-          printOperationResult("Result of Repair Operation :", r3);
-        }
-        serializeOperationResult(fs, r3);
-        break;
-      default:
-        throw new IllegalStateException("Not yet implemented !!");
+    final CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath);
+    try {
+      final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
+      if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
+        throw new IllegalStateException("Output File Path already exists");
+      }
+      switch (cfg.operation) {
+        case VALIDATE:
+          List<ValidationOpResult> res =
+              admin.validateCompactionPlan(metaClient, cfg.compactionInstantTime, cfg.parallelism);
+          if (cfg.printOutput) {
+            printOperationResult("Result of Validation Operation :", res);
+          }
+          serializeOperationResult(fs, res);
+          break;
+        case UNSCHEDULE_FILE:
+          List<RenameOpResult> r =
+              admin.unscheduleCompactionFileId(new HoodieFileGroupId(cfg.partitionPath, cfg.fileId),
+                  cfg.skipValidation, cfg.dryRun);
+          if (cfg.printOutput) {
+            System.out.println(r);
+          }
+          serializeOperationResult(fs, r);
+          break;
+        case UNSCHEDULE_PLAN:
+          List<RenameOpResult> r2 =
+              admin
+                  .unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation, cfg.parallelism, cfg.dryRun);
+          if (cfg.printOutput) {
+            printOperationResult("Result of Unscheduling Compaction Plan :", r2);
+          }
+          serializeOperationResult(fs, r2);
+          break;
+        case REPAIR:
+          List<RenameOpResult> r3 =
+              admin.repairCompaction(cfg.compactionInstantTime, cfg.parallelism, cfg.dryRun);
+          if (cfg.printOutput) {
+            printOperationResult("Result of Repair Operation :", r3);
+          }
+          serializeOperationResult(fs, r3);
+          break;
+        default:
+          throw new IllegalStateException("Not yet implemented !!");
+      }
+    } finally {
+      admin.close();
     }
   }
 

Reply via email to