This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new edef87b523 [Improve][Connector-v2][HiveSink]remove drop partition when 
abort. (#4940)
edef87b523 is described below

commit edef87b5234613c7fe4deb911bf7fb5ba70946f0
Author: lightzhao <[email protected]>
AuthorDate: Mon Aug 14 15:58:10 2023 +0800

    [Improve][Connector-v2][HiveSink]remove drop partition when abort. (#4940)
    
    Co-authored-by: lightzhao <[email protected]>
    Co-authored-by: liuli <[email protected]>
    Co-authored-by: ic4y <[email protected]>
---
 docs/en/connector-v2/source/Hive.md                | 27 +++++++++-------
 .../hive/commit/HiveSinkAggregatedCommitter.java   | 37 ++++++++++++++--------
 .../seatunnel/hive/config/HiveConfig.java          |  8 +++++
 .../seatunnel/hive/sink/HiveSinkFactory.java       |  3 ++
 4 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
index f9f35aaf73..afa9893d5b 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,17 +33,18 @@ Read all the data in a split in a pollNext call. What 
splits are read will be sa
 
 ## Options
 
-|         name         |  type  | required | default value |
-|----------------------|--------|----------|---------------|
-| table_name           | string | yes      | -             |
-| metastore_uri        | string | yes      | -             |
-| kerberos_principal   | string | no       | -             |
-| kerberos_keytab_path | string | no       | -             |
-| hdfs_site_path       | string | no       | -             |
-| hive_site_path       | string | no       | -             |
-| read_partitions      | list   | no       | -             |
-| read_columns         | list   | no       | -             |
-| common-options       |        | no       | -             |
+|             name              |  type   | required | default value |
+|-------------------------------|---------|----------|---------------|
+| table_name                    | string  | yes      | -             |
+| metastore_uri                 | string  | yes      | -             |
+| kerberos_principal            | string  | no       | -             |
+| kerberos_keytab_path          | string  | no       | -             |
+| hdfs_site_path                | string  | no       | -             |
+| hive_site_path                | string  | no       | -             |
+| read_partitions               | list    | no       | -             |
+| read_columns                  | list    | no       | -             |
+| abort_drop_partition_metadata | boolean | no       | false         |
+| common-options                |         | no       | -             |
 
 ### table_name [string]
 
@@ -80,6 +81,10 @@ The keytab file path of kerberos authentication
 
 The read column list of the data source, user can use it to implement field 
projection.
 
+### abort_drop_partition_metadata [boolean]
+
+Flag to decide whether to drop partition metadata from Hive Metastore during 
an abort operation. Note: this only affects the metadata in the metastore; the 
data in the partition will always be deleted (the data generated during the 
synchronization process).
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 7d7c271e1d..4934cc2aa1 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -34,11 +34,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
+
 @Slf4j
 public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     private final Config pluginConfig;
     private final String dbName;
     private final String tableName;
+    private final boolean abortDropPartitionMetadata;
 
     public HiveSinkAggregatedCommitter(
             Config pluginConfig, String dbName, String tableName, 
FileSystemUtils fileSystemUtils) {
@@ -46,6 +49,10 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
         this.pluginConfig = pluginConfig;
         this.dbName = dbName;
         this.tableName = tableName;
+        this.abortDropPartitionMetadata =
+                pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
+                        ? 
pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
+                        : ABORT_DROP_PARTITION_METADATA.defaultValue();
     }
 
     @Override
@@ -79,21 +86,23 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
     @Override
     public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) 
throws Exception {
         super.abort(aggregatedCommitInfos);
-        HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(pluginConfig);
-        for (FileAggregatedCommitInfo aggregatedCommitInfo : 
aggregatedCommitInfos) {
-            Map<String, List<String>> partitionDirAndValuesMap =
-                    aggregatedCommitInfo.getPartitionDirAndValuesMap();
-            List<String> partitions =
-                    partitionDirAndValuesMap.keySet().stream()
-                            .map(partition -> partition.replaceAll("\\\\", 
"/"))
-                            .collect(Collectors.toList());
-            try {
-                hiveMetaStore.dropPartitions(dbName, tableName, partitions);
-                log.info("Remove these partitions {}", partitions);
-            } catch (TException e) {
-                log.error("Failed to remove these partitions {}", partitions, 
e);
+        if (abortDropPartitionMetadata) {
+            HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(pluginConfig);
+            for (FileAggregatedCommitInfo aggregatedCommitInfo : 
aggregatedCommitInfos) {
+                Map<String, List<String>> partitionDirAndValuesMap =
+                        aggregatedCommitInfo.getPartitionDirAndValuesMap();
+                List<String> partitions =
+                        partitionDirAndValuesMap.keySet().stream()
+                                .map(partition -> partition.replaceAll("\\\\", 
"/"))
+                                .collect(Collectors.toList());
+                try {
+                    hiveMetaStore.dropPartitions(dbName, tableName, 
partitions);
+                    log.info("Remove these partitions {}", partitions);
+                } catch (TException e) {
+                    log.error("Failed to remove these partitions {}", 
partitions, e);
+                }
             }
+            hiveMetaStore.close();
         }
-        hiveMetaStore.close();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index 142863b513..8cf00b8c30 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -38,11 +38,19 @@ public class HiveConfig {
                     .noDefaultValue()
                     .withDescription("Hive metastore uri");
 
+    public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA =
+            Options.key("abort_drop_partition_metadata")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Flag to decide whether to drop partition metadata 
from Hive Metastore during an abort operation. Note: this only affects the 
metadata in the metastore, the data in the partition will always be 
deleted(data generated during the synchronization process).");
+
     public static final Option<String> HIVE_SITE_PATH =
             Options.key("hive_site_path")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The path of hive-site.xml");
+
     public static final String TEXT_INPUT_FORMAT_CLASSNAME =
             "org.apache.hadoop.mapred.TextInputFormat";
     public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index 6674b778c4..b98f6cffa5 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
+
 @AutoService(Factory.class)
 public class HiveSinkFactory implements TableSinkFactory {
     @Override
@@ -36,6 +38,7 @@ public class HiveSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(HiveConfig.TABLE_NAME)
                 .required(HiveConfig.METASTORE_URI)
+                .optional(ABORT_DROP_PARTITION_METADATA)
                 .build();
     }
 }

Reply via email to