This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 23ad4ee73 [Connector-V2] Add Hive sink connector v2 (#2158)
23ad4ee73 is described below

commit 23ad4ee735b68c097b2564269580c6c584044f62
Author: Eric <[email protected]>
AuthorDate: Sat Jul 16 10:12:22 2022 +0800

    [Connector-V2] Add Hive sink connector v2 (#2158)
    
    * tmp commit
    
    * add hadoop2 and hadoop3 shade jar
    
    * add hadoop2 and hadoop3 shade jar
    
    * add license head
    
    * change known dependencies
    
    * tmp commit
    
    * tmp commit
    
    * change hadoop dependency scope to provided
    
    * back pom
    
    * fix checkstyle
    
    * add example
    
    * fix example bug
    
    * remove file connector from example and e2e because hadoop2 cannot compile with JDK 11
    
    * no need for jdk8 and jdk11 profiles because we don't use the hadoop shade jar
    
    * change hadoop jar dependency scope to provided
    
    * back
    
    * file connector cannot build with JDK 11
    
    * drop hadoop shade
    
    * add gitignore item
    
    * add hadoop and local file sink
    
    * fix pom error
    
    * fix pom error
    
    * fix pom error
    
    * implement new interface
    
    * fix UT error
    
    * fix e2e error
    
    * update build timeout from 30min to 40min
    
    * fix e2e error
    
    * remove auto service
    
    * fix e2e error
    
    * fix e2e error
    
    * fix e2e error
    
    * found e2e error
    
    * fix e2e error
    
    * fix e2e error
    
    * fix e2e error
    
    * merge from upstream
    
    * merge from upstream
    
    * merge from upstream
    
    * merge from upstream
    
    * merge from upstream
    
    * add mvn jvm option
    
    * add mvn jvm option
    
    * add license
    
    * add license
    
    * add license
    
    * fix dependency
    
    * fix build jvm oom
    
    * fix build jvm oom
    
    * fix build jvm oom
    
    * fix dependency
    
    * fix dependency
    
    * fix e2e error
    
    * update codeql check timeout from 30min to 60min
    
    * merge from dev
    
    * merge from dev
    
    * fix ci error
    
    * fix checkstyle
    
    * fix ci
    
    * fix ci
    
    * aa
    
    * aa
    
    * aa
    
    * add .idea
    
    * del .idea
    
    * del .idea
    
    * del .idea
    
    * del .idea
    
    * remove unused license
    
    * remove unused before and after methods in test
    
    * fix license; remove dependency
    
    * fix review
    
    * fix build order
    
    * fix license
    
    * fix license
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * fix review
    
    * add code-analysis timeout to 120
    
    * retry ci
    
    * update license and remove unused jar from LICENSE file
    
    * retry ci
    
    * add hive sink
    
    * add hive sink connector doc
    
    * add hive sink connector doc
    
    * fix checkstyle error.
    
    * fix bug
    
    * tmp
    
    * fix hive shade error
    
    * fix hive shade error
    
    * fix commit bug
    
    * optimize doc
    
    * optimize doc
    
    * optimize doc
    
    * optimize code
---
 docs/en/connector-v2/sink/File.mdx                 |   2 +
 docs/en/connector-v2/sink/Hive.md                  |  62 ++++++++
 docs/en/connector-v2/source/Hudi.md                |   2 +
 pom.xml                                            |   4 +-
 ...TextFileConfig.java => BaseTextFileConfig.java} |  10 +-
 .../seatunnel/file/sink/AbstractFileSink.java      |   3 +-
 .../file/sink/FileAggregatedCommitInfo.java        |   3 +
 .../seatunnel/file/sink/FileCommitInfo.java        |   3 +
 .../file/sink/FileSinkAggregatedCommitter.java     |   9 +-
 .../file/sink/FileSinkWriterWithTransaction.java   | 160 +++++++++++++++++++++
 .../file/sink/config/TextFileSinkConfig.java       |  10 +-
 .../file/sink/transaction/Transaction.java         |   5 +-
 .../writer/AbstractTransactionStateFileWriter.java |  14 +-
 .../writer/FileSinkPartitionDirNameGenerator.java  |  18 ++-
 .../sink/writer/PartitionDirNameGenerator.java     |   4 +-
 .../TestFileSinkPartitionDirNameGenerator.java     |   6 +-
 .../sink/hdfs/FileSinkAggregatedCommitterTest.java |  36 +++--
 .../TestHdfsTxtTransactionStateFileWriter.java     |  10 ++
 .../local/FileSinkAggregatedCommitterTest.java     |  39 +++--
 .../TestLocalTxtTransactionStateFileWriter.java    |  10 ++
 seatunnel-connectors-v2/connector-hive/pom.xml     |  22 ++-
 .../HiveSinkState.java => config/Constant.java}    |  14 +-
 .../hive/sink/HiveAggregatedCommitInfo.java        |  14 +-
 .../seatunnel/hive/sink/HiveCommitInfo.java        |  16 ++-
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  56 ++++++--
 .../hive/sink/HiveSinkAggregatedCommitter.java     |  93 +++++++++---
 .../seatunnel/hive/sink/HiveSinkConfig.java        | 148 ++++++++++++-------
 .../seatunnel/hive/sink/HiveSinkState.java         |   3 +-
 .../seatunnel/hive/sink/HiveSinkWriter.java        | 137 ++++++++++++++----
 .../hive/sink/file/writer/AbstractFileWriter.java  | 155 --------------------
 .../hive/sink/file/writer/FileWriter.java          |  49 -------
 .../hive/sink/file/writer/HdfsTxtFileWriter.java   | 151 -------------------
 .../seatunnel/hive/sink/file/writer/HdfsUtils.java |  96 -------------
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  52 +++++++
 .../seatunnel/hive/sink/TestHiveSinkConfig.java    |  53 +++++++
 .../src/test/resources/fakesource_to_hive.conf     |  23 +--
 seatunnel-connectors-v2/connector-hudi/pom.xml     |   6 +-
 seatunnel-dependency-shade/pom.xml                 |  39 -----
 .../seatunnel-hive-shade/pom.xml                   |  69 ---------
 .../seatunnel-flink-connector-v2-example/pom.xml   |  11 ++
 .../resources/examples/fakesource_to_file.conf     |   6 +-
 41 files changed, 851 insertions(+), 772 deletions(-)

diff --git a/docs/en/connector-v2/sink/File.mdx 
b/docs/en/connector-v2/sink/File.mdx
index cf92d188b..7e2e3efd7 100644
--- a/docs/en/connector-v2/sink/File.mdx
+++ b/docs/en/connector-v2/sink/File.mdx
@@ -111,6 +111,8 @@ Streaming Job not support `overwrite`.
 </TabItem>
 <TabItem value="HdfsFile">
 
+In order to use this connector, you must ensure that your Spark/Flink cluster has already integrated Hadoop. The tested Hadoop version is 2.x.
+
 | name                              | type   | required | default value                                                 |
 | --------------------------------- | ------ | -------- | ------------------------------------------------------------- |
 | path                              | string | yes      | -                                                             |
diff --git a/docs/en/connector-v2/sink/Hive.md 
b/docs/en/connector-v2/sink/Hive.md
new file mode 100644
index 000000000..1794633f1
--- /dev/null
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -0,0 +1,62 @@
+# Hive
+
+## Description
+
+Write data to Hive.
+
+In order to use this connector, you must ensure that your Spark/Flink cluster has already integrated Hive. The tested Hive version is 2.3.9.
+
+## Options
+
+| name                  | type    | required | default value                                              |
+| --------------------- | ------- | -------- | ---------------------------------------------------------- |
+| hive_table_name       | string  | yes      | -                                                          |
+| hive_metastore_uris   | string  | yes      | -                                                          |
+| partition_by          | array   | no       | -                                                          |
+| sink_columns          | array   | no       | When this parameter is empty, all fields are sink columns  |
+| is_enable_transaction | boolean | no       | true                                                       |
+| save_mode             | string  | no       | "append"                                                   |
+
+### hive_table_name [string]
+
+Target Hive table name, e.g. `db1.table1`.
+
+### hive_metastore_uris [string]
+
+Hive metastore URIs, e.g. `thrift://localhost:9083`.
+
+### partition_by [array]
+
+Partition data based on selected fields
+
+### sink_columns [array]
+
+Which columns need to be written to Hive; by default, all of the columns obtained from the `Transform` or `Source` are written.
+The order of the fields determines the order in which they are actually written to the file.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Only `true` is supported for now.
+
+### save_mode [string]
+
+Storage mode. We plan to support `overwrite` and `append`; only `append` is supported now.
+
+Streaming jobs do not support `overwrite`.
+
+## Example
+
+```bash
+
+Hive {
+    hive_table_name="db1.table1"
+    hive_metastore_uris="thrift://localhost:9083"
+    partition_by=["age"]
+    sink_columns=["name","age"]
+    is_enable_transaction=true
+    save_mode="append"
+}
+
+```
diff --git a/docs/en/connector-v2/source/Hudi.md 
b/docs/en/connector-v2/source/Hudi.md
index 4803b1f44..4851d61ef 100644
--- a/docs/en/connector-v2/source/Hudi.md
+++ b/docs/en/connector-v2/source/Hudi.md
@@ -4,6 +4,8 @@
 
 Used to read data from Hudi. Currently, only supports hudi cow table and 
Snapshot Query with Batch Mode.
 
+In order to use this connector, you must ensure that your Spark/Flink cluster has already integrated Hive. The tested Hive version is 2.3.9.
+
 ## Options
 
 | name                     | type    | required | default value |
diff --git a/pom.xml b/pom.xml
index fe73780d2..17aa94ed4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
         <module>seatunnel-plugin-discovery</module>
         <module>seatunnel-formats</module>
         <module>seatunnel-dist</module>
-        <module>seatunnel-server</module>
+           <module>seatunnel-server</module>
     </modules>
 
     <profiles>
@@ -97,7 +97,6 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
             <modules>
-                <module>seatunnel-dependency-shade</module>
                 <module>seatunnel-connectors-v2</module>
                 <module>seatunnel-connectors-v2-dist</module>
                 <module>seatunnel-examples</module>
@@ -928,7 +927,6 @@
                 <artifactId>hibernate-validator</artifactId>
                 <version>${hibernate.validator.version}</version>
             </dependency>
-
         </dependencies>
     </dependencyManagement>
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/AbstractTextFileConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
similarity index 91%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/AbstractTextFileConfig.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
index 32672066d..d6fd26d1b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/AbstractTextFileConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
 import java.util.Locale;
 
 @Data
-public class AbstractTextFileConfig implements DelimiterConfig, 
CompressConfig, Serializable {
+public class BaseTextFileConfig implements DelimiterConfig, CompressConfig, 
Serializable {
     private static final long serialVersionUID = 1L;
 
     protected String compressCodec;
@@ -42,9 +42,7 @@ public class AbstractTextFileConfig implements 
DelimiterConfig, CompressConfig,
     protected String fileNameExpression;
     protected FileFormat fileFormat = FileFormat.TEXT;
 
-    public AbstractTextFileConfig(@NonNull Config config) {
-        checkNotNull(config.getString(Constant.PATH));
-
+    public BaseTextFileConfig(@NonNull Config config) {
         if (config.hasPath(Constant.COMPRESS_CODEC)) {
             throw new RuntimeException("compress not support now");
         }
@@ -60,6 +58,7 @@ public class AbstractTextFileConfig implements 
DelimiterConfig, CompressConfig,
         if (config.hasPath(Constant.PATH) && 
!StringUtils.isBlank(config.getString(Constant.PATH))) {
             this.path = config.getString(Constant.PATH);
         }
+        checkNotNull(path);
 
         if (config.hasPath(Constant.FILE_NAME_EXPRESSION) && 
!StringUtils.isBlank(config.getString(Constant.FILE_NAME_EXPRESSION))) {
             this.fileNameExpression = 
config.getString(Constant.FILE_NAME_EXPRESSION);
@@ -70,6 +69,5 @@ public class AbstractTextFileConfig implements 
DelimiterConfig, CompressConfig,
         }
     }
 
-    protected AbstractTextFileConfig() {
-    }
+    public BaseTextFileConfig() {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
index a296eea53..77b72f004 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/AbstractFileSink.java
@@ -41,7 +41,6 @@ import java.util.Optional;
 
 /**
  * Hive Sink implementation by using SeaTunnel sink API.
- * This class contains the method to create {@link 
TransactionStateFileSinkWriter} and {@link FileSinkAggregatedCommitter}.
  */
 public abstract class AbstractFileSink implements SeaTunnelSink<SeaTunnelRow, 
FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> {
     private Config config;
@@ -92,7 +91,7 @@ public abstract class AbstractFileSink implements 
SeaTunnelSink<SeaTunnelRow, Fi
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
restoreWriter(SinkWriter.Context context, List<FileSinkState> states) throws 
IOException {
         if (this.getSinkConfig().isEnableTransaction()) {
-            return new TransactionStateFileSinkWriter(seaTunnelRowTypeInfo,
+            return new FileSinkWriterWithTransaction(seaTunnelRowTypeInfo,
                 config,
                 context,
                 textFileSinkConfig,
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
index 1036c3a59..c847ff659 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileAggregatedCommitInfo.java
@@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 @Data
@@ -33,4 +34,6 @@ public class FileAggregatedCommitInfo implements Serializable 
{
      * V is the target file path of the data file.
      */
     private Map<String, Map<String, String>> transactionMap;
+
+    private Map<String, List<String>> partitionDirAndValsMap;
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
index 689b85ebf..0fcb04a03 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileCommitInfo.java
@@ -21,6 +21,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 @Data
@@ -34,5 +35,7 @@ public class FileCommitInfo implements Serializable {
      */
     private Map<String, String> needMoveFiles;
 
+    private Map<String, List<String>> partitionDirAndValsMap;
+
     private String transactionDir;
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
index cc8ff2404..3c7c8cf9c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkAggregatedCommitter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
 
 import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSinkAggregatedCommitter.class);
@@ -63,6 +65,7 @@ public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<File
             return null;
         }
         Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
+        Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
         commitInfos.stream().forEach(commitInfo -> {
             Map<String, String> needMoveFileMap = 
aggregateCommitInfo.get(commitInfo.getTransactionDir());
             if (needMoveFileMap == null) {
@@ -70,8 +73,12 @@ public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<File
                 aggregateCommitInfo.put(commitInfo.getTransactionDir(), 
needMoveFileMap);
             }
             needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
+            Set<Map.Entry<String, List<String>>> entries = 
commitInfo.getPartitionDirAndValsMap().entrySet();
+            if (!CollectionUtils.isEmpty(entries)) {
+                
partitionDirAndValsMap.putAll(commitInfo.getPartitionDirAndValsMap());
+            }
         });
-        return new FileAggregatedCommitInfo(aggregateCommitInfo);
+        return new FileAggregatedCommitInfo(aggregateCommitInfo, 
partitionDirAndValsMap);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
new file mode 100644
index 000000000..83e51d1bc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public class FileSinkWriterWithTransaction implements SinkWriter<SeaTunnelRow, 
FileCommitInfo, FileSinkState> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSinkWriterWithTransaction.class);
+
+    private SeaTunnelRowType seaTunnelRowTypeInfo;
+    private Config pluginConfig;
+    private Context context;
+    private String jobId;
+
+    private TransactionStateFileWriter fileWriter;
+
+    private TextFileSinkConfig textFileSinkConfig;
+
+    public FileSinkWriterWithTransaction(@NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo,
+                                         @NonNull Config pluginConfig,
+                                         @NonNull SinkWriter.Context context,
+                                         @NonNull TextFileSinkConfig 
textFileSinkConfig,
+                                         @NonNull String jobId,
+                                         @NonNull SinkFileSystemPlugin 
sinkFileSystemPlugin) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.jobId = jobId;
+        this.textFileSinkConfig = textFileSinkConfig;
+
+        Optional<TransactionStateFileWriter> transactionStateFileWriter = 
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+            new FileSinkTransactionFileNameGenerator(
+                this.textFileSinkConfig.getFileFormat(),
+                this.textFileSinkConfig.getFileNameExpression(),
+                this.textFileSinkConfig.getFileNameTimeFormat()),
+            new FileSinkPartitionDirNameGenerator(
+                this.textFileSinkConfig.getPartitionFieldList(),
+                this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+                this.textFileSinkConfig.getPartitionDirExpression()),
+            this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+            this.textFileSinkConfig.getTmpPath(),
+            this.textFileSinkConfig.getPath(),
+            this.jobId,
+            this.context.getIndexOfSubtask(),
+            this.textFileSinkConfig.getFieldDelimiter(),
+            this.textFileSinkConfig.getRowDelimiter(),
+            sinkFileSystemPlugin.getFileSystem().get());
+
+        if (!transactionStateFileWriter.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is need");
+        }
+
+        this.fileWriter = transactionStateFileWriter.get();
+
+        fileWriter.beginTransaction(1L);
+    }
+
+    public FileSinkWriterWithTransaction(@NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo,
+                                         @NonNull Config pluginConfig,
+                                         @NonNull SinkWriter.Context context,
+                                         @NonNull TextFileSinkConfig 
textFileSinkConfig,
+                                         @NonNull String jobId,
+                                         @NonNull List<FileSinkState> 
fileSinkStates,
+                                         @NonNull SinkFileSystemPlugin 
sinkFileSystemPlugin) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.jobId = jobId;
+
+        Optional<TransactionStateFileWriter> transactionStateFileWriter = 
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+            new FileSinkTransactionFileNameGenerator(
+                this.textFileSinkConfig.getFileFormat(),
+                this.textFileSinkConfig.getFileNameExpression(),
+                this.textFileSinkConfig.getFileNameTimeFormat()),
+            new FileSinkPartitionDirNameGenerator(
+                this.textFileSinkConfig.getPartitionFieldList(),
+                this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+                this.textFileSinkConfig.getPartitionDirExpression()),
+            this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+            this.textFileSinkConfig.getTmpPath(),
+            this.textFileSinkConfig.getPath(),
+            this.jobId,
+            this.context.getIndexOfSubtask(),
+            this.textFileSinkConfig.getFieldDelimiter(),
+            this.textFileSinkConfig.getRowDelimiter(),
+            sinkFileSystemPlugin.getFileSystem().get());
+
+        if (!transactionStateFileWriter.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is need");
+        }
+
+        this.fileWriter = transactionStateFileWriter.get();
+
+        // Rollback dirty transaction
+        if (fileSinkStates.size() > 0) {
+            List<String> transactionAfter = 
fileWriter.getTransactionAfter(fileSinkStates.get(0).getTransactionId());
+            fileWriter.abortTransactions(transactionAfter);
+        }
+        fileWriter.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 
1);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        fileWriter.write(element);
+    }
+
+    @Override
+    public Optional<FileCommitInfo> prepareCommit() throws IOException {
+        return fileWriter.prepareCommit();
+    }
+
+    @Override
+    public void abortPrepare() {
+        fileWriter.abortTransaction();
+    }
+
+    @Override
+    public void close() throws IOException {
+        fileWriter.finishAndCloseWriteFile();
+    }
+
+    @Override
+    public List<FileSinkState> snapshotState(long checkpointId) throws 
IOException {
+        List<FileSinkState> fileSinkStates = 
fileWriter.snapshotState(checkpointId);
+        fileWriter.beginTransaction(checkpointId);
+        return fileSinkStates;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
index 63b747bf2..ce94ff847 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
@@ -19,10 +19,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.config;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.config.AbstractTextFileConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseTextFileConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
 import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -38,14 +40,14 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 @Data
-public class TextFileSinkConfig extends AbstractTextFileConfig implements 
PartitionConfig {
+public class TextFileSinkConfig extends BaseTextFileConfig implements 
PartitionConfig {
 
     private List<String> sinkColumnList;
 
     private List<String> partitionFieldList;
 
     /**
-     * default is ${k1}=${v1}/${k2}=${v2}/...
+     * default is ${k0}=${v0}/${k1}=${v1}/... {@link FileSinkPartitionDirNameGenerator#generatorPartitionDir(SeaTunnelRow)}
      */
     private String partitionDirExpression;
 
@@ -69,7 +71,7 @@ public class TextFileSinkConfig extends 
AbstractTextFileConfig implements Partit
         super(config);
         
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
 
-        if (config.hasPath(Constant.FILE_FORMAT) && 
!CollectionUtils.isEmpty(config.getStringList(Constant.SINK_COLUMNS))) {
+        if (config.hasPath(Constant.SINK_COLUMNS) && 
!CollectionUtils.isEmpty(config.getStringList(Constant.SINK_COLUMNS))) {
             this.sinkColumnList = config.getStringList(Constant.SINK_COLUMNS);
         }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
index dd8d41bf6..d9a39c5df 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/transaction/Transaction.java
@@ -22,7 +22,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkState;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter;
 
 import lombok.NonNull;
 
@@ -40,7 +39,7 @@ public interface Transaction extends Serializable {
     String beginTransaction(@NonNull Long checkpointId);
 
     /**
-     * Abort current Transaction, called when {@link 
TransactionStateFileSinkWriter#prepareCommit()} or {@link 
TransactionStateFileSinkWriter#snapshotState(long)} failed
+     * Abort current Transaction, called when {@link 
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter#prepareCommit()}
 or {@link 
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter#snapshotState(long)}
 failed
      */
     void abortTransaction();
 
@@ -56,7 +55,7 @@ public interface Transaction extends Serializable {
     List<String> getTransactionAfter(@NonNull String transactionId);
 
     /**
-     * Called by {@link TransactionStateFileSinkWriter#prepareCommit()}
+     * Called by {@link 
org.apache.seatunnel.connectors.seatunnel.file.sink.TransactionStateFileSinkWriter#prepareCommit()}
      * We should end the transaction in this method. After this method is 
called, the transaction will no longer accept data writing
      *
      * @return Return the commit information that can be commit in {@link 
FileSinkAggregatedCommitter#commit(List)}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
index 784d87046..b14827b56 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractTransactionStateFileWriter.java
@@ -66,6 +66,8 @@ public abstract class AbstractTransactionStateFileWriter 
implements TransactionS
 
     private FileSystem fileSystem;
 
+    private Map<String, List<String>> partitionDirAndValsMap;
+
     public AbstractTransactionStateFileWriter(@NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo,
                                               @NonNull 
TransactionFileNameGenerator transactionFileNameGenerator,
                                               @NonNull 
PartitionDirNameGenerator partitionDirNameGenerator,
@@ -89,7 +91,8 @@ public abstract class AbstractTransactionStateFileWriter 
implements TransactionS
     }
 
     public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow 
seaTunnelRow) {
-        String beingWrittenFileKey = 
this.partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+        Map<String, List<String>> dataPartitionDirAndValsMap = 
this.partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+        String beingWrittenFileKey = 
dataPartitionDirAndValsMap.keySet().toArray()[0].toString();
         // get filePath from beingWrittenFile
         String beingWrittenFilePath = 
beingWrittenFile.get(beingWrittenFileKey);
         if (beingWrittenFilePath != null) {
@@ -99,6 +102,9 @@ public abstract class AbstractTransactionStateFileWriter 
implements TransactionS
             
sbf.append("/").append(beingWrittenFileKey).append("/").append(transactionFileNameGenerator.generateFileName(this.transactionId));
             String newBeingWrittenFilePath = sbf.toString();
             beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
+            if 
(!Constant.NON_PARTITION.equals(dataPartitionDirAndValsMap.keySet().toArray()[0].toString())){
+                partitionDirAndValsMap.putAll(dataPartitionDirAndValsMap);
+            }
             return newBeingWrittenFilePath;
         }
     }
@@ -114,6 +120,7 @@ public abstract class AbstractTransactionStateFileWriter 
implements TransactionS
         this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + 
Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + 
checkpointId;
         this.transactionDir = getTransactionDir(this.transactionId);
         this.needMoveFiles = new HashMap<>();
+        this.partitionDirAndValsMap = new HashMap<>();
         this.beingWrittenFile = new HashMap<>();
         this.beginTransaction(this.transactionId);
         this.checkpointId = checkpointId;
@@ -164,7 +171,10 @@ public abstract class AbstractTransactionStateFileWriter 
implements TransactionS
         // this.needMoveFiles will be clear when beginTransaction, so we need 
copy the needMoveFiles.
         Map<String, String> commitMap = new HashMap<>();
         commitMap.putAll(this.needMoveFiles);
-        return Optional.of(new FileCommitInfo(commitMap, this.transactionDir));
+
+        Map<String, List<String>> copyMap = 
this.partitionDirAndValsMap.entrySet().stream()
+            .collect(Collectors.toMap(e -> e.getKey(), e -> new 
ArrayList<String>(e.getValue())));
+        return Optional.of(new FileCommitInfo(commitMap, copyMap, 
this.transactionDir));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
index aa8a3656b..a9175409f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/FileSinkPartitionDirNameGenerator.java
@@ -25,6 +25,7 @@ import lombok.Data;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,11 +60,15 @@ public class FileSinkPartitionDirNameGenerator implements 
PartitionDirNameGenera
     }
 
     @Override
-    public String generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
+    public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow) {
+        Map<String, List<String>> partitionDirAndValsMap = new HashMap<>(1);
         if (CollectionUtils.isEmpty(this.partitionFieldsIndexInRow)) {
-            return Constant.NON_PARTITION;
+            partitionDirAndValsMap.put(Constant.NON_PARTITION, null);
+            return partitionDirAndValsMap;
         }
 
+        List<String> vals = new ArrayList<>(partitionFieldsIndexInRow.size());
+        String partitionDir;
         if (StringUtils.isBlank(partitionDirExpression)) {
             StringBuilder sbd = new StringBuilder();
             for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
@@ -71,15 +76,20 @@ public class FileSinkPartitionDirNameGenerator implements 
PartitionDirNameGenera
                     .append("=")
                     
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
                     .append("/");
+                
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
             }
-            return sbd.toString();
+            partitionDir = sbd.toString();
         } else {
             Map<String, String> valueMap = new 
HashMap<>(partitionFieldList.size() * 2);
             for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
                 valueMap.put(keys[i], partitionFieldList.get(i));
                 valueMap.put(values[i], 
seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+                
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
             }
-            return VariablesSubstitute.substitute(partitionDirExpression, 
valueMap);
+            partitionDir = 
VariablesSubstitute.substitute(partitionDirExpression, valueMap);
         }
+
+        partitionDirAndValsMap.put(partitionDir, vals);
+        return partitionDirAndValsMap;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
index 1145e847d..05c90256b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/PartitionDirNameGenerator.java
@@ -20,7 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
 
 public interface PartitionDirNameGenerator extends Serializable {
-    String generatorPartitionDir(SeaTunnelRow seaTunnelRow);
+    Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow);
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
index 1989275f4..0867f104a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/TestFileSinkPartitionDirNameGenerator.java
@@ -55,15 +55,15 @@ public class TestFileSinkPartitionDirNameGenerator {
         partitionFieldsIndexInRow.add(3);
 
         PartitionDirNameGenerator partitionDirNameGenerator = new 
FileSinkPartitionDirNameGenerator(partitionFieldList, 
partitionFieldsIndexInRow, "${v0}/${v1}");
-        String partitionDir = 
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+        String partitionDir = 
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow).keySet().toArray()[0].toString();
         Assert.assertEquals("test/3", partitionDir);
 
         partitionDirNameGenerator = new 
FileSinkPartitionDirNameGenerator(partitionFieldList, 
partitionFieldsIndexInRow, "${k0}=${v0}/${k1}=${v1}");
-        partitionDir = 
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+        partitionDir = 
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow).keySet().toArray()[0].toString();
         Assert.assertEquals("c3=test/c4=3", partitionDir);
 
         partitionDirNameGenerator = new 
FileSinkPartitionDirNameGenerator(null, null, "${k0}=${v0}/${k1}=${v1}");
-        partitionDir = 
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow);
+        partitionDir = 
partitionDirNameGenerator.generatorPartitionDir(seaTunnelRow).keySet().toArray()[0].toString();
         Assert.assertEquals(Constant.NON_PARTITION, partitionDir);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
index 79c54fcc9..7554cee09 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/FileSinkAggregatedCommitterTest.java
@@ -25,12 +25,15 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 public class FileSinkAggregatedCommitterTest {
+    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
     @Test
     public void testCommit() throws Exception {
         FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
@@ -46,7 +49,12 @@ public class FileSinkAggregatedCommitterTest {
         HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
 
         transactionFiles.put(transactionDir, needMoveFiles);
-        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles);
+
+        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new 
String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
+
+        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
         List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new 
ArrayList<>();
         fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
         fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
@@ -56,7 +64,7 @@ public class FileSinkAggregatedCommitterTest {
         Assert.assertTrue(!HdfsUtils.fileExist(transactionDir));
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
+    @SuppressWarnings({"checkstyle:MagicNumber", 
"checkstyle:UnnecessaryParentheses"})
     @Test
     public void testCombine() throws Exception {
         FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
@@ -66,28 +74,37 @@ public class FileSinkAggregatedCommitterTest {
         String transactionDir = 
String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
         String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
         Map<String, String> needMoveFiles = new HashMap<>();
-        needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir 
+ "/c3=4/c4=rrr/test1.txt");
+        needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir 
+ "/c3=3/c4=rrr/test1.txt");
         needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir 
+ "/c3=4/c4=bbb/test1.txt");
-        HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
+        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+        partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new 
String[]{"3", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
+        FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, 
partitionDirAndVals, transactionDir);
+        HdfsUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt");
         HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
 
         Map<String, String> needMoveFiles1 = new HashMap<>();
         needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt", 
targetDir + "/c3=4/c4=rrr/test2.txt");
         needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt", 
targetDir + "/c3=4/c4=bbb/test2.txt");
-        FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, 
transactionDir);
-        FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, 
transactionDir);
+        Map<String, List<String>> partitionDirAndVals1 = new HashMap<>();
+        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new 
String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
+        FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, 
partitionDirAndVals1, transactionDir);
         List<FileCommitInfo> fileCommitInfoList = new ArrayList<>();
         fileCommitInfoList.add(fileCommitInfo);
         fileCommitInfoList.add(fileCommitInfo1);
+
         FileAggregatedCommitInfo combine = 
fileSinkAggregatedCommitter.combine(fileCommitInfoList);
         Assert.assertEquals(1, combine.getTransactionMap().size());
         Assert.assertEquals(4, 
combine.getTransactionMap().get(transactionDir).size());
-        Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test1.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=rrr/test1.txt"));
+        Assert.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=3/c4=rrr/test1.txt"));
         Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=bbb/test1.txt"));
         Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=rrr/test2.txt"));
         Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=bbb/test2.txt"));
+        Assert.assertEquals(3, 
combine.getPartitionDirAndValsMap().keySet().size());
     }
 
+    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
     @Test
     public void testAbort() throws Exception {
         FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(new HdfsFileSystemCommitter());
@@ -99,11 +116,14 @@ public class FileSinkAggregatedCommitterTest {
         Map<String, String> needMoveFiles = new HashMap<>();
         needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir 
+ "/c3=4/c4=rrr/test1.txt");
         needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir 
+ "/c3=4/c4=bbb/test1.txt");
+        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new 
String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
         HdfsUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
         HdfsUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
 
         transactionFiles.put(transactionDir, needMoveFiles);
-        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles);
+        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
         List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new 
ArrayList<>();
         fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
         fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
index f3e1847f4..d8633e86d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/hdfs/TestHdfsTxtTransactionStateFileWriter.java
@@ -93,5 +93,15 @@ public class TestHdfsTxtTransactionStateFileWriter {
         Map<String, String> needMoveFiles = fileCommitInfo.getNeedMoveFiles();
         Assert.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId + 
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId 
+ ".txt"));
         Assert.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId + 
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId 
+ ".txt"));
+
+        Map<String, List<String>> partitionDirAndValsMap = 
fileCommitInfo.getPartitionDirAndValsMap();
+        Assert.assertEquals(2, partitionDirAndValsMap.size());
+        
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2"));
+        
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3"));
+        Assert.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size() 
== 2);
+        Assert.assertEquals("str1", 
partitionDirAndValsMap.get("c3=str1/c4=str2").get(0));
+        Assert.assertEquals("str2", 
partitionDirAndValsMap.get("c3=str1/c4=str2").get(1));
+        Assert.assertEquals("str1", 
partitionDirAndValsMap.get("c3=str1/c4=str3").get(0));
+        Assert.assertEquals("str3", 
partitionDirAndValsMap.get("c3=str1/c4=str3").get(1));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
index 89524aa5f..6b82041b1 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/FileSinkAggregatedCommitterTest.java
@@ -25,13 +25,15 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 public class FileSinkAggregatedCommitterTest {
-    @Test
+    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
     public void testCommit() throws Exception {
         FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(new LocalFileSystemCommitter());
         Map<String, Map<String, String>> transactionFiles = new HashMap<>();
@@ -47,7 +49,12 @@ public class FileSinkAggregatedCommitterTest {
         FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
 
         transactionFiles.put(transactionDir, needMoveFiles);
-        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles);
+
+        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new 
String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
+
+        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
         List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new 
ArrayList<>();
         fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
         fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
@@ -57,7 +64,7 @@ public class FileSinkAggregatedCommitterTest {
         Assert.assertTrue(!FileUtils.fileExist(transactionDir));
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
+    @SuppressWarnings({"checkstyle:UnnecessaryParentheses", 
"checkstyle:MagicNumber"})
     @Test
     public void testCombine() throws Exception {
         FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(new LocalFileSystemCommitter());
@@ -68,28 +75,37 @@ public class FileSinkAggregatedCommitterTest {
         String transactionDir = 
String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId);
         String targetDir = String.format("/tmp/hive/warehouse/%s", jobId);
         Map<String, String> needMoveFiles = new HashMap<>();
-        needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir 
+ "/c3=4/c4=rrr/test1.txt");
+        needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir 
+ "/c3=3/c4=rrr/test1.txt");
         needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir 
+ "/c3=4/c4=bbb/test1.txt");
-        FileUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
+        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+        partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new 
String[]{"3", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
+        FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, 
partitionDirAndVals, transactionDir);
+        FileUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt");
         FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
 
         Map<String, String> needMoveFiles1 = new HashMap<>();
         needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt", 
targetDir + "/c3=4/c4=rrr/test2.txt");
         needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt", 
targetDir + "/c3=4/c4=bbb/test2.txt");
-        FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, 
transactionDir);
-        FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, 
transactionDir);
+        Map<String, List<String>> partitionDirAndVals1 = new HashMap<>();
+        partitionDirAndVals1.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals1.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList()));
+        FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, 
partitionDirAndVals1, transactionDir);
         List<FileCommitInfo> fileCommitInfoList = new ArrayList<>();
         fileCommitInfoList.add(fileCommitInfo);
         fileCommitInfoList.add(fileCommitInfo1);
+
         FileAggregatedCommitInfo combine = 
fileSinkAggregatedCommitter.combine(fileCommitInfoList);
         Assert.assertEquals(1, combine.getTransactionMap().size());
         Assert.assertEquals(4, 
combine.getTransactionMap().get(transactionDir).size());
-        Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test1.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=rrr/test1.txt"));
+        Assert.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=3/c4=rrr/test1.txt"));
         Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=bbb/test1.txt"));
         Assert.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=rrr/test2.txt"));
         Assert.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt", 
combine.getTransactionMap().get(transactionDir).get(transactionDir + 
"/c3=4/c4=bbb/test2.txt"));
+        Assert.assertEquals(3, 
combine.getPartitionDirAndValsMap().keySet().size());
     }
 
+    @SuppressWarnings("checkstyle:UnnecessaryParentheses")
     @Test
     public void testAbort() throws Exception {
         FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(new LocalFileSystemCommitter());
@@ -102,18 +118,21 @@ public class FileSinkAggregatedCommitterTest {
         Map<String, String> needMoveFiles = new HashMap<>();
         needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir 
+ "/c3=4/c4=rrr/test1.txt");
         needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir 
+ "/c3=4/c4=bbb/test1.txt");
+        Map<String, List<String>> partitionDirAndVals = new HashMap<>();
+        partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new 
String[]{"4", "rrr"})).collect(Collectors.toList()));
+        partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new 
String[]{"4", "bbb"})).collect(Collectors.toList()));
         FileUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt");
         FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt");
 
         transactionFiles.put(transactionDir, needMoveFiles);
-        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles);
+        FileAggregatedCommitInfo fileAggregatedCommitInfo = new 
FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals);
         List<FileAggregatedCommitInfo> fileAggregatedCommitInfoList = new 
ArrayList<>();
         fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo);
         fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList);
 
         Assert.assertTrue(FileUtils.fileExist(targetDir + 
"/c3=4/c4=bbb/test1.txt"));
         Assert.assertTrue(FileUtils.fileExist(targetDir + 
"/c3=4/c4=rrr/test1.txt"));
-        Assert.assertFalse(FileUtils.fileExist(transactionDir));
+        Assert.assertTrue(!FileUtils.fileExist(transactionDir));
 
         fileSinkAggregatedCommitter.abort(fileAggregatedCommitInfoList);
         Assert.assertTrue(!FileUtils.fileExist(targetDir + 
"/c3=4/c4=bbb/test1.txt"));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
index 007ea39ef..d739ac41c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sink/local/TestLocalTxtTransactionStateFileWriter.java
@@ -93,5 +93,15 @@ public class TestLocalTxtTransactionStateFileWriter {
         Map<String, String> needMoveFiles = fileCommitInfo.getNeedMoveFiles();
         Assert.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId + 
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId 
+ ".txt"));
         Assert.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId + 
".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId 
+ ".txt"));
+
+        Map<String, List<String>> partitionDirAndValsMap = 
fileCommitInfo.getPartitionDirAndValsMap();
+        Assert.assertEquals(2, partitionDirAndValsMap.size());
+        
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2"));
+        
Assert.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3"));
+        Assert.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size() 
== 2);
+        Assert.assertEquals("str1", 
partitionDirAndValsMap.get("c3=str1/c4=str2").get(0));
+        Assert.assertEquals("str2", 
partitionDirAndValsMap.get("c3=str1/c4=str2").get(1));
+        Assert.assertEquals("str1", 
partitionDirAndValsMap.get("c3=str1/c4=str3").get(0));
+        Assert.assertEquals("str3", 
partitionDirAndValsMap.get("c3=str1/c4=str3").get(1));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml 
b/seatunnel-connectors-v2/connector-hive/pom.xml
index 62a2eed1f..342a01792 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -31,9 +31,9 @@
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-hive-shade</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <scope>provided</scope>
         </dependency>
         
         <dependency>
@@ -80,9 +80,25 @@
             <artifactId>commons-collections4</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-hadoop</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
similarity index 75%
copy from 
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
copy to 
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
index 4f9f5d12e..dd0b2ab54 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
@@ -15,15 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class HiveSinkState implements Serializable {
-    private HiveSinkConfig hiveSinkConfig;
+public class Constant {
+    public static final String HIVE_RESULT_TABLE_NAME = "hive_table_name";
+    public static final String HIVE_METASTORE_URIS = "hive_metastore_uris";
 }
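
These two keys are read from the plugin Config in HiveSinkConfig further down. A
self-contained sketch of how they resolve (the unshaded typesafe-config import is used
here so the example compiles on its own; the values are placeholders):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class HiveConstantDemo {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString(
                "hive_table_name = \"default.test_hive\"\n"
                    + "hive_metastore_uris = \"thrift://localhost:9083\"");
            System.out.println(config.getString("hive_table_name"));     // default.test_hive
            System.out.println(config.getString("hive_metastore_uris")); // thrift://localhost:9083
        }
    }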
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
index 025fbefbf..6259389c4 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
@@ -17,20 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.io.Serializable;
-import java.util.Map;
 
 @Data
 @AllArgsConstructor
 public class HiveAggregatedCommitInfo implements Serializable {
-
-    /**
-     * Storage the commit info in map.
-     * K is the file path need to be moved to hive data dir.
-     * V is the target file path of the data file.
-     */
-    private Map<String, String> needMoveFiles;
+    private FileAggregatedCommitInfo fileAggregatedCommitInfo;
+    private String hiveMetastoreUris;
+    private Table table;
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
index 0dd58f8f4..002beea32 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
@@ -17,20 +17,22 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.io.Serializable;
-import java.util.Map;
 
 @Data
 @AllArgsConstructor
 public class HiveCommitInfo implements Serializable {
 
-    /**
-     * Storage the commit info in map.
-     * K is the file path need to be moved to hive data dir.
-     * V is the target file path of the data file.
-     */
-    private Map<String, String> needMoveFiles;
+    private FileCommitInfo fileCommitInfo;
+
+    private String hiveMetastoreUris;
+
+    private Table table;
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index f1ba12edd..4df91b1a5 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -26,6 +27,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -43,8 +46,11 @@ import java.util.Optional;
 public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, 
HiveCommitInfo, HiveAggregatedCommitInfo> {
 
     private Config config;
-    private long jobId;
-    private SeaTunnelRowType seaTunnelRowType;
+    private String jobId;
+    private Long checkpointId;
+    private SeaTunnelRowType seaTunnelRowTypeInfo;
+    private SeaTunnelContext seaTunnelContext;
+    private HiveSinkConfig hiveSinkConfig;
 
     @Override
     public String getPluginName() {
@@ -52,34 +58,47 @@ public class HiveSink implements 
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowTypeInfo) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.hiveSinkConfig = new HiveSinkConfig(config, seaTunnelRowTypeInfo);
     }
 
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
+        return this.seaTunnelRowTypeInfo;
     }
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.config = pluginConfig;
-        this.jobId = System.currentTimeMillis();
+        this.checkpointId = 1L;
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
-        return new HiveSinkWriter(seaTunnelRowType, config, context, 
System.currentTimeMillis());
+        if (!seaTunnelContext.getJobMode().equals(JobMode.BATCH) && hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE)) {
+            throw new RuntimeException("Only a batch job can overwrite a hive table");
+        }
+
+        if (!this.getSinkConfig().getTextFileSinkConfig().isEnableTransaction()) {
+            throw new RuntimeException("Hive Sink Connector only supports transactions now");
+        }
+        return new HiveSinkWriter(seaTunnelRowTypeInfo,
+            config,
+            context,
+            getSinkConfig(),
+            jobId);
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> 
restoreWriter(SinkWriter.Context context, List<HiveSinkState> states) throws 
IOException {
-        return new HiveSinkWriter(seaTunnelRowType, config, context, 
System.currentTimeMillis());
+        return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context, 
hiveSinkConfig, jobId, states);
     }
 
     @Override
-    public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+        this.jobId = seaTunnelContext.getJobId();
     }
 
     @Override
@@ -87,8 +106,25 @@ public class HiveSink implements 
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
         return Optional.of(new HiveSinkAggregatedCommitter());
     }
 
+    @Override
+    public Optional<Serializer<HiveSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
     @Override
     public Optional<Serializer<HiveAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<HiveAggregatedCommitInfo>());
+    }
+
+    @Override
+    public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
+
+    private HiveSinkConfig getSinkConfig() {
+        if (this.hiveSinkConfig == null && (this.seaTunnelRowTypeInfo != null 
&& this.config != null)) {
+            this.hiveSinkConfig = new HiveSinkConfig(config, 
seaTunnelRowTypeInfo);
+        }
+        return this.hiveSinkConfig;
+    }
 }
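
Taken together, a job would wire the sink up roughly as below (table name and metastore
URI are placeholders; the full option list lives in the new Hive sink doc added by this
commit):

    sink {
      Hive {
        hive_table_name = "default.test_hive"
        hive_metastore_uris = "thrift://localhost:9083"
      }
    }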
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
index 673923a51..3a0448179 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
@@ -18,8 +18,14 @@
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs.HdfsUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,41 +34,82 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class HiveSinkAggregatedCommitter implements 
SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveSinkAggregatedCommitter.class);
 
     @Override
     public List<HiveAggregatedCommitInfo> 
commit(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws 
IOException {
-        if (aggregatedCommitInfoList == null || 
aggregatedCommitInfoList.size() == 0) {
+        LOGGER.info("=============================agg 
commit=================================");
+        if (CollectionUtils.isEmpty(aggregatedCommitInfoList)) {
             return null;
         }
         List errorAggregatedCommitInfoList = new ArrayList();
-        aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
-            try {
-                Map<String, String> needMoveFiles = 
aggregateCommitInfo.getNeedMoveFiles();
-                for (Map.Entry<String, String> entry : 
needMoveFiles.entrySet()) {
-                    HdfsUtils.renameFile(entry.getKey(), entry.getValue(), 
true);
+        HiveMetaStoreProxy hiveMetaStoreProxy = new 
HiveMetaStoreProxy(aggregatedCommitInfoList.get(0).getHiveMetastoreUris());
+        HiveMetaStoreClient hiveMetaStoreClient = 
hiveMetaStoreProxy.getHiveMetaStoreClient();
+        try {
+            aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+                try {
+                    for (Map.Entry<String, Map<String, String>> entry : 
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
 {
+                        // move the staged file from the transaction dir to the target dir
+                        for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
+                            HdfsUtils.renameFile(mvFileEntry.getKey(), 
mvFileEntry.getValue(), true);
+                        }
+                        // delete the transaction dir
+                        HdfsUtils.deleteFile(entry.getKey());
+                    }
+                    // add hive partition
+                    
aggregateCommitInfo.getFileAggregatedCommitInfo().getPartitionDirAndValsMap().entrySet().forEach(entry
 -> {
+                        Partition part = new Partition();
+                        
part.setDbName(aggregateCommitInfo.getTable().getDbName());
+                        
part.setTableName(aggregateCommitInfo.getTable().getTableName());
+                        part.setValues(entry.getValue());
+                        part.setParameters(new HashMap<>());
+                        
part.setSd(aggregateCommitInfo.getTable().getSd().deepCopy());
+                        
part.getSd().setSerdeInfo(aggregateCommitInfo.getTable().getSd().getSerdeInfo());
+                        
part.getSd().setLocation(aggregateCommitInfo.getTable().getSd().getLocation() + 
"/" + entry.getKey());
+                        try {
+                            hiveMetaStoreClient.add_partition(part);
+                        } catch (TException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                } catch (Exception e) {
+                    LOGGER.error("commit aggregateCommitInfo error ", e);
+                    errorAggregatedCommitInfoList.add(aggregateCommitInfo);
                 }
-            } catch (IOException e) {
-                LOGGER.error("commit aggregateCommitInfo error ", e);
-                errorAggregatedCommitInfoList.add(aggregateCommitInfo);
-            }
-        });
+            });
+        } finally {
+            hiveMetaStoreClient.close();
+        }
 
         return errorAggregatedCommitInfoList;
     }
 
     @Override
     public HiveAggregatedCommitInfo combine(List<HiveCommitInfo> commitInfos) {
-        if (commitInfos == null || commitInfos.size() == 0) {
+        if (CollectionUtils.isEmpty(commitInfos)) {
             return null;
         }
-        Map<String, String> aggregateCommitInfo = new HashMap<>();
+        Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
+        Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
         commitInfos.stream().forEach(commitInfo -> {
-            aggregateCommitInfo.putAll(commitInfo.getNeedMoveFiles());
+            Map<String, String> needMoveFileMap = 
aggregateCommitInfo.get(commitInfo.getFileCommitInfo().getTransactionDir());
+            if (needMoveFileMap == null) {
+                needMoveFileMap = new HashMap<>();
+                
aggregateCommitInfo.put(commitInfo.getFileCommitInfo().getTransactionDir(), 
needMoveFileMap);
+            }
+            
needMoveFileMap.putAll(commitInfo.getFileCommitInfo().getNeedMoveFiles());
+            Set<Map.Entry<String, List<String>>> entries = 
commitInfo.getFileCommitInfo().getPartitionDirAndValsMap().entrySet();
+            if (!CollectionUtils.isEmpty(entries)) {
+                
partitionDirAndValsMap.putAll(commitInfo.getFileCommitInfo().getPartitionDirAndValsMap());
+            }
         });
-        return new HiveAggregatedCommitInfo(aggregateCommitInfo);
+        return new HiveAggregatedCommitInfo(
+            new FileAggregatedCommitInfo(aggregateCommitInfo, 
partitionDirAndValsMap),
+            commitInfos.get(0).getHiveMetastoreUris(),
+            commitInfos.get(0).getTable());
     }
 
     @Override
@@ -72,9 +119,17 @@ public class HiveSinkAggregatedCommitter implements 
SinkAggregatedCommitter<Hive
         }
         aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
             try {
-                Map<String, String> needMoveFiles = 
aggregateCommitInfo.getNeedMoveFiles();
-                for (Map.Entry<String, String> entry : 
needMoveFiles.entrySet()) {
-                    HdfsUtils.renameFile(entry.getValue(), entry.getKey(), 
true);
+                for (Map.Entry<String, Map<String, String>> entry : 
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
 {
+                    // rollback the file
+                    for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
+                        if (HdfsUtils.fileExist(mvFileEntry.getValue()) && 
!HdfsUtils.fileExist(mvFileEntry.getKey())) {
+                            HdfsUtils.renameFile(mvFileEntry.getValue(), 
mvFileEntry.getKey(), true);
+                        }
+                    }
+                    // delete the transaction dir
+                    HdfsUtils.deleteFile(entry.getKey());
+
+                    // The partitions that have been added will be preserved 
and will not be deleted
                 }
             } catch (IOException e) {
                 LOGGER.error("abort aggregateCommitInfo error ", e);
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
index e4495121b..a37dd3a51 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
@@ -17,90 +17,132 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_METASTORE_URIS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_RESULT_TABLE_NAME;
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import lombok.Data;
 import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
 
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 @Data
-public class HiveSinkConfig {
-
-    private static final String HIVE_SAVE_MODE = "save_mode";
-
-    private static final String HIVE_SINK_COLUMNS = "sink_columns";
-
-    private static final String HIVE_PARTITION_BY = "partition_by";
-
-    private static final String HIVE_RESULT_TABLE_NAME = "result_table_name";
+public class HiveSinkConfig implements Serializable {
+    private String hiveTableName;
+    private List<String> hivePartitionFieldList;
+    private String hiveMetaUris;
 
-    private static final String SINK_TMP_FS_ROOT_PATH = 
"sink_tmp_fs_root_path";
+    private String dbName;
 
-    private static final String HIVE_TABLE_FS_PATH = "hive_table_fs_path";
+    private String tableName;
 
-    private static final String HIVE_TXT_FILE_FIELD_DELIMITER = 
"hive_txt_file_field_delimiter";
+    private Table table;
 
-    private static final String HIVE_TXT_FILE_LINE_DELIMITER = 
"hive_txt_file_line_delimiter";
+    private TextFileSinkConfig textFileSinkConfig;
 
-    private SaveMode saveMode = SaveMode.APPEND;
+    public HiveSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo) {
+        
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
 
-    private String sinkTmpFsRootPath = "/tmp/seatunnel";
+        if (config.hasPath(HIVE_RESULT_TABLE_NAME) && 
!StringUtils.isBlank(config.getString(HIVE_RESULT_TABLE_NAME))) {
+            this.hiveTableName = config.getString(HIVE_RESULT_TABLE_NAME);
+        }
+        checkNotNull(hiveTableName);
 
-    private List<String> partitionFieldNames;
+        if (config.hasPath(HIVE_METASTORE_URIS) && 
!StringUtils.isBlank(config.getString(HIVE_METASTORE_URIS))) {
+            this.hiveMetaUris = config.getString(HIVE_METASTORE_URIS);
+        }
+        checkNotNull(hiveMetaUris);
 
-    private String hiveTableName;
+        String[] dbAndTableName = hiveTableName.split("\\.");
+        if (dbAndTableName.length != 2) {
+            throw new RuntimeException("Please configure " + HIVE_RESULT_TABLE_NAME + " in db.table format");
+        }
+        this.dbName = dbAndTableName[0];
+        this.tableName = dbAndTableName[1];
+        HiveMetaStoreProxy hiveMetaStoreProxy = new 
HiveMetaStoreProxy(hiveMetaUris);
+        HiveMetaStoreClient hiveMetaStoreClient = 
hiveMetaStoreProxy.getHiveMetaStoreClient();
+
+        try {
+            table = hiveMetaStoreClient.getTable(dbName, tableName);
+            String inputFormat = table.getSd().getInputFormat();
+            if 
("org.apache.hadoop.mapred.TextInputFormat".equals(inputFormat)) {
+                config = config.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
+            } else {
+                throw new RuntimeException("Only support text file now");
+            }
 
-    private List<String> sinkColumns;
+            Map<String, String> parameters = 
table.getSd().getSerdeInfo().getParameters();
+            config = config.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, 
ConfigValueFactory.fromAnyRef(false))
+                .withValue(FIELD_DELIMITER, 
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
+                .withValue(ROW_DELIMITER, 
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+                .withValue(FILE_NAME_EXPRESSION, 
ConfigValueFactory.fromAnyRef("${transactionId}"))
+                .withValue(PATH, 
ConfigValueFactory.fromAnyRef(table.getSd().getLocation()));
 
-    private String hiveTableFsPath;
+            if (!config.hasPath(SAVE_MODE) || 
StringUtils.isBlank(config.getString(Constant.SAVE_MODE))) {
+                config = config.withValue(SAVE_MODE, 
ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
+            }
 
-    private String hiveTxtFileFieldDelimiter = String.valueOf('\001');
+            this.textFileSinkConfig = new TextFileSinkConfig(config, 
seaTunnelRowTypeInfo);
 
-    private String hiveTxtFileLineDelimiter = "\n";
+            // --------------------Check textFileSinkConfig with the hive 
table info-------------------
+            List<FieldSchema> fields = 
hiveMetaStoreClient.getFields(dbAndTableName[0], dbAndTableName[1]);
+            List<FieldSchema> partitionKeys = table.getPartitionKeys();
 
-    public enum SaveMode {
-        APPEND(),
-        OVERWRITE();
+            // Remove partitionKeys from table fields
+            List<FieldSchema> fieldNotContainPartitionKey = fields.stream().filter(field -> !partitionKeys.contains(field)).collect(Collectors.toList());
 
-        public static SaveMode fromStr(String str) {
-            if ("overwrite".equals(str)) {
-                return OVERWRITE;
-            } else {
-                return APPEND;
+            // the number of sink columns must match the number of hive table fields
+            if (fieldNotContainPartitionKey.size() != textFileSinkConfig.getSinkColumnList().size()) {
+                throw new RuntimeException("The sink column size must be the same as the hive table field size");
             }
-        }
-    }
-
-    public HiveSinkConfig(@NonNull Config pluginConfig) {
-        checkNotNull(pluginConfig.getString(HIVE_RESULT_TABLE_NAME));
-        checkNotNull(pluginConfig.getString(HIVE_TABLE_FS_PATH));
-        this.hiveTableName = pluginConfig.getString(HIVE_RESULT_TABLE_NAME);
-        this.hiveTableFsPath = pluginConfig.getString(HIVE_TABLE_FS_PATH);
 
-        this.saveMode = 
StringUtils.isBlank(pluginConfig.getString(HIVE_SAVE_MODE)) ? SaveMode.APPEND : 
SaveMode.fromStr(pluginConfig.getString(HIVE_SAVE_MODE));
-        if 
(!StringUtils.isBlank(pluginConfig.getString(SINK_TMP_FS_ROOT_PATH))) {
-            this.sinkTmpFsRootPath = 
pluginConfig.getString(SINK_TMP_FS_ROOT_PATH);
-        }
-
-        this.partitionFieldNames = 
pluginConfig.getStringList(HIVE_PARTITION_BY);
-        this.sinkColumns = pluginConfig.getStringList(HIVE_SINK_COLUMNS);
+            // the partition_by column count must match the hive table partition column count
+            if (partitionKeys.size() != textFileSinkConfig.getPartitionFieldList().size()) {
+                throw new RuntimeException("The partition by column size must be the same as the hive table partition column size");
+            }
 
-        if 
(!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER))) {
-            this.hiveTxtFileFieldDelimiter = 
pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER);
+            // --------------------Check textFileSinkConfig with the hive 
table info end----------------
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        } finally {
+            hiveMetaStoreClient.close();
         }
 
-        if 
(!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER))) {
-            this.hiveTxtFileLineDelimiter = 
pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER);
+        // hive only supports the append and overwrite save modes
+        if (!this.textFileSinkConfig.getSaveMode().equals(SaveMode.APPEND) && !this.textFileSinkConfig.getSaveMode().equals(SaveMode.OVERWRITE)) {
+            throw new RuntimeException("Hive only supports the append or overwrite save mode");
+        }
+    }
 
-        // partition fields must in sink columns
-        if (!CollectionUtils.isEmpty(this.sinkColumns) && 
!CollectionUtils.isEmpty(this.partitionFieldNames) && 
!this.sinkColumns.containsAll(this.partitionFieldNames)) {
-            throw new RuntimeException("partition fields must in sink 
columns");
-        }
+    public TextFileSinkConfig getTextFileSinkConfig() {
+        return textFileSinkConfig;
     }
 }
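
Note that HiveSinkConfig derives the text-file options from the table's serde parameters
rather than from user config. A sketch of that override pattern with typesafe-config
(the key names below are placeholders standing in for the Constant values, and the
delimiters stand in for table.getSd().getSerdeInfo().getParameters()):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import com.typesafe.config.ConfigValueFactory;

    public class SerdeOverrideDemo {
        public static void main(String[] args) {
            Config config = ConfigFactory.empty();
            // Pretend these came from the hive table's serde parameters
            String fieldDelim = "\u0001";
            String lineDelim = "\n";
            config = config
                .withValue("field_delimiter", ConfigValueFactory.fromAnyRef(fieldDelim))
                .withValue("row_delimiter", ConfigValueFactory.fromAnyRef(lineDelim));
            System.out.println(config.getString("row_delimiter").equals("\n")); // true
        }
    }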
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
index 4f9f5d12e..a104151c3 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
@@ -25,5 +25,6 @@ import java.io.Serializable;
 @Data
 @AllArgsConstructor
 public class HiveSinkState implements Serializable {
-    private HiveSinkConfig hiveSinkConfig;
+    private String transactionId;
+    private Long checkpointId;
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
index e04f57729..4bdeae478 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -20,48 +20,120 @@ package 
org.apache.seatunnel.connectors.seatunnel.hive.sink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.FileWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsTxtFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.hdfs.HdfsFileSinkPlugin;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.google.common.collect.Lists;
 import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, 
HiveCommitInfo, HiveSinkState> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveSinkWriter.class);
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private SeaTunnelRowType seaTunnelRowTypeInfo;
     private Config pluginConfig;
-    private SinkWriter.Context context;
-    private long jobId;
+    private Context context;
+    private String jobId;
 
-    private FileWriter fileWriter;
+    private TransactionStateFileWriter fileWriter;
 
     private HiveSinkConfig hiveSinkConfig;
 
-    public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
+    public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
                           @NonNull Config pluginConfig,
                           @NonNull SinkWriter.Context context,
-                          long jobId) {
-        this.seaTunnelRowType = seaTunnelRowType;
+                          @NonNull HiveSinkConfig hiveSinkConfig,
+                          @NonNull String jobId) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
         this.pluginConfig = pluginConfig;
         this.context = context;
         this.jobId = jobId;
+        this.hiveSinkConfig = hiveSinkConfig;
+
+        SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+        Optional<TransactionStateFileWriter> transactionStateFileWriter = 
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+            new FileSinkTransactionFileNameGenerator(
+                this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
+            new FileSinkPartitionDirNameGenerator(
+                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
+            
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
+            this.jobId,
+            this.context.getIndexOfSubtask(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+            sinkFileSystemPlugin.getFileSystem().get());
+
+        if (!transactionStateFileWriter.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is need");
+        }
+
+        this.fileWriter = transactionStateFileWriter.get();
+
+        fileWriter.beginTransaction(1L);
+    }
 
-        hiveSinkConfig = new HiveSinkConfig(this.pluginConfig);
-        fileWriter = new HdfsTxtFileWriter(this.seaTunnelRowType,
-            hiveSinkConfig,
+    public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                          @NonNull Config pluginConfig,
+                          @NonNull SinkWriter.Context context,
+                          @NonNull HiveSinkConfig hiveSinkConfig,
+                          @NonNull String jobId,
+                          @NonNull List<HiveSinkState> hiveSinkStates) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.jobId = jobId;
+        this.hiveSinkConfig = hiveSinkConfig;
+
+        SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
+        Optional<TransactionStateFileWriter> transactionStateFileWriter = 
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
+            new FileSinkTransactionFileNameGenerator(
+                this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat()),
+            new FileSinkPartitionDirNameGenerator(
+                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
+                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression()),
+            
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
             this.jobId,
-            this.context.getIndexOfSubtask());
+            this.context.getIndexOfSubtask(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
+            this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
+            sinkFileSystemPlugin.getFileSystem().get());
+
+        if (!transactionStateFileWriter.isPresent()) {
+            throw new RuntimeException("A TransactionStateFileWriter is need");
+        }
+
+        this.fileWriter = transactionStateFileWriter.get();
+
+        // Roll back dirty transactions left over from the previous run
+        if (!hiveSinkStates.isEmpty()) {
+            List<String> transactionAfter = fileWriter.getTransactionAfter(hiveSinkStates.get(0).getTransactionId());
+            fileWriter.abortTransactions(transactionAfter);
+            fileWriter.beginTransaction(hiveSinkStates.get(0).getCheckpointId() + 1);
+        } else {
+            // no restored state: start from the first checkpoint like a fresh writer
+            fileWriter.beginTransaction(1L);
+        }
     }
 
     @Override
@@ -71,18 +143,12 @@ public class HiveSinkWriter implements 
SinkWriter<SeaTunnelRow, HiveCommitInfo,
 
     @Override
     public Optional<HiveCommitInfo> prepareCommit() throws IOException {
-        fileWriter.finishAndCloseWriteFile();
-        /**
-         * We will clear the needMoveFiles in {@link #snapshotState()}, So we 
need copy the needMoveFiles map here.
-         */
-        Map<String, String> commitInfoMap = new 
HashMap<>(fileWriter.getNeedMoveFiles().size());
-        commitInfoMap.putAll(fileWriter.getNeedMoveFiles());
-        return Optional.of(new HiveCommitInfo(commitInfoMap));
-    }
-
-    @Override
-    public void abortPrepare() {
-        fileWriter.abort();
+        Optional<FileCommitInfo> fileCommitInfoOptional = 
fileWriter.prepareCommit();
+        if (fileCommitInfoOptional.isPresent()) {
+            FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get();
+            return Optional.of(new HiveCommitInfo(fileCommitInfo, 
hiveSinkConfig.getHiveMetaUris(), this.hiveSinkConfig.getTable()));
+        }
+        return Optional.empty();
     }
 
     @Override
@@ -92,8 +158,17 @@ public class HiveSinkWriter implements 
SinkWriter<SeaTunnelRow, HiveCommitInfo,
 
     @Override
     public List<HiveSinkState> snapshotState(long checkpointId) throws 
IOException {
-        //reset FileWrite
-        fileWriter.resetFileWriter(System.currentTimeMillis() + "");
-        return Lists.newArrayList(new HiveSinkState(hiveSinkConfig));
+        List<FileSinkState> fileSinkStates = 
fileWriter.snapshotState(checkpointId);
+        if (!CollectionUtils.isEmpty(fileSinkStates)) {
+            return fileSinkStates.stream().map(state ->
+                    new HiveSinkState(state.getTransactionId(), 
state.getCheckpointId()))
+                .collect(Collectors.toList());
+        }
+        return null;
+    }
+
+    @Override
+    public void abortPrepare() {
+        fileWriter.abortTransaction();
     }
 }
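
The restore constructor above rolls back every transaction recorded after the last
snapshotted one before opening a fresh transaction at checkpointId + 1. Schematically
(the transaction ids and checkpoint value here are made up):

    import java.util.Arrays;
    import java.util.List;

    public class RecoveryDemo {
        public static void main(String[] args) {
            long lastCheckpoint = 3L; // from the restored HiveSinkState (made-up value)
            List<String> dirty = Arrays.asList("T_job_0_4", "T_job_0_5"); // transactions after it
            // abortTransactions(dirty) would delete these staging dirs; then:
            System.out.println("beginTransaction(" + (lastCheckpoint + 1) + ")");
        }
    }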
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
deleted file mode 100644
index 57ca3c901..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
-
-import lombok.NonNull;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public abstract class AbstractFileWriter implements FileWriter {
-    protected Map<String, String> needMoveFiles;
-    protected SeaTunnelRowType seaTunnelRowType;
-    protected long jobId;
-    protected int subTaskIndex;
-    protected HiveSinkConfig hiveSinkConfig;
-
-    private static final String SEATUNNEL = "seatunnel";
-    private static final String NON_PARTITION = "NON_PARTITION";
-
-    protected Map<String, String> beingWrittenFile;
-
-    protected String checkpointId;
-    protected final int[] partitionKeyIndexes;
-
-    public AbstractFileWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
-                              @NonNull HiveSinkConfig hiveSinkConfig,
-                              long jobId,
-                              int subTaskIndex) {
-        checkArgument(jobId > 0);
-        checkArgument(subTaskIndex > -1);
-
-        this.needMoveFiles = new HashMap<>();
-        this.seaTunnelRowType = seaTunnelRowType;
-        this.jobId = jobId;
-        this.subTaskIndex = subTaskIndex;
-        this.hiveSinkConfig = hiveSinkConfig;
-
-        this.beingWrittenFile = new HashMap<>();
-        if (this.hiveSinkConfig.getPartitionFieldNames() == null) {
-            this.partitionKeyIndexes = new int[0];
-        } else {
-            this.partitionKeyIndexes = IntStream.range(0, 
seaTunnelRowType.getTotalFields())
-                .filter(i -> 
hiveSinkConfig.getPartitionFieldNames().contains(seaTunnelRowType.getFieldName(i)))
-                .toArray();
-        }
-    }
-
-    public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow 
seaTunnelRow) {
-        String beingWrittenFileKey = getBeingWrittenFileKey(seaTunnelRow);
-        // get filePath from beingWrittenFile
-        String beingWrittenFilePath = 
beingWrittenFile.get(beingWrittenFileKey);
-        if (beingWrittenFilePath != null) {
-            return beingWrittenFilePath;
-        } else {
-            StringBuilder sbf = new 
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
-            sbf.append("/")
-                .append(SEATUNNEL)
-                .append("/")
-                .append(jobId)
-                .append("/")
-                .append(checkpointId)
-                .append("/")
-                .append(hiveSinkConfig.getHiveTableName())
-                .append("/")
-                .append(beingWrittenFileKey)
-                .append("/")
-                .append(jobId)
-                .append("_")
-                .append(subTaskIndex)
-                .append(".")
-                .append(getFileSuffix());
-            String newBeingWrittenFilePath = sbf.toString();
-            beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
-            return newBeingWrittenFilePath;
-        }
-    }
-
-    private String getBeingWrittenFileKey(@NonNull SeaTunnelRow seaTunnelRow) {
-        if (partitionKeyIndexes.length > 0) {
-            return Arrays.stream(partitionKeyIndexes)
-                .boxed()
-                .map(i -> seaTunnelRowType.getFieldName(i) + "=" + 
seaTunnelRow.getField(i))
-                .collect(Collectors.joining("/"));
-        } else {
-            // If there is no partition field in data, We use the fixed value 
NON_PARTITION as the partition directory
-            return NON_PARTITION;
-        }
-    }
-
-    /**
-     * FileWriter need return the file suffix. eg: tex, orc, parquet
-     *
-     * @return
-     */
-    @NonNull
-    public abstract String getFileSuffix();
-
-    public String getHiveLocation(@NonNull String seaTunnelFilePath) {
-        StringBuilder sbf = new 
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
-        sbf.append("/")
-            .append(SEATUNNEL)
-            .append("/")
-            .append(jobId)
-            .append("/")
-            .append(checkpointId)
-            .append("/")
-            .append(hiveSinkConfig.getHiveTableName());
-        String seaTunnelPath = sbf.toString();
-        String tmpPath = seaTunnelFilePath.replaceAll(seaTunnelPath, 
hiveSinkConfig.getHiveTableFsPath());
-        return tmpPath.replaceAll(NON_PARTITION + "/", "");
-    }
-
-    @Override
-    public void resetFileWriter(@NonNull String checkpointId) {
-        this.checkpointId = checkpointId;
-        this.needMoveFiles = new HashMap<>();
-        this.beingWrittenFile = new HashMap<>();
-        this.resetMoreFileWriter(checkpointId);
-    }
-
-    public abstract void resetMoreFileWriter(@NonNull String checkpointId);
-
-    @Override
-    public void abort() {
-        this.needMoveFiles = new HashMap<>();
-        this.beingWrittenFile = new HashMap<>();
-        this.abortMore();
-    }
-
-    public abstract void abortMore();
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
deleted file mode 100644
index 8ee8777a1..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-
-import lombok.NonNull;
-
-import java.util.Map;
-
-public interface FileWriter {
-
-    void write(@NonNull SeaTunnelRow seaTunnelRow);
-
-    @NonNull
-    Map<String, String> getNeedMoveFiles();
-
-    /**
-     * In this method we need finish write the file. The following operations 
are often required:
-     * 1. Flush memory to disk.
-     * 2. Close output stream.
-     * 3. Add the mapping relationship between seatunnel file path and hive 
file path to needMoveFiles.
-     */
-    void finishAndCloseWriteFile();
-
-    /**
-     * The writer needs to be reset after each checkpoint is completed
-     *
-     * @param checkpointId checkpointId
-     */
-    void resetFileWriter(@NonNull String checkpointId);
-
-    void abort();
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
deleted file mode 100644
index 71b26568f..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
-
-import lombok.Lombok;
-import lombok.NonNull;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class HdfsTxtFileWriter extends AbstractFileWriter {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsTxtFileWriter.class);
-    private Map<String, FSDataOutputStream> beingWrittenOutputStream;
-    protected final int[] sinkColumnIndexes;
-
-    public HdfsTxtFileWriter(SeaTunnelRowType seaTunnelRowType,
-                             HiveSinkConfig hiveSinkConfig,
-                             long sinkId,
-                             int subTaskIndex) {
-        super(seaTunnelRowType, hiveSinkConfig, sinkId, subTaskIndex);
-        beingWrittenOutputStream = new HashMap<>();
-        List<String> sinkColumns = hiveSinkConfig.getSinkColumns();
-        if (sinkColumns == null || sinkColumns.size() == 0) {
-            this.sinkColumnIndexes = IntStream.range(0, seaTunnelRowType.getTotalFields()).toArray();
-        } else {
-            this.sinkColumnIndexes = IntStream.range(0, seaTunnelRowType.getTotalFields())
-                .filter(i -> sinkColumns.contains(seaTunnelRowType.getFieldName(i)))
-                .toArray();
-        }
-    }
-
-    @Override
-    @NonNull
-    public String getFileSuffix() {
-        return "txt";
-    }
-
-    @Override
-    public void resetMoreFileWriter(@NonNull String checkpointId) {
-        this.beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void abortMore() {
-        // delete files
-        beingWrittenOutputStream.keySet().stream().forEach(file -> {
-            try {
-                boolean deleted = HdfsUtils.deleteFile(file);
-                if (!deleted) {
-                    LOGGER.error("delete file {} error", file);
-                    throw new IOException(String.format("delete file %s error", file));
-                }
-            } catch (IOException e) {
-                LOGGER.error("delete file {} error", file);
-                throw new RuntimeException(e);
-            }
-        });
-
-        this.beingWrittenOutputStream = new HashMap<>();
-    }
-
-    @Override
-    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
-        Lombok.checkNotNull(seaTunnelRow, "seaTunnelRow is null");
-        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
-        FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath);
-        String line = transformRowToLine(seaTunnelRow);
-        try {
-            fsDataOutputStream.write(line.getBytes());
-            fsDataOutputStream.write(hiveSinkConfig.getHiveTxtFileLineDelimiter().getBytes());
-        } catch (IOException e) {
-            LOGGER.error("write data to file {} error", filePath);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @NonNull
-    @Override
-    public Map<String, String> getNeedMoveFiles() {
-        return this.needMoveFiles;
-    }
-
-    @Override
-    public void finishAndCloseWriteFile() {
-        beingWrittenOutputStream.entrySet().forEach(entry -> {
-            try {
-                entry.getValue().flush();
-            } catch (IOException e) {
-                LOGGER.error("error when flush file {}", entry.getKey());
-                throw new RuntimeException(e);
-            } finally {
-                try {
-                    entry.getValue().close();
-                } catch (IOException e) {
-                    LOGGER.error("error when close output stream {}", entry.getKey());
-                }
-            }
-
-            needMoveFiles.put(entry.getKey(), getHiveLocation(entry.getKey()));
-        });
-    }
-
-    private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
-        FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
-        if (fsDataOutputStream == null) {
-            try {
-                fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
-                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
-            } catch (IOException e) {
-                LOGGER.error("can not get output file stream");
-                throw new RuntimeException(e);
-            }
-        }
-        return fsDataOutputStream;
-    }
-
-    private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
-        return Arrays.stream(sinkColumnIndexes)
-            .boxed()
-            .map(seaTunnelRow::getField)
-            .map(value -> value == null ? "" : value.toString())
-            .collect(Collectors.joining(hiveSinkConfig.getHiveTxtFileFieldDelimiter()));
-    }
-}
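
The projection-and-join in transformRowToLine is the core of the removed text format: select the configured column indexes, stringify each value with null mapped to an empty string, and join on the field delimiter. A self-contained illustration; the row values and delimiter are made-up examples:

    // Standalone demo of the column projection + join that transformRowToLine performed.
    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class LineJoinDemo {
        public static void main(String[] args) {
            Object[] row = {"seatunnel", 26, null};  // example field values (assumed)
            int[] sinkColumnIndexes = {0, 1, 2};     // keep all three columns
            String fieldDelimiter = "\t";            // example delimiter

            String line = Arrays.stream(sinkColumnIndexes)
                .boxed()
                .map(i -> row[i])
                .map(v -> v == null ? "" : v.toString())
                .collect(Collectors.joining(fieldDelimiter));

            System.out.println(line);  // prints: seatunnel<TAB>26<TAB> (null becomes empty)
        }
    }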
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
deleted file mode 100644
index 23b1e5843..000000000
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
-
-import lombok.NonNull;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class HdfsUtils {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsUtils.class);
-
-    public static final int WRITE_BUFFER_SIZE = 2048;
-
-    public static FileSystem getHdfsFs(@NonNull String path)
-        throws IOException {
-        Configuration conf = new Configuration();
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        conf.set("fs.defaultFs", path);
-        return FileSystem.get(conf);
-    }
-
-    public static FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(outFilePath);
-        Path path = new Path(outFilePath);
-        FSDataOutputStream fsDataOutputStream = hdfsFs.create(path, true, WRITE_BUFFER_SIZE);
-        return fsDataOutputStream;
-    }
-
-    public static boolean deleteFile(@NonNull String file) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(file);
-        return hdfsFs.delete(new Path(file), true);
-    }
-
-    /**
-     * rename file
-     *
-     * @param oldName     old file name
-     * @param newName     target file name
-     * @param rmWhenExist if this is true, we will delete the target file when it already exists
-     * @throws IOException throw IOException
-     */
-    public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
-        FileSystem hdfsFs = getHdfsFs(newName);
-        LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
-
-        Path oldPath = new Path(oldName);
-        Path newPath = new Path(newName);
-        if (rmWhenExist) {
-            if (fileExist(newName) && fileExist(oldName)) {
-                hdfsFs.delete(newPath, true);
-            }
-        }
-        if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
-            createDir(newName.substring(0, newName.lastIndexOf("/")));
-        }
-        LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
-
-        hdfsFs.rename(oldPath, newPath);
-    }
-
-    public static boolean createDir(@NonNull String filePath)
-        throws IOException {
-
-        FileSystem hdfsFs = getHdfsFs(filePath);
-        Path dfs = new Path(filePath);
-        return hdfsFs.mkdirs(dfs);
-    }
-
-    public static boolean fileExist(@NonNull String filePath)
-        throws IOException {
-        FileSystem hdfsFs = getHdfsFs(filePath);
-        Path fileName = new Path(filePath);
-        return hdfsFs.exists(fileName);
-    }
-}
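
One caveat worth noting in the removed utility: getHdfsFs sets the configuration key "fs.defaultFs", while the Hadoop configuration key is conventionally spelled "fs.defaultFS". A minimal sketch of the usual pattern; the URI and path below are placeholder values, not taken from this patch:

    // Minimal sketch of obtaining an HDFS FileSystem with the standard key spelling.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsFsDemo {
        public static FileSystem getFs(String defaultFs) throws IOException {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", defaultFs); // note the capital "FS"
            return FileSystem.get(conf);
        }

        public static void main(String[] args) throws IOException {
            FileSystem fs = getFs("hdfs://localhost:9000"); // placeholder URI
            System.out.println(fs.exists(new Path("/tmp"))); // simple connectivity check
        }
    }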
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
new file mode 100644
index 000000000..30c9a2eba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import lombok.NonNull;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+public class HiveMetaStoreProxy {
+
+    private HiveMetaStoreClient hiveMetaStoreClient;
+
+    public HiveMetaStoreProxy(@NonNull String uris) {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set("hive.metastore.uris", uris);
+        try {
+            hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
+        } catch (MetaException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Table getTable(@NonNull String dbName, @NonNull String tableName) {
+        try {
+            return hiveMetaStoreClient.getTable(dbName, tableName);
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public HiveMetaStoreClient getHiveMetaStoreClient() {
+        return hiveMetaStoreClient;
+    }
+}
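
A possible usage of the new proxy, sketched for orientation; the metastore URI, database, and table names are placeholders (the URI and table happen to mirror the test conf below):

    // Hypothetical caller of HiveMetaStoreProxy; all string values are placeholders.
    import org.apache.hadoop.hive.metastore.api.Table;

    import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

    public class MetaStoreProxyDemo {
        public static void main(String[] args) {
            HiveMetaStoreProxy proxy = new HiveMetaStoreProxy("thrift://localhost:9083");
            Table table = proxy.getTable("default", "test_fake_to_hive");
            // The storage descriptor carries the table location that the sink writes under.
            System.out.println(table.getSd().getLocation());
        }
    }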
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/TestHiveSinkConfig.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/TestHiveSinkConfig.java
new file mode 100644
index 000000000..92e5a6931
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/TestHiveSinkConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(JUnit4.class)
+public class TestHiveSinkConfig {
+
+    @Test
+    public void testCreateHiveSinkConfig() {
+        String[] fieldNames = new String[]{"name", "age"};
+        SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE};
+        SeaTunnelRowType seaTunnelRowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+        String configFile = "fakesource_to_hive.conf";
+        String configFilePath = System.getProperty("user.dir") + "/src/test/resources/" + configFile;
+        Config config = ConfigFactory
+            .parseFile(new File(configFilePath))
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(),
+                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        List<? extends Config> sink = config.getConfigList("sink");
+        HiveSinkConfig hiveSinkConfig = new HiveSinkConfig(sink.get(0), seaTunnelRowTypeInfo);
+    }
+}
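
As written, testCreateHiveSinkConfig only checks that construction does not throw. A hedged sketch of stronger assertions, reusing the hiveSinkConfig built in the test body above (JUnit's Assert and java.util.Arrays imports assumed; getSinkColumns() is the getter HdfsTxtFileWriter relied on earlier in this patch, and the expected values are assumed from sink_columns in fakesource_to_hive.conf):

        // Hypothetical follow-up assertions for the test body above.
        // The expected list mirrors sink_columns in fakesource_to_hive.conf.
        Assert.assertNotNull(hiveSinkConfig);
        Assert.assertEquals(Arrays.asList("name", "age"), hiveSinkConfig.getSinkColumns());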
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf b/seatunnel-connectors-v2/connector-hive/src/test/resources/fakesource_to_hive.conf
similarity index 80%
copy from seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
copy to seatunnel-connectors-v2/connector-hive/src/test/resources/fakesource_to_hive.conf
index c1ce63055..3412ea663 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
+++ b/seatunnel-connectors-v2/connector-hive/src/test/resources/fakesource_to_hive.conf
@@ -21,8 +21,8 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "STREAMING"
-  execution.checkpoint.interval = 5000
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
@@ -38,29 +38,16 @@ source {
 }
 
 transform {
-
-    sql {
-      sql = "select name,age from fake"
-    }
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
   # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
 }
 
 sink {
-  File {
-    path="file:///tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
+  Hive {
+    hive_table_name="default.test_fake_to_hive"
+    hive_metastore_uris="thrift://localhost:9083"
     partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
     sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml
index 26b3c7b7c..0b2ee9ce8 100644
--- a/seatunnel-connectors-v2/connector-hudi/pom.xml
+++ b/seatunnel-connectors-v2/connector-hudi/pom.xml
@@ -32,9 +32,9 @@
     <dependencies>
 
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-hive-shade</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/seatunnel-dependency-shade/pom.xml b/seatunnel-dependency-shade/pom.xml
deleted file mode 100644
index 1e6259991..000000000
--- a/seatunnel-dependency-shade/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-    <parent>
-        <artifactId>seatunnel</artifactId>
-        <groupId>org.apache.seatunnel</groupId>
-        <version>${revision}</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>seatunnel-dependency-shade</artifactId>
-    <packaging>pom</packaging>
-    
-    <modules>
-        <module>seatunnel-hive-shade</module>
-    </modules>
-
-</project>
\ No newline at end of file
diff --git a/seatunnel-dependency-shade/seatunnel-hive-shade/pom.xml b/seatunnel-dependency-shade/seatunnel-hive-shade/pom.xml
deleted file mode 100644
index ddab451b6..000000000
--- a/seatunnel-dependency-shade/seatunnel-hive-shade/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>seatunnel-dependency-shade</artifactId>
-        <groupId>org.apache.seatunnel</groupId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>seatunnel-hive-shade</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-        </dependency>
-    </dependencies>
-    <build>
-        <finalName>${project.artifactId}-${project.version}</finalName>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                        <exclude>org/apache/calcite/**</exclude>
-                                        <exclude>org/htrace/**</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-          
-        </plugins>
-    </build>
-</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 68e0ad2e8..f2d18d62f 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -56,6 +56,11 @@
             <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-socket</artifactId>
@@ -101,6 +106,12 @@
             <scope>${flink.scope}</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
index c1ce63055..f7b790c40 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
@@ -21,8 +21,8 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "STREAMING"
-  execution.checkpoint.interval = 5000
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 10
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
@@ -47,7 +47,7 @@ transform {
 }
 
 sink {
-  File {
+  LocalFile {
     path="file:///tmp/hive/warehouse/test2"
     field_delimiter="\t"
     row_delimiter="\n"
