This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new f389cd79 [api-draft][connector] New hive sink connector (#1997)
f389cd79 is described below

commit f389cd79199500608e22f0207e2568f9bd57bd90
Author: Eric <[email protected]>
AuthorDate: Wed Jun 8 20:13:44 2022 +0800

    [api-draft][connector] New hive sink connector (#1997)
    
    * add checkpointId
    
    * add some method in hdfsutils
    
    * * Fix check style
    * Add License header
    * Remove meaningless's test
    
    * * some code optimization
    *Overflow PMD plugin because we already have Sonar
    
    * * Fix some method error
    
    * add checkstyle
    
    * update hdfs dependency to flink shaded
    
    * * Fix check style
    
    Co-authored-by: CalvinKirs <[email protected]>
---
 pom.xml                                            |  49 ++-----
 .../seatunnel-connectors-seatunnel/pom.xml         |   1 +
 .../seatunnel-connector-seatunnel-hive/pom.xml     |  37 +++++
 .../connectors/seatunnel/hive/config/Config.java   |  21 +++
 .../hive/sink/HiveAggregatedCommitInfo.java        |  35 +++++
 .../seatunnel/hive/sink/HiveCommitInfo.java        |  36 +++++
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  87 ++++++++++++
 .../hive/sink/HiveSinkAggregatedCommitter.java     |  88 ++++++++++++
 .../seatunnel/hive/sink/HiveSinkConfig.java        | 106 ++++++++++++++
 .../seatunnel/hive/sink/HiveSinkState.java         |  29 ++++
 .../seatunnel/hive/sink/HiveSinkWriter.java        |  99 +++++++++++++
 .../hive/sink/file/writer/AbstractFileWriter.java  | 150 ++++++++++++++++++++
 .../hive/sink/file/writer/FileWriter.java          |  49 +++++++
 .../hive/sink/file/writer/HdfsTxtFileWriter.java   | 153 +++++++++++++++++++++
 .../seatunnel/hive/sink/file/writer/HdfsUtils.java |  96 +++++++++++++
 15 files changed, 995 insertions(+), 41 deletions(-)

diff --git a/pom.xml b/pom.xml
index dc5298ca..a858f5b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -609,6 +609,13 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-shaded-hadoop-2</artifactId>
+                <version>${flink-shaded-hadoop-2.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
@@ -853,42 +860,7 @@
                         </execution>
                     </executions>
                 </plugin>
-
-                <plugin>
-                    <groupId>org.apache.maven.plugins</groupId>
-                    <artifactId>maven-pmd-plugin</artifactId>
-                    <version>${maven-pmd-plugin.version}</version>
-                    <configuration>
-                        <rulesets>
-                            <ruleset>rulesets/java/ali-concurrent.xml</ruleset>
-                            <ruleset>rulesets/java/ali-exception.xml</ruleset>
-                            <ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
-                            <ruleset>rulesets/java/ali-naming.xml</ruleset>
-                            <ruleset>rulesets/java/ali-oop.xml</ruleset>
-                            <ruleset>rulesets/java/ali-orm.xml</ruleset>
-                            <ruleset>rulesets/java/ali-other.xml</ruleset>
-                            <ruleset>rulesets/java/ali-set.xml</ruleset>
-                        </rulesets>
-                        <printFailingErrors>true</printFailingErrors>
-                        <skip>${skip.pmd.check}</skip>
-                    </configuration>
-                    <executions>
-                        <execution>
-                            <id>validate</id>
-                            <phase>validate</phase>
-                            <goals>
-                                <goal>check</goal>
-                            </goals>
-                        </execution>
-                    </executions>
-                    <dependencies>
-                        <dependency>
-                            <groupId>com.alibaba.p3c</groupId>
-                            <artifactId>p3c-pmd</artifactId>
-                            <version>${p3c-pmd.version}</version>
-                        </dependency>
-                    </dependencies>
-                </plugin>
+                
                 <!-- checkstyle (End) -->
 
                 <plugin>
@@ -1008,11 +980,6 @@
                 <artifactId>scalastyle-maven-plugin</artifactId>
             </plugin>
 
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>license-maven-plugin</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index e0770f9c..9254ba5f 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -31,6 +31,7 @@
     <artifactId>seatunnel-connectors-seatunnel</artifactId>
 
     <modules>
+        <module>seatunnel-connector-seatunnel-hive</module>
         <module>seatunnel-connector-seatunnel-console</module>
         <module>seatunnel-connector-seatunnel-fake</module>
         <module>seatunnel-connector-seatunnel-kafka</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/pom.xml
new file mode 100644
index 00000000..2de900d1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-connectors-seatunnel-hive</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
new file mode 100644
index 00000000..2177ffbd
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+public class Config {
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
new file mode 100644
index 00000000..f1eee53d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class HiveAggregatedCommitInfo {
+
+    /**
+     * Storage the commit info in map.
+     * K is the file path need to be moved to hive data dir.
+     * V is the target file path of the data file.
+     */
+    private Map<String, String> needMoveFiles;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
new file mode 100644
index 00000000..0dd58f8f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class HiveCommitInfo implements Serializable {
+
+    /**
+     * Storage the commit info in map.
+     * K is the file path need to be moved to hive data dir.
+     * V is the target file path of the data file.
+     */
+    private Map<String, String> needMoveFiles;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
new file mode 100644
index 00000000..ac0a2b68
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Hive Sink implementation by using SeaTunnel sink API.
+ * This class contains the method to create {@link HiveSinkWriter} and {@link HiveSinkAggregatedCommitter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, HiveCommitInfo, HiveAggregatedCommitInfo> {
+
+    private Config config;
+    private long jobId;
+    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+    @Override
+    public String getPluginName() {
+        return "Hive";
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.config = pluginConfig;
+        this.jobId = System.currentTimeMillis();
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> createWriter(SinkWriter.Context context) throws IOException {
+        return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context, System.currentTimeMillis());
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> restoreWriter(SinkWriter.Context context, List<HiveSinkState> states) throws IOException {
+        return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context, System.currentTimeMillis());
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return null;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
+        return Optional.of(new HiveSinkAggregatedCommitter());
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
new file mode 100644
index 00000000..673923a5
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveSinkAggregatedCommitter implements SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveSinkAggregatedCommitter.class);
+
+    @Override
+    public List<HiveAggregatedCommitInfo> commit(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws IOException {
+        if (aggregatedCommitInfoList == null || aggregatedCommitInfoList.size() == 0) {
+            return null;
+        }
+        List errorAggregatedCommitInfoList = new ArrayList();
+        aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+            try {
+                Map<String, String> needMoveFiles = aggregateCommitInfo.getNeedMoveFiles();
+                for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
+                    HdfsUtils.renameFile(entry.getKey(), entry.getValue(), true);
+                }
+            } catch (IOException e) {
+                LOGGER.error("commit aggregateCommitInfo error ", e);
+                errorAggregatedCommitInfoList.add(aggregateCommitInfo);
+            }
+        });
+
+        return errorAggregatedCommitInfoList;
+    }
+
+    @Override
+    public HiveAggregatedCommitInfo combine(List<HiveCommitInfo> commitInfos) {
+        if (commitInfos == null || commitInfos.size() == 0) {
+            return null;
+        }
+        Map<String, String> aggregateCommitInfo = new HashMap<>();
+        commitInfos.stream().forEach(commitInfo -> {
+            aggregateCommitInfo.putAll(commitInfo.getNeedMoveFiles());
+        });
+        return new HiveAggregatedCommitInfo(aggregateCommitInfo);
+    }
+
+    @Override
+    public void abort(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws Exception {
+        if (aggregatedCommitInfoList == null || aggregatedCommitInfoList.size() == 0) {
+            return;
+        }
+        aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+            try {
+                Map<String, String> needMoveFiles = aggregateCommitInfo.getNeedMoveFiles();
+                for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
+                    HdfsUtils.renameFile(entry.getValue(), entry.getKey(), true);
+                }
+            } catch (IOException e) {
+                LOGGER.error("abort aggregateCommitInfo error ", e);
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
new file mode 100644
index 00000000..2e08862f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+@Data
+public class HiveSinkConfig {
+
+    private static final String HIVE_SAVE_MODE = "save_mode";
+
+    private static final String HIVE_SINK_COLUMNS = "sink_columns";
+
+    private static final String HIVE_PARTITION_BY = "partition_by";
+
+    private static final String HIVE_RESULT_TABLE_NAME = "result_table_name";
+
+    private static final String SINK_TMP_FS_ROOT_PATH = "sink_tmp_fs_root_path";
+
+    private static final String HIVE_TABLE_FS_PATH = "hive_table_fs_path";
+
+    private static final String HIVE_TXT_FILE_FIELD_DELIMITER = "hive_txt_file_field_delimiter";
+
+    private static final String HIVE_TXT_FILE_LINE_DELIMITER = "hive_txt_file_line_delimiter";
+
+    private SaveMode saveMode = SaveMode.APPEND;
+
+    private String sinkTmpFsRootPath = "/tmp/seatunnel";
+
+    private List<String> partitionFieldNames;
+
+    private String hiveTableName;
+
+    private List<String> sinkColumns;
+
+    private String hiveTableFsPath;
+
+    private String hiveTxtFileFieldDelimiter = String.valueOf('\001');
+
+    private String hiveTxtFileLineDelimiter = "\n";
+
+    public enum SaveMode {
+        APPEND(),
+        OVERWRITE();
+
+        public static SaveMode fromStr(String str) {
+            if ("overwrite".equals(str)) {
+                return OVERWRITE;
+            } else {
+                return APPEND;
+            }
+        }
+    }
+
+    public HiveSinkConfig(@NonNull Config pluginConfig) {
+        checkNotNull(pluginConfig.getString(HIVE_RESULT_TABLE_NAME));
+        checkNotNull(pluginConfig.getString(HIVE_TABLE_FS_PATH));
+        this.hiveTableName = pluginConfig.getString(HIVE_RESULT_TABLE_NAME);
+        this.hiveTableFsPath = pluginConfig.getString(HIVE_TABLE_FS_PATH);
+
+        this.saveMode = StringUtils.isBlank(pluginConfig.getString(HIVE_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(HIVE_SAVE_MODE));
+        if (!StringUtils.isBlank(pluginConfig.getString(SINK_TMP_FS_ROOT_PATH))) {
+            this.sinkTmpFsRootPath = pluginConfig.getString(SINK_TMP_FS_ROOT_PATH);
+        }
+
+        this.partitionFieldNames = pluginConfig.getStringList(HIVE_PARTITION_BY);
+        this.sinkColumns = pluginConfig.getStringList(HIVE_SINK_COLUMNS);
+
+        if (!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER))) {
+            this.hiveTxtFileFieldDelimiter = pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER);
+        }
+
+        if (!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER))) {
+            this.hiveTxtFileLineDelimiter = pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER);
+        }
+
+        // partition fields must in sink columns
+        if (!CollectionUtils.isEmpty(this.sinkColumns) && !CollectionUtils.isEmpty(this.partitionFieldNames) && !this.sinkColumns.containsAll(this.partitionFieldNames)) {
+            throw new RuntimeException("partition fields must in sink columns");
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
new file mode 100644
index 00000000..4f9f5d12
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class HiveSinkState implements Serializable {
+    private HiveSinkConfig hiveSinkConfig;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
new file mode 100644
index 00000000..ad63a520
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.FileWriter;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsTxtFileWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveSinkWriter.class);
+
+    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+    private Config pluginConfig;
+    private SinkWriter.Context context;
+    private long jobId;
+
+    private FileWriter fileWriter;
+
+    private HiveSinkConfig hiveSinkConfig;
+
+    public HiveSinkWriter(@NonNull SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+                          @NonNull Config pluginConfig,
+                          @NonNull SinkWriter.Context context,
+                          long jobId) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        this.jobId = jobId;
+
+        hiveSinkConfig = new HiveSinkConfig(this.pluginConfig);
+        fileWriter = new HdfsTxtFileWriter(this.seaTunnelRowTypeInfo,
+            hiveSinkConfig,
+            this.jobId,
+            this.context.getIndexOfSubtask());
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        fileWriter.write(element);
+    }
+
+    @Override
+    public Optional<HiveCommitInfo> prepareCommit() throws IOException {
+        fileWriter.finishAndCloseWriteFile();
+        /**
+         * We will clear the needMoveFiles in {@link #snapshotState()}, So we need copy the needMoveFiles map here.
+         */
+        Map<String, String> commitInfoMap = new HashMap<>(fileWriter.getNeedMoveFiles().size());
+        commitInfoMap.putAll(fileWriter.getNeedMoveFiles());
+        return Optional.of(new HiveCommitInfo(commitInfoMap));
+    }
+
+    @Override
+    public void abort() {
+        fileWriter.abort();
+    }
+
+    @Override
+    public void close() throws IOException {
+        fileWriter.finishAndCloseWriteFile();
+    }
+
+    @Override
+    public List<HiveSinkState> snapshotState() throws IOException {
+        //reset FileWrite
+        fileWriter.resetFileWriter(System.currentTimeMillis() + "");
+        return Lists.newArrayList(new HiveSinkState(hiveSinkConfig));
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
new file mode 100644
index 00000000..ef01d1e6
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
+
+import lombok.NonNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractFileWriter implements FileWriter {
+    protected Map<String, String> needMoveFiles;
+    protected SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+    protected long jobId;
+    protected int subTaskIndex;
+    protected HiveSinkConfig hiveSinkConfig;
+
+    private static final String SEATUNNEL = "seatunnel";
+    private static final String NON_PARTITION = "NON_PARTITION";
+
+    protected Map<String, String> beingWrittenFile;
+
+    protected String checkpointId;
+
+    public AbstractFileWriter(@NonNull SeaTunnelRowTypeInfo 
seaTunnelRowTypeInfo,
+                              @NonNull HiveSinkConfig hiveSinkConfig,
+                              long jobId,
+                              int subTaskIndex) {
+        checkArgument(jobId > 0);
+        checkArgument(subTaskIndex > -1);
+
+        this.needMoveFiles = new HashMap<>();
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+        this.jobId = jobId;
+        this.subTaskIndex = subTaskIndex;
+        this.hiveSinkConfig = hiveSinkConfig;
+
+        this.beingWrittenFile = new HashMap<>();
+    }
+
+    public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow 
seaTunnelRow) {
+        String beingWrittenFileKey = getBeingWrittenFileKey(seaTunnelRow);
+        // get filePath from beingWrittenFile
+        String beingWrittenFilePath = 
beingWrittenFile.get(beingWrittenFileKey);
+        if (beingWrittenFilePath != null) {
+            return beingWrittenFilePath;
+        } else {
+            StringBuilder sbf = new 
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
+            sbf.append("/")
+                .append(SEATUNNEL)
+                .append("/")
+                .append(jobId)
+                .append("/")
+                .append(checkpointId)
+                .append("/")
+                .append(hiveSinkConfig.getHiveTableName())
+                .append("/")
+                .append(beingWrittenFileKey)
+                .append("/")
+                .append(jobId)
+                .append("_")
+                .append(subTaskIndex)
+                .append(".")
+                .append(getFileSuffix());
+            String newBeingWrittenFilePath = sbf.toString();
+            beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
+            return newBeingWrittenFilePath;
+        }
+    }
+
+    private String getBeingWrittenFileKey(@NonNull SeaTunnelRow seaTunnelRow) {
+        if (this.hiveSinkConfig.getPartitionFieldNames() != null && 
this.hiveSinkConfig.getPartitionFieldNames().size() > 0) {
+            List<String> collect = 
this.hiveSinkConfig.getPartitionFieldNames().stream().map(partitionKey -> {
+                StringBuilder sbd = new StringBuilder(partitionKey);
+                
sbd.append("=").append(seaTunnelRow.getFieldMap().get(partitionKey));
+                return sbd.toString();
+            }).collect(Collectors.toList());
+
+            String beingWrittenFileKey = String.join("/", collect);
+            return beingWrittenFileKey;
+        } else {
+            // If there is no partition field in data, We use the fixed value 
NON_PARTITION as the partition directory
+            return NON_PARTITION;
+        }
+    }
+
+    /**
+     * FileWriter need return the file suffix. eg: tex, orc, parquet
+     *
+     * @return
+     */
+    @NonNull
+    public abstract String getFileSuffix();
+
+    public String getHiveLocation(@NonNull String seaTunnelFilePath) {
+        StringBuilder sbf = new 
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
+        sbf.append("/")
+            .append(SEATUNNEL)
+            .append("/")
+            .append(jobId)
+            .append("/")
+            .append(checkpointId)
+            .append("/")
+            .append(hiveSinkConfig.getHiveTableName());
+        String seaTunnelPath = sbf.toString();
+        String tmpPath = seaTunnelFilePath.replaceAll(seaTunnelPath, 
hiveSinkConfig.getHiveTableFsPath());
+        return tmpPath.replaceAll(NON_PARTITION + "/", "");
+    }
+
+    @Override
+    public void resetFileWriter(@NonNull String checkpointId) {
+        this.checkpointId = checkpointId;
+        this.needMoveFiles = new HashMap<>();
+        this.beingWrittenFile = new HashMap<>();
+        this.resetMoreFileWriter(checkpointId);
+    }
+
+    public abstract void resetMoreFileWriter(@NonNull String checkpointId);
+
+    @Override
+    public void abort() {
+        this.needMoveFiles = new HashMap<>();
+        this.beingWrittenFile = new HashMap<>();
+        this.abortMore();
+    }
+
+    public abstract void abortMore();
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
new file mode 100644
index 00000000..8ee8777a
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+
+import java.util.Map;
+
+/**
+ * Contract of the file writers used by the hive sink.
+ */
+public interface FileWriter {
+
+    /**
+     * Writes one row.
+     *
+     * @param seaTunnelRow row to write, must not be null
+     */
+    void write(@NonNull SeaTunnelRow seaTunnelRow);
+
+    /**
+     * Returns the mapping between seatunnel file path and hive file path.
+     *
+     * @return the needMoveFiles mapping, never null
+     */
+    @NonNull
+    Map<String, String> getNeedMoveFiles();
+
+    /**
+     * In this method we need finish write the file. The following operations are often required:
+     * 1. Flush memory to disk.
+     * 2. Close output stream.
+     * 3. Add the mapping relationship between seatunnel file path and hive file path to needMoveFiles.
+     */
+    void finishAndCloseWriteFile();
+
+    /**
+     * The writer needs to be reset after each checkpoint is completed
+     *
+     * @param checkpointId checkpointId
+     */
+    void resetFileWriter(@NonNull String checkpointId);
+
+    /**
+     * Aborts the current write. What is cleaned up is defined by the implementation.
+     */
+    void abort();
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
new file mode 100644
index 00000000..4d370ff8
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
+
+import lombok.Lombok;
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HdfsTxtFileWriter extends AbstractFileWriter {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(HdfsTxtFileWriter.class);
+    private Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+    public HdfsTxtFileWriter(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+                             HiveSinkConfig hiveSinkConfig,
+                             long sinkId,
+                             int subTaskIndex) {
+        super(seaTunnelRowTypeInfo, hiveSinkConfig, sinkId, subTaskIndex);
+        beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    @NonNull
+    public String getFileSuffix() {
+        return "txt";
+    }
+
+    @Override
+    public void resetMoreFileWriter(@NonNull String checkpointId) {
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void abortMore() {
+        // delete files
+        beingWrittenOutputStream.keySet().stream().forEach(file -> {
+            try {
+                boolean deleted = HdfsUtils.deleteFile(file);
+                if (!deleted) {
+                    LOGGER.error("delete file {} error", file);
+                    throw new IOException(String.format("delete file {} 
error", file));
+                }
+            } catch (IOException e) {
+                LOGGER.error("delete file {} error", file);
+                throw new RuntimeException(e);
+            }
+        });
+
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        Lombok.checkNotNull(seaTunnelRow, "seaTunnelRow is null");
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
+        String line = transformRowToLine(seaTunnelRow);
+        try {
+            fsDataOutputStream.write(line.getBytes());
+            
fsDataOutputStream.write(hiveSinkConfig.getHiveTxtFileLineDelimiter().getBytes());
+        } catch (IOException e) {
+            LOGGER.error("write data to file {} error", filePath);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @NonNull
+    @Override
+    public Map<String, String> getNeedMoveFiles() {
+        return this.needMoveFiles;
+    }
+
+    @Override
+    public void finishAndCloseWriteFile() {
+        beingWrittenOutputStream.entrySet().forEach(entry -> {
+            try {
+                entry.getValue().flush();
+            } catch (IOException e) {
+                LOGGER.error("error when flush file {}", entry.getKey());
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    entry.getValue().close();
+                } catch (IOException e) {
+                    LOGGER.error("error when close output stream {}", 
entry.getKey());
+                }
+            }
+
+            needMoveFiles.put(entry.getKey(), getHiveLocation(entry.getKey()));
+        });
+    }
+
+    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+        FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
+        if (fsDataOutputStream == null) {
+            try {
+                fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
+                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+            } catch (IOException e) {
+                LOGGER.error("can not get output file stream");
+                throw new RuntimeException(e);
+            }
+        }
+        return fsDataOutputStream;
+    }
+
+    private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
+        String line = null;
+        List<String> sinkColumns = hiveSinkConfig.getSinkColumns();
+        if (sinkColumns == null || sinkColumns.size() == 0) {
+            line = Arrays.stream(seaTunnelRow.getFields())
+                .map(column -> column == null ? "" : column.toString())
+                
.collect(Collectors.joining(hiveSinkConfig.getHiveTxtFileFieldDelimiter()));
+        } else {
+            line = sinkColumns.stream().map(column -> {
+                String valueStr = "";
+                Object value = seaTunnelRow.getFieldMap().get(column);
+                if (value != null) {
+                    valueStr = value.toString();
+                }
+                return valueStr;
+            
}).collect(Collectors.joining(hiveSinkConfig.getHiveTxtFileFieldDelimiter()));
+        }
+        return line;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
new file mode 100644
index 00000000..23b1e584
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Static helpers for the file system operations needed by the hive sink.
+ */
+public class HdfsUtils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsUtils.class);
+
+    public static final int WRITE_BUFFER_SIZE = 2048;
+
+    /**
+     * Returns the file system responsible for the given path.
+     *
+     * <p>The file system is resolved from the path itself: a path with scheme and authority
+     * selects that file system, a path without them falls back to the configured default one.
+     *
+     * @param path file or directory path
+     * @return the file system for the path
+     * @throws IOException if the file system can not be obtained
+     */
+    public static FileSystem getHdfsFs(@NonNull String path)
+        throws IOException {
+        Configuration conf = new Configuration();
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        return new Path(path).getFileSystem(conf);
+    }
+
+    /**
+     * Creates the file, overwriting an existing one, and returns a stream to write to it.
+     *
+     * @param outFilePath path of the file to create
+     * @return output stream with a buffer of {@link #WRITE_BUFFER_SIZE} bytes
+     * @throws IOException if the file can not be created
+     */
+    public static FSDataOutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(outFilePath);
+        return hdfsFs.create(new Path(outFilePath), true, WRITE_BUFFER_SIZE);
+    }
+
+    /**
+     * Deletes the file or, recursively, the directory.
+     *
+     * @param file path to delete
+     * @return the result reported by the file system
+     * @throws IOException if the delete fails
+     */
+    public static boolean deleteFile(@NonNull String file) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(file);
+        return hdfsFs.delete(new Path(file), true);
+    }
+
+    /**
+     * rename file
+     *
+     * <p>The parent directory of the target is created when it is missing. A rename that the
+     * file system reports as unsuccessful is logged as a warning and does not throw.
+     *
+     * @param oldName     old file name
+     * @param newName     target file name
+     * @param rmWhenExist if this is true, we will delete the target file when it already exists
+     *                    (only while the old file still exists as well)
+     * @throws IOException throw IOException
+     */
+    public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(newName);
+        LOGGER.info("begin rename file oldName :[{}] to newName :[{}]", oldName, newName);
+
+        Path oldPath = new Path(oldName);
+        Path newPath = new Path(newName);
+        if (rmWhenExist && fileExist(newName) && fileExist(oldName)) {
+            hdfsFs.delete(newPath, true);
+        }
+
+        // getParent() returns null for a root path, in which case there is nothing to create
+        Path parentPath = newPath.getParent();
+        if (parentPath != null && !hdfsFs.exists(parentPath)) {
+            hdfsFs.mkdirs(parentPath);
+        }
+
+        if (hdfsFs.rename(oldPath, newPath)) {
+            LOGGER.info("rename file :[{}] to [{}] finish", oldPath, newPath);
+        } else {
+            LOGGER.warn("rename file :[{}] to [{}] failed", oldPath, newPath);
+        }
+    }
+
+    /**
+     * Creates the directory including missing parents.
+     *
+     * @param filePath directory path
+     * @return the result reported by the file system
+     * @throws IOException if the directory can not be created
+     */
+    public static boolean createDir(@NonNull String filePath)
+        throws IOException {
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        return hdfsFs.mkdirs(new Path(filePath));
+    }
+
+    /**
+     * Checks whether the path exists.
+     *
+     * @param filePath file or directory path
+     * @return true if the path exists
+     * @throws IOException if the check fails
+     */
+    public static boolean fileExist(@NonNull String filePath)
+        throws IOException {
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        return hdfsFs.exists(new Path(filePath));
+    }
+}

Reply via email to