This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c23804f13b [HiveSink]Fix the risk of resource leakage. (#6721)
c23804f13b is described below
commit c23804f13b8e39b0a788b2a6c048c3f20e2111d5
Author: lightzhao <[email protected]>
AuthorDate: Fri Apr 19 13:47:30 2024 +0800
[HiveSink]Fix the risk of resource leakage. (#6721)
Co-authored-by: lightzhao <[email protected]>
---
.../hive/commit/HiveSinkAggregatedCommitter.java | 33 ++++++++++++----------
1 file changed, 18 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 0f57f86458..2f2bc972a0 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -57,26 +57,29 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
@Override
public List<FileAggregatedCommitInfo> commit(
List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws
IOException {
- HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(pluginConfig);
List<FileAggregatedCommitInfo> errorCommitInfos =
super.commit(aggregatedCommitInfos);
if (errorCommitInfos.isEmpty()) {
- for (FileAggregatedCommitInfo aggregatedCommitInfo :
aggregatedCommitInfos) {
- Map<String, List<String>> partitionDirAndValuesMap =
- aggregatedCommitInfo.getPartitionDirAndValuesMap();
- List<String> partitions =
- partitionDirAndValuesMap.keySet().stream()
- .map(partition -> partition.replaceAll("\\\\",
"/"))
- .collect(Collectors.toList());
- try {
- hiveMetaStore.addPartitions(dbName, tableName, partitions);
- log.info("Add these partitions {}", partitions);
- } catch (TException e) {
- log.error("Failed to add these partitions {}", partitions,
e);
- errorCommitInfos.add(aggregatedCommitInfo);
+ HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(pluginConfig);
+ try {
+ for (FileAggregatedCommitInfo aggregatedCommitInfo :
aggregatedCommitInfos) {
+ Map<String, List<String>> partitionDirAndValuesMap =
+ aggregatedCommitInfo.getPartitionDirAndValuesMap();
+ List<String> partitions =
+ partitionDirAndValuesMap.keySet().stream()
+ .map(partition ->
partition.replaceAll("\\\\", "/"))
+ .collect(Collectors.toList());
+ try {
+ hiveMetaStore.addPartitions(dbName, tableName,
partitions);
+ log.info("Add these partitions {}", partitions);
+ } catch (TException e) {
+ log.error("Failed to add these partitions {}",
partitions, e);
+ errorCommitInfos.add(aggregatedCommitInfo);
+ }
}
+ } finally {
+ hiveMetaStore.close();
}
}
- hiveMetaStore.close();
return errorCommitInfos;
}