This is an automated email from the ASF dual-hosted git repository.
jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6fafe6f4d3 [Feature][connector-hive] hive sink connector support
overwrite mode #7843 (#7891)
6fafe6f4d3 is described below
commit 6fafe6f4d398f002a3ea4a9b6f2edbb1fe51cafc
Author: Adam Wang <[email protected]>
AuthorDate: Wed May 21 18:18:40 2025 +0800
[Feature][connector-hive] hive sink connector support overwrite mode #7843
(#7891)
Co-authored-by: wangxiaogang <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
---
docs/en/connector-v2/sink/Hive.md | 5 +
.../file/sink/commit/FileAggregatedCommitInfo.java | 39 +++
.../hive/commit/HiveSinkAggregatedCommitter.java | 73 ++++++
.../seatunnel/hive/sink/HiveSinkOptions.java | 7 +
.../e2e/connector/hive/HiveOverwriteIT.java | 262 +++++++++++++++++++++
.../overwrite/fake_to_hive_overwrite_1.conf | 58 +++++
.../overwrite/fake_to_hive_overwrite_2.conf | 55 +++++
.../overwrite/fake_to_hive_overwrite_3.conf | 51 ++++
.../overwrite/hive_to_assert_overwrite_1.conf | 73 ++++++
.../overwrite/hive_to_assert_overwrite_2.conf | 73 ++++++
.../overwrite/hive_to_assert_overwrite_3.conf | 73 ++++++
11 files changed, 769 insertions(+)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 3c35ed95f9..538462697b 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -47,6 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| kerberos_keytab_path | string | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| parquet_avro_write_timestamp_as_int96 | boolean | no | false |
+| overwrite | boolean | no | false |
| common-options | | no | - |
### table_name [string]
@@ -95,6 +96,10 @@ Flag to decide whether to drop partition metadata from Hive
Metastore during an
Support writing Parquet INT96 from a timestamp, only valid for parquet files.
+### overwrite [boolean]
+
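+Flag to decide whether to use overwrite mode when inserting data into Hive. If set to `true`, the existing data is deleted at commit time, just before the newly written files are moved into place: for non-partitioned tables the whole table directory is cleared; for partitioned tables only the partitions that receive new data in this job are cleared, and all other partitions are left untouched. If the job writes no data, nothing is deleted. Note that the deletion and the commit are not atomic: if the commit fails after the deletion, the previous data is lost.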
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
index f17d35352c..b5140827f2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
@@ -45,4 +45,43 @@ public class FileAggregatedCommitInfo implements
Serializable {
* <p>V is the list of partition column's values.
*/
private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
+
+    /**
+     * Renders both maps in a multi-line, human-readable form (one entry per line) so that a
+     * commit info can be logged readably.
+     */
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("FileAggregatedCommitInfo{");
+        appendTransactionMap(buffer);
+        appendPartitionDirAndValuesMap(buffer);
+        buffer.append("}");
+        return buffer.toString();
+    }
+
+    /**
+     * Appends {@code transactionMap}: each outer key followed by its inner key=value pairs. The
+     * inner values are target file paths; the inner keys are presumably the files being moved
+     * (TODO confirm).
+     */
+    private void appendTransactionMap(StringBuilder buffer) {
+        buffer.append("transactionMap={");
+        transactionMap.forEach(
+                (outerKey, files) -> {
+                    buffer.append("\n ").append(outerKey).append("={");
+                    files.forEach(
+                            (from, to) ->
+                                    buffer.append("\n ")
+                                            .append(from)
+                                            .append("=")
+                                            .append(to)
+                                            .append(","));
+                    buffer.append("\n },");
+                });
+        buffer.append("\n},");
+    }
+
+    /** Appends {@code partitionDirAndValuesMap}: each partition directory and its value list. */
+    private void appendPartitionDirAndValuesMap(StringBuilder buffer) {
+        buffer.append("partitionDirAndValuesMap={");
+        partitionDirAndValuesMap.forEach(
+                (partitionDir, values) -> {
+                    buffer.append("\n ").append(partitionDir).append("=[");
+                    values.forEach(v -> buffer.append("\n ").append(v).append(","));
+                    buffer.append("\n ],");
+                });
+        buffer.append("\n}");
+    }
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index e5ad2ba2a6..aabc2febe7 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -29,8 +29,10 @@ import org.apache.thrift.TException;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
@@ -38,6 +40,7 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
private final String dbName;
private final String tableName;
private final boolean abortDropPartitionMetadata;
+ private final boolean overwrite;
private final ReadonlyConfig readonlyConfig;
private final HiveMetaStoreProxy hiveMetaStore;
@@ -51,11 +54,18 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
this.tableName = tableName;
this.abortDropPartitionMetadata =
readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
+ this.overwrite = readonlyConfig.get(HiveSinkOptions.OVERWRITE);
}
@Override
public List<FileAggregatedCommitInfo> commit(
List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws
IOException {
+ log.info("Aggregated commit infos: {}", aggregatedCommitInfos);
+ if (overwrite) {
+ log.info(
+ "start delete directories in aggregatedCommitInfos because
overwrite is enabled.");
+ deleteDirectories(aggregatedCommitInfos);
+ }
List<FileAggregatedCommitInfo> errorCommitInfos =
super.commit(aggregatedCommitInfos);
if (errorCommitInfos.isEmpty()) {
@@ -107,4 +117,67 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
super.close();
}
}
+
+    /**
+     * Deletes the directories this commit is about to write into, so that moving the new files
+     * into place afterwards behaves like an overwrite.
+     *
+     * <p>Called from {@code commit} before {@code super.commit} when the {@code overwrite} option
+     * is enabled. The directories cleared are the parent directories of every target file path in
+     * the transaction maps: for a non-partitioned table that is the table directory, for a
+     * partitioned table it is one directory per partition written by this job. Commit infos whose
+     * transaction map holds no files are skipped, so a job that produced no data leaves existing
+     * data untouched. Each distinct directory is deleted at most once, even when several commit
+     * infos point at it.
+     *
+     * <p>NOTE(review): the delete and the subsequent rename are not atomic; if {@code
+     * super.commit} fails after this method has run, the previous data is already gone.
+     *
+     * @param aggregatedCommitInfos the aggregated commit infos that are about to be committed
+     * @throws IOException if a directory cannot be deleted
+     */
+    private void deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
+            throws IOException {
+        // Collect first so a directory shared by several commit infos is deleted only once.
+        Set<String> tableDirs = new LinkedHashSet<>();
+        Set<String> partitionDirs = new LinkedHashSet<>();
+        for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+            LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap =
+                    aggregatedCommitInfo.getTransactionMap();
+
+            // Do not delete anything if this commit carries no data.
+            if (transactionMap.values().stream().allMatch(Map::isEmpty)) {
+                log.info("Data source is empty, no directories will be deleted.");
+                continue;
+            }
+
+            // Each target path is "<dir>/<file>"; <dir> is the table directory, e.g.
+            // hdfs://hadoop-master1:8020/warehouse/test_overwrite_1, or a partition directory,
+            // e.g. hdfs://hadoop-master1:8020/warehouse/test_overwrite_partition/age=26
+            Set<String> targetDirs =
+                    transactionMap.values().stream()
+                            .flatMap(m -> m.values().stream())
+                            .map(HiveSinkAggregatedCommitter::parentDirectory)
+                            .collect(Collectors.toCollection(LinkedHashSet::new));
+
+            if (aggregatedCommitInfo.getPartitionDirAndValuesMap().isEmpty()) {
+                tableDirs.addAll(targetDirs);
+            } else {
+                partitionDirs.addAll(targetDirs);
+            }
+        }
+
+        for (String tableDir : tableDirs) {
+            deleteDirectory(tableDir, "table");
+        }
+        for (String partitionDir : partitionDirs) {
+            deleteDirectory(partitionDir, "partition");
+        }
+    }
+
+    /**
+     * Returns everything before the last '/' of {@code filePath}.
+     *
+     * @throws IllegalStateException if the path has no directory part (no '/' or only a leading
+     *     one), so that we never try to delete an empty or root path
+     */
+    private static String parentDirectory(String filePath) {
+        int lastSlash = filePath.lastIndexOf('/');
+        if (lastSlash <= 0) {
+            throw new IllegalStateException(
+                    "Cannot derive a directory to overwrite from target path: " + filePath);
+        }
+        return filePath.substring(0, lastSlash);
+    }
+
+    /** Deletes one directory and logs it; {@code kind} is "table" or "partition". */
+    private void deleteDirectory(String directory, String kind) throws IOException {
+        try {
+            hadoopFileSystemProxy.deleteFile(directory);
+        } catch (IOException e) {
+            log.error("Failed to delete {} directory: {}", kind, directory, e);
+            throw e;
+        }
+        log.info("Deleted {} directory: {}", kind, directory);
+    }
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
index 404244b411..10214d59bc 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
@@ -29,4 +29,11 @@ public class HiveSinkOptions extends HiveOptions {
.defaultValue(false)
.withDescription(
"Flag to decide whether to drop partition metadata
from Hive Metastore during an abort operation. Note: this only affects the
metadata in the metastore, the data in the partition will always be
deleted(data generated during the synchronization process).");
+
+ /**
+ * Whether the Hive sink replaces existing data instead of appending to it (default {@code
+ * false}). Read by {@code HiveSinkAggregatedCommitter}, which, when enabled, deletes the
+ * table directory (non-partitioned table) or the partition directories being written
+ * (partitioned table) before committing the new files.
+ */
+ public static final Option<Boolean> OVERWRITE =
+ Options.key("overwrite")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Flag to decide whether to use overwrite mode when
inserting data into Hive. If set to true, for non-partitioned tables, the
existing data in the table will be deleted before inserting new data. For
partitioned tables, the data in the relevant partition will be deleted before
inserting new data.");
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveOverwriteIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveOverwriteIT.java
new file mode 100644
index 0000000000..727c38d237
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveOverwriteIT.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hive;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.lifecycle.Startables;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK})
+@Slf4j
+public class HiveOverwriteIT extends TestSuiteBase implements TestResource {
+    private static final String CREATE_SQL =
+            "CREATE TABLE test_hive_sink_on_hdfs_overwrite"
+                    + "("
+                    + " pk_id BIGINT,"
+                    + " name STRING,"
+                    + " score INT"
+                    + ")";
+
+    private static final String HMS_HOST = "metastore";
+    private static final String HIVE_SERVER_HOST = "hiveserver2";
+
+    private String hiveExeUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
+    }
+
+    private String libFb303Url() {
+        return "https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar";
+    }
+
+    private String hadoopAwsUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+    }
+
+    private String aliyunSdkOssUrl() {
+        return "https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.4.1/aliyun-sdk-oss-3.4.1.jar";
+    }
+
+    private String jdomUrl() {
+        return "https://repo1.maven.org/maven2/org/jdom/jdom/1.1/jdom-1.1.jar";
+    }
+
+    private String hadoopAliyunUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.1.4/hadoop-aliyun-3.1.4.jar";
+    }
+
+    private String hadoopCosUrl() {
+        return "https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
+    }
+
+    private HiveContainer hiveServerContainer;
+    private HiveContainer hmsContainer;
+    private Connection hiveConnection;
+    private final String pluginHiveDir = "/tmp/seatunnel/plugins/Hive/lib";
+
+    /**
+     * Downloads the extra jars used by the Hive connector (hive-exec, libfb303, and the s3 / oss /
+     * cos file-system jars) into {@link #pluginHiveDir} of the engine container.
+     */
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                // The jar of hive-exec
+                Container.ExecResult downloadHiveExeCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "mkdir -p "
+                                        + pluginHiveDir
+                                        + " && cd "
+                                        + pluginHiveDir
+                                        + " && wget "
+                                        + hiveExeUrl());
+                Assertions.assertEquals(
+                        0,
+                        downloadHiveExeCommands.getExitCode(),
+                        downloadHiveExeCommands.getStderr());
+                Container.ExecResult downloadLibFb303Commands =
+                        container.execInContainer(
+                                "sh", "-c", "cd " + pluginHiveDir + " && wget " + libFb303Url());
+                Assertions.assertEquals(
+                        0,
+                        downloadLibFb303Commands.getExitCode(),
+                        downloadLibFb303Commands.getStderr());
+                // The jar of s3
+                Container.ExecResult downloadS3Commands =
+                        container.execInContainer(
+                                "sh", "-c", "cd " + pluginHiveDir + " && wget " + hadoopAwsUrl());
+                Assertions.assertEquals(
+                        0, downloadS3Commands.getExitCode(), downloadS3Commands.getStderr());
+                // The jar of oss
+                Container.ExecResult downloadOssCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "cd "
+                                        + pluginHiveDir
+                                        + " && wget "
+                                        + aliyunSdkOssUrl()
+                                        + " && wget "
+                                        + jdomUrl()
+                                        + " && wget "
+                                        + hadoopAliyunUrl());
+                Assertions.assertEquals(
+                        0, downloadOssCommands.getExitCode(), downloadOssCommands.getStderr());
+                // The jar of cos
+                Container.ExecResult downloadCosCommands =
+                        container.execInContainer(
+                                "sh", "-c", "cd " + pluginHiveDir + " && wget " + hadoopCosUrl());
+                Assertions.assertEquals(
+                        0, downloadCosCommands.getExitCode(), downloadCosCommands.getStderr());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        hmsContainer =
+                HiveContainer.hmsStandalone()
+                        .withCreateContainerCmdModifier(cmd -> cmd.withName(HMS_HOST))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HMS_HOST);
+        hmsContainer.setPortBindings(Collections.singletonList("9083:9083"));
+
+        Startables.deepStart(Stream.of(hmsContainer)).join();
+        log.info("HMS just started");
+
+        hiveServerContainer =
+                HiveContainer.hiveServer()
+                        .withNetwork(NETWORK)
+                        .withCreateContainerCmdModifier(cmd -> cmd.withName(HIVE_SERVER_HOST))
+                        .withNetworkAliases(HIVE_SERVER_HOST)
+                        .withFileSystemBind("/tmp/data", "/opt/hive/data")
+                        .withEnv("SERVICE_OPTS", "-Dhive.metastore.uris=thrift://metastore:9083")
+                        .withEnv("IS_RESUME", "true")
+                        .dependsOn(hmsContainer);
+        hiveServerContainer.setPortBindings(Collections.singletonList("10004:10000"));
+
+        Startables.deepStart(Stream.of(hiveServerContainer)).join();
+        log.info("HiveServer2 just started");
+        given().ignoreExceptions()
+                .await()
+                .atMost(360, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(10L))
+                .pollInterval(Duration.ofSeconds(3L))
+                .untilAsserted(this::initializeConnection);
+        prepareTable();
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        // try/finally: a failure while handling the HMS container must not leak HiveServer2.
+        try {
+            dumpLogAndClose(hmsContainer);
+        } finally {
+            dumpLogAndClose(hiveServerContainer);
+        }
+    }
+
+    /** Logs the container's Hive log for debugging, then always closes the container. */
+    private void dumpLogAndClose(HiveContainer hiveContainer)
+            throws IOException, InterruptedException {
+        if (hiveContainer == null) {
+            return;
+        }
+        try {
+            log.info(hiveContainer.execInContainer("cat", "/tmp/hive/hive.log").getStdout());
+        } finally {
+            hiveContainer.close();
+        }
+    }
+
+    private void initializeConnection()
+            throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+                    SQLException {
+        this.hiveConnection = this.hiveServerContainer.getConnection();
+    }
+
+    private void prepareTable() throws Exception {
+        log.info(
+                String.format(
+                        "Databases are %s",
+                        this.hmsContainer.createMetaStoreClient().getAllDatabases()));
+        try (Statement statement = this.hiveConnection.createStatement()) {
+            statement.execute(CREATE_SQL);
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw exception;
+        }
+    }
+
+    /**
+     * Tests the Hive sink connector's overwrite mode through three write jobs, each followed by
+     * an Assert job that reads the table back:
+     *
+     * <ol>
+     *   <li>Insert 3 records into the empty table (table contains 3 records).
+     *   <li>Insert 2 more records, intended to be appended (table contains 5 records).
+     *   <li>Insert 1 record with overwrite mode, so the previous data is replaced (table contains
+     *       only 1 record).
+     * </ol>
+     *
+     * @param container The test container that provides the execution environment
+     * @throws IOException If an I/O error occurs during job execution
+     * @throws InterruptedException If the job execution is interrupted
+     */
+    @TestTemplate
+    public void testFakeSinkHiveOverwrite(TestContainer container)
+            throws IOException, InterruptedException {
+        executeJobAndAssertSuccess(container, "/overwrite/fake_to_hive_overwrite_1.conf");
+        executeJobAndAssertSuccess(container, "/overwrite/hive_to_assert_overwrite_1.conf");
+
+        executeJobAndAssertSuccess(container, "/overwrite/fake_to_hive_overwrite_2.conf");
+        executeJobAndAssertSuccess(container, "/overwrite/hive_to_assert_overwrite_2.conf");
+
+        executeJobAndAssertSuccess(container, "/overwrite/fake_to_hive_overwrite_3.conf");
+        executeJobAndAssertSuccess(container, "/overwrite/hive_to_assert_overwrite_3.conf");
+    }
+
+    /** Runs one job config and fails with the job's stderr if it did not exit with code 0. */
+    private static void executeJobAndAssertSuccess(TestContainer container, String confFile)
+            throws IOException, InterruptedException {
+        Container.ExecResult result = container.executeJob(confFile);
+        Assertions.assertEquals(
+                0, result.getExitCode(), confFile + " failed: " + result.getStderr());
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_1.conf
new file mode 100644
index 0000000000..4ae3ff258b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_1.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_overwrite"
+ metastore_uri = "thrift://metastore:9083"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_2.conf
new file mode 100644
index 0000000000..57d6be7ce2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_2.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [4, "D", 100]
+ },
+ {
+ kind = INSERT
+ fields = [5, "E", 100]
+ }
+ ]
+ }
+}
+
+sink {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_overwrite"
+    metastore_uri = "thrift://metastore:9083"
+    # Append on purpose: HiveOverwriteIT expects this step to grow the table from 3 to 5 rows;
+    # only the third job exercises overwrite mode.
+    overwrite = false
+  }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_3.conf
new file mode 100644
index 0000000000..ef9d9e94a8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/fake_to_hive_overwrite_3.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [6, "F", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_overwrite"
+ metastore_uri = "thrift://metastore:9083"
+ overwrite = true
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_1.conf
new file mode 100644
index 0000000000..1af98fb0dc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_1.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_overwrite"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+      # MAX_ROW and MIN_ROW together pin the exact row count (3 rows after the first insert).
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 3
+        }
+      ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_2.conf
new file mode 100644
index 0000000000..592cc4dec2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_2.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_overwrite"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_3.conf
new file mode 100644
index 0000000000..b19ae88ba7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/overwrite/hive_to_assert_overwrite_3.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_overwrite"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+      # Exactly 1 row: MAX_ROW proves the old rows were removed, MIN_ROW proves the overwrite
+      # did not also wipe the newly written row.
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}