This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c23804f13b [HiveSink]Fix the risk of resource leakage. (#6721)
c23804f13b is described below

commit c23804f13b8e39b0a788b2a6c048c3f20e2111d5
Author: lightzhao <[email protected]>
AuthorDate: Fri Apr 19 13:47:30 2024 +0800

    [HiveSink]Fix the risk of resource leakage. (#6721)
    
    Co-authored-by: lightzhao <[email protected]>
---
 .../hive/commit/HiveSinkAggregatedCommitter.java   | 33 ++++++++++++----------
 1 file changed, 18 insertions(+), 15 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 0f57f86458..2f2bc972a0 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -57,26 +57,29 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     @Override
     public List<FileAggregatedCommitInfo> commit(
             List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws 
IOException {
-        HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(pluginConfig);
         List<FileAggregatedCommitInfo> errorCommitInfos = 
super.commit(aggregatedCommitInfos);
         if (errorCommitInfos.isEmpty()) {
-            for (FileAggregatedCommitInfo aggregatedCommitInfo : 
aggregatedCommitInfos) {
-                Map<String, List<String>> partitionDirAndValuesMap =
-                        aggregatedCommitInfo.getPartitionDirAndValuesMap();
-                List<String> partitions =
-                        partitionDirAndValuesMap.keySet().stream()
-                                .map(partition -> partition.replaceAll("\\\\", 
"/"))
-                                .collect(Collectors.toList());
-                try {
-                    hiveMetaStore.addPartitions(dbName, tableName, partitions);
-                    log.info("Add these partitions {}", partitions);
-                } catch (TException e) {
-                    log.error("Failed to add these partitions {}", partitions, 
e);
-                    errorCommitInfos.add(aggregatedCommitInfo);
+            HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(pluginConfig);
+            try {
+                for (FileAggregatedCommitInfo aggregatedCommitInfo : 
aggregatedCommitInfos) {
+                    Map<String, List<String>> partitionDirAndValuesMap =
+                            aggregatedCommitInfo.getPartitionDirAndValuesMap();
+                    List<String> partitions =
+                            partitionDirAndValuesMap.keySet().stream()
+                                    .map(partition -> 
partition.replaceAll("\\\\", "/"))
+                                    .collect(Collectors.toList());
+                    try {
+                        hiveMetaStore.addPartitions(dbName, tableName, 
partitions);
+                        log.info("Add these partitions {}", partitions);
+                    } catch (TException e) {
+                        log.error("Failed to add these partitions {}", 
partitions, e);
+                        errorCommitInfos.add(aggregatedCommitInfo);
+                    }
                 }
+            } finally {
+                hiveMetaStore.close();
             }
         }
-        hiveMetaStore.close();
         return errorCommitInfos;
     }
 

Reply via email to