This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new edef87b523 [Improve][Connector-v2][HiveSink]remove drop partition when
abort. (#4940)
edef87b523 is described below
commit edef87b5234613c7fe4deb911bf7fb5ba70946f0
Author: lightzhao <[email protected]>
AuthorDate: Mon Aug 14 15:58:10 2023 +0800
[Improve][Connector-v2][HiveSink]remove drop partition when abort. (#4940)
Co-authored-by: lightzhao <[email protected]>
Co-authored-by: liuli <[email protected]>
Co-authored-by: ic4y <[email protected]>
---
docs/en/connector-v2/source/Hive.md | 27 +++++++++-------
.../hive/commit/HiveSinkAggregatedCommitter.java | 37 ++++++++++++++--------
.../seatunnel/hive/config/HiveConfig.java | 8 +++++
.../seatunnel/hive/sink/HiveSinkFactory.java | 3 ++
4 files changed, 50 insertions(+), 25 deletions(-)
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index f9f35aaf73..afa9893d5b 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,17 +33,18 @@ Read all the data in a split in a pollNext call. What
splits are read will be sa
## Options
-| name | type | required | default value |
-|----------------------|--------|----------|---------------|
-| table_name | string | yes | - |
-| metastore_uri | string | yes | - |
-| kerberos_principal | string | no | - |
-| kerberos_keytab_path | string | no | - |
-| hdfs_site_path | string | no | - |
-| hive_site_path | string | no | - |
-| read_partitions | list | no | - |
-| read_columns | list | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|-------------------------------|---------|----------|---------------|
+| table_name | string | yes | - |
+| metastore_uri | string | yes | - |
+| kerberos_principal | string | no | - |
+| kerberos_keytab_path | string | no | - |
+| hdfs_site_path | string | no | - |
+| hive_site_path | string | no | - |
+| read_partitions | list | no | - |
+| read_columns | list | no | - |
+| abort_drop_partition_metadata | boolean | no | false |
+| common-options | | no | - |
### table_name [string]
@@ -80,6 +81,10 @@ The keytab file path of kerberos authentication
The read column list of the data source, user can use it to implement field
projection.
+### abort_drop_partition_metadata [boolean]
+
+Flag to decide whether to drop partition metadata from the Hive Metastore when the Hive sink aborts. Note: this only affects the metadata in the metastore; the data in the partition will always be deleted (data generated during the synchronization process).
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 7d7c271e1d..4934cc2aa1 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -34,11 +34,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
+
@Slf4j
public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
private final Config pluginConfig;
private final String dbName;
private final String tableName;
+ private final boolean abortDropPartitionMetadata;
public HiveSinkAggregatedCommitter(
Config pluginConfig, String dbName, String tableName,
FileSystemUtils fileSystemUtils) {
@@ -46,6 +49,10 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
this.pluginConfig = pluginConfig;
this.dbName = dbName;
this.tableName = tableName;
+ this.abortDropPartitionMetadata =
+ pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
+ ?
pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
+ : ABORT_DROP_PARTITION_METADATA.defaultValue();
}
@Override
@@ -79,21 +86,23 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
@Override
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
throws Exception {
super.abort(aggregatedCommitInfos);
- HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(pluginConfig);
- for (FileAggregatedCommitInfo aggregatedCommitInfo :
aggregatedCommitInfos) {
- Map<String, List<String>> partitionDirAndValuesMap =
- aggregatedCommitInfo.getPartitionDirAndValuesMap();
- List<String> partitions =
- partitionDirAndValuesMap.keySet().stream()
- .map(partition -> partition.replaceAll("\\\\",
"/"))
- .collect(Collectors.toList());
- try {
- hiveMetaStore.dropPartitions(dbName, tableName, partitions);
- log.info("Remove these partitions {}", partitions);
- } catch (TException e) {
- log.error("Failed to remove these partitions {}", partitions,
e);
+ if (abortDropPartitionMetadata) {
+ HiveMetaStoreProxy hiveMetaStore =
HiveMetaStoreProxy.getInstance(pluginConfig);
+ for (FileAggregatedCommitInfo aggregatedCommitInfo :
aggregatedCommitInfos) {
+ Map<String, List<String>> partitionDirAndValuesMap =
+ aggregatedCommitInfo.getPartitionDirAndValuesMap();
+ List<String> partitions =
+ partitionDirAndValuesMap.keySet().stream()
+ .map(partition -> partition.replaceAll("\\\\",
"/"))
+ .collect(Collectors.toList());
+ try {
+ hiveMetaStore.dropPartitions(dbName, tableName,
partitions);
+ log.info("Remove these partitions {}", partitions);
+ } catch (TException e) {
+ log.error("Failed to remove these partitions {}",
partitions, e);
+ }
}
+ hiveMetaStore.close();
}
- hiveMetaStore.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index 142863b513..8cf00b8c30 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -38,11 +38,19 @@ public class HiveConfig {
.noDefaultValue()
.withDescription("Hive metastore uri");
+ public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA =
+ Options.key("abort_drop_partition_metadata")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Flag to decide whether to drop partition metadata
from Hive Metastore during an abort operation. Note: this only affects the
metadata in the metastore, the data in the partition will always be
deleted(data generated during the synchronization process).");
+
public static final Option<String> HIVE_SITE_PATH =
Options.key("hive_site_path")
.stringType()
.noDefaultValue()
.withDescription("The path of hive-site.xml");
+
public static final String TEXT_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.mapred.TextInputFormat";
public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index 6674b778c4..b98f6cffa5 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
+
@AutoService(Factory.class)
public class HiveSinkFactory implements TableSinkFactory {
@Override
@@ -36,6 +38,7 @@ public class HiveSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(HiveConfig.TABLE_NAME)
.required(HiveConfig.METASTORE_URI)
+ .optional(ABORT_DROP_PARTITION_METADATA)
.build();
}
}