This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2a0a0b9d19 [Fix][Connector-V2] Fix add hive partition error when
partition already existed (#6577)
2a0a0b9d19 is described below
commit 2a0a0b9d19cca478f01fde32958b6dda33a45737
Author: Jia Fan <[email protected]>
AuthorDate: Tue Apr 2 10:50:20 2024 +0800
[Fix][Connector-V2] Fix add hive partition error when partition already
existed (#6577)
---
.../seatunnel/hive/commit/HiveSinkAggregatedCommitter.java | 3 ---
.../connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java | 7 ++++++-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 0e423f3e87..0f57f86458 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -24,7 +24,6 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregated
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.thrift.TException;
import lombok.extern.slf4j.Slf4j;
@@ -71,8 +70,6 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
try {
hiveMetaStore.addPartitions(dbName, tableName, partitions);
log.info("Add these partitions {}", partitions);
- } catch (AlreadyExistsException e) {
- log.warn("These partitions {} are already exists",
partitions);
} catch (TException e) {
log.error("Failed to add these partitions {}", partitions,
e);
errorCommitInfos.add(aggregatedCommitInfo);
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index e2941340f8..6a1288b661 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorExc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
@@ -131,7 +132,11 @@ public class HiveMetaStoreProxy {
@NonNull String dbName, @NonNull String tableName, List<String>
partitions)
throws TException {
for (String partition : partitions) {
- hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
+ try {
+ hiveMetaStoreClient.appendPartition(dbName, tableName,
partition);
+ } catch (AlreadyExistsException e) {
+ log.warn("The partition {} are already exists", partition);
+ }
}
}