This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new f389cd79 [api-draft][connector] New hive sink connector (#1997)
f389cd79 is described below
commit f389cd79199500608e22f0207e2568f9bd57bd90
Author: Eric <[email protected]>
AuthorDate: Wed Jun 8 20:13:44 2022 +0800
[api-draft][connector] New hive sink connector (#1997)
* add checkpointId
* add some method in hdfsutils
* * Fix check style
* Add License header
* Remove meaningless test
* * some code optimization
* Remove PMD plugin because we already have Sonar
* * Fix some method error
* add checkstyle
* update hdfs dependency to flink shaded
* * Fix check style
Co-authored-by: CalvinKirs <[email protected]>
---
pom.xml | 49 ++-----
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../seatunnel-connector-seatunnel-hive/pom.xml | 37 +++++
.../connectors/seatunnel/hive/config/Config.java | 21 +++
.../hive/sink/HiveAggregatedCommitInfo.java | 35 +++++
.../seatunnel/hive/sink/HiveCommitInfo.java | 36 +++++
.../connectors/seatunnel/hive/sink/HiveSink.java | 87 ++++++++++++
.../hive/sink/HiveSinkAggregatedCommitter.java | 88 ++++++++++++
.../seatunnel/hive/sink/HiveSinkConfig.java | 106 ++++++++++++++
.../seatunnel/hive/sink/HiveSinkState.java | 29 ++++
.../seatunnel/hive/sink/HiveSinkWriter.java | 99 +++++++++++++
.../hive/sink/file/writer/AbstractFileWriter.java | 150 ++++++++++++++++++++
.../hive/sink/file/writer/FileWriter.java | 49 +++++++
.../hive/sink/file/writer/HdfsTxtFileWriter.java | 153 +++++++++++++++++++++
.../seatunnel/hive/sink/file/writer/HdfsUtils.java | 96 +++++++++++++
15 files changed, 995 insertions(+), 41 deletions(-)
diff --git a/pom.xml b/pom.xml
index dc5298ca..a858f5b9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -609,6 +609,13 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <version>${flink-shaded-hadoop-2.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
@@ -853,42 +860,7 @@
</execution>
</executions>
</plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- <version>${maven-pmd-plugin.version}</version>
- <configuration>
- <rulesets>
- <ruleset>rulesets/java/ali-concurrent.xml</ruleset>
- <ruleset>rulesets/java/ali-exception.xml</ruleset>
-
- <ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
- <ruleset>rulesets/java/ali-naming.xml</ruleset>
- <ruleset>rulesets/java/ali-oop.xml</ruleset>
- <ruleset>rulesets/java/ali-orm.xml</ruleset>
- <ruleset>rulesets/java/ali-other.xml</ruleset>
- <ruleset>rulesets/java/ali-set.xml</ruleset>
- </rulesets>
- <printFailingErrors>true</printFailingErrors>
- <skip>${skip.pmd.check}</skip>
- </configuration>
- <executions>
- <execution>
- <id>validate</id>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>com.alibaba.p3c</groupId>
- <artifactId>p3c-pmd</artifactId>
- <version>${p3c-pmd.version}</version>
- </dependency>
- </dependencies>
- </plugin>
+
<!-- checkstyle (End) -->
<plugin>
@@ -1008,11 +980,6 @@
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-pmd-plugin</artifactId>
- </plugin>
-
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>license-maven-plugin</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index e0770f9c..9254ba5f 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -31,6 +31,7 @@
<artifactId>seatunnel-connectors-seatunnel</artifactId>
<modules>
+ <module>seatunnel-connector-seatunnel-hive</module>
<module>seatunnel-connector-seatunnel-console</module>
<module>seatunnel-connector-seatunnel-fake</module>
<module>seatunnel-connector-seatunnel-kafka</module>
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/pom.xml
new file mode 100644
index 00000000..2de900d1
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connectors-seatunnel-hive</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
new file mode 100644
index 00000000..2177ffbd
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+public class Config {
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
new file mode 100644
index 00000000..f1eee53d
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class HiveAggregatedCommitInfo {
+
+ /**
+ * Storage the commit info in map.
+ * K is the file path need to be moved to hive data dir.
+ * V is the target file path of the data file.
+ */
+ private Map<String, String> needMoveFiles;
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
new file mode 100644
index 00000000..0dd58f8f
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class HiveCommitInfo implements Serializable {
+
+ /**
+ * Storage the commit info in map.
+ * K is the file path need to be moved to hive data dir.
+ * V is the target file path of the data file.
+ */
+ private Map<String, String> needMoveFiles;
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
new file mode 100644
index 00000000..ac0a2b68
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Hive Sink implementation by using SeaTunnel sink API.
+ * This class contains the method to create {@link HiveSinkWriter} and {@link
HiveSinkAggregatedCommitter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState,
HiveCommitInfo, HiveAggregatedCommitInfo> {
+
+ private Config config;
+ private long jobId;
+ private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+ @Override
+ public String getPluginName() {
+ return "Hive";
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.config = pluginConfig;
+ this.jobId = System.currentTimeMillis();
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
createWriter(SinkWriter.Context context) throws IOException {
+ return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context,
System.currentTimeMillis());
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState>
restoreWriter(SinkWriter.Context context, List<HiveSinkState> states) throws
IOException {
+ return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context,
System.currentTimeMillis());
+ }
+
+ @Override
+ public SeaTunnelContext getSeaTunnelContext() {
+ return null;
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<HiveCommitInfo,
HiveAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
+ return Optional.of(new HiveSinkAggregatedCommitter());
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
new file mode 100644
index 00000000..673923a5
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HiveSinkAggregatedCommitter implements
SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HiveSinkAggregatedCommitter.class);
+
+ @Override
+ public List<HiveAggregatedCommitInfo>
commit(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws
IOException {
+ if (aggregatedCommitInfoList == null ||
aggregatedCommitInfoList.size() == 0) {
+ return null;
+ }
+ List errorAggregatedCommitInfoList = new ArrayList();
+ aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+ try {
+ Map<String, String> needMoveFiles =
aggregateCommitInfo.getNeedMoveFiles();
+ for (Map.Entry<String, String> entry :
needMoveFiles.entrySet()) {
+ HdfsUtils.renameFile(entry.getKey(), entry.getValue(),
true);
+ }
+ } catch (IOException e) {
+ LOGGER.error("commit aggregateCommitInfo error ", e);
+ errorAggregatedCommitInfoList.add(aggregateCommitInfo);
+ }
+ });
+
+ return errorAggregatedCommitInfoList;
+ }
+
+ @Override
+ public HiveAggregatedCommitInfo combine(List<HiveCommitInfo> commitInfos) {
+ if (commitInfos == null || commitInfos.size() == 0) {
+ return null;
+ }
+ Map<String, String> aggregateCommitInfo = new HashMap<>();
+ commitInfos.stream().forEach(commitInfo -> {
+ aggregateCommitInfo.putAll(commitInfo.getNeedMoveFiles());
+ });
+ return new HiveAggregatedCommitInfo(aggregateCommitInfo);
+ }
+
+ @Override
+ public void abort(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList)
throws Exception {
+ if (aggregatedCommitInfoList == null ||
aggregatedCommitInfoList.size() == 0) {
+ return;
+ }
+ aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
+ try {
+ Map<String, String> needMoveFiles =
aggregateCommitInfo.getNeedMoveFiles();
+ for (Map.Entry<String, String> entry :
needMoveFiles.entrySet()) {
+ HdfsUtils.renameFile(entry.getValue(), entry.getKey(),
true);
+ }
+ } catch (IOException e) {
+ LOGGER.error("abort aggregateCommitInfo error ", e);
+ }
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
new file mode 100644
index 00000000..2e08862f
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+
+@Data
+public class HiveSinkConfig {
+
+ private static final String HIVE_SAVE_MODE = "save_mode";
+
+ private static final String HIVE_SINK_COLUMNS = "sink_columns";
+
+ private static final String HIVE_PARTITION_BY = "partition_by";
+
+ private static final String HIVE_RESULT_TABLE_NAME = "result_table_name";
+
+ private static final String SINK_TMP_FS_ROOT_PATH =
"sink_tmp_fs_root_path";
+
+ private static final String HIVE_TABLE_FS_PATH = "hive_table_fs_path";
+
+ private static final String HIVE_TXT_FILE_FIELD_DELIMITER =
"hive_txt_file_field_delimiter";
+
+ private static final String HIVE_TXT_FILE_LINE_DELIMITER =
"hive_txt_file_line_delimiter";
+
+ private SaveMode saveMode = SaveMode.APPEND;
+
+ private String sinkTmpFsRootPath = "/tmp/seatunnel";
+
+ private List<String> partitionFieldNames;
+
+ private String hiveTableName;
+
+ private List<String> sinkColumns;
+
+ private String hiveTableFsPath;
+
+ private String hiveTxtFileFieldDelimiter = String.valueOf('\001');
+
+ private String hiveTxtFileLineDelimiter = "\n";
+
+ public enum SaveMode {
+ APPEND(),
+ OVERWRITE();
+
+ public static SaveMode fromStr(String str) {
+ if ("overwrite".equals(str)) {
+ return OVERWRITE;
+ } else {
+ return APPEND;
+ }
+ }
+ }
+
+ public HiveSinkConfig(@NonNull Config pluginConfig) {
+ checkNotNull(pluginConfig.getString(HIVE_RESULT_TABLE_NAME));
+ checkNotNull(pluginConfig.getString(HIVE_TABLE_FS_PATH));
+ this.hiveTableName = pluginConfig.getString(HIVE_RESULT_TABLE_NAME);
+ this.hiveTableFsPath = pluginConfig.getString(HIVE_TABLE_FS_PATH);
+
+ this.saveMode =
StringUtils.isBlank(pluginConfig.getString(HIVE_SAVE_MODE)) ? SaveMode.APPEND :
SaveMode.fromStr(pluginConfig.getString(HIVE_SAVE_MODE));
+ if
(!StringUtils.isBlank(pluginConfig.getString(SINK_TMP_FS_ROOT_PATH))) {
+ this.sinkTmpFsRootPath =
pluginConfig.getString(SINK_TMP_FS_ROOT_PATH);
+ }
+
+ this.partitionFieldNames =
pluginConfig.getStringList(HIVE_PARTITION_BY);
+ this.sinkColumns = pluginConfig.getStringList(HIVE_SINK_COLUMNS);
+
+ if
(!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER))) {
+ this.hiveTxtFileFieldDelimiter =
pluginConfig.getString(HIVE_TXT_FILE_FIELD_DELIMITER);
+ }
+
+ if
(!StringUtils.isBlank(pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER))) {
+ this.hiveTxtFileLineDelimiter =
pluginConfig.getString(HIVE_TXT_FILE_LINE_DELIMITER);
+ }
+
+ // partition fields must in sink columns
+ if (!CollectionUtils.isEmpty(this.sinkColumns) &&
!CollectionUtils.isEmpty(this.partitionFieldNames) &&
!this.sinkColumns.containsAll(this.partitionFieldNames)) {
+ throw new RuntimeException("partition fields must in sink
columns");
+ }
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
new file mode 100644
index 00000000..4f9f5d12
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class HiveSinkState implements Serializable {
+ private HiveSinkConfig hiveSinkConfig;
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
new file mode 100644
index 00000000..ad63a520
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.FileWriter;
+import
org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer.HdfsTxtFileWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class HiveSinkWriter implements SinkWriter<SeaTunnelRow,
HiveCommitInfo, HiveSinkState> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HiveSinkWriter.class);
+
+ private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+ private Config pluginConfig;
+ private SinkWriter.Context context;
+ private long jobId;
+
+ private FileWriter fileWriter;
+
+ private HiveSinkConfig hiveSinkConfig;
+
+ public HiveSinkWriter(@NonNull SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+ @NonNull Config pluginConfig,
+ @NonNull SinkWriter.Context context,
+ long jobId) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ this.pluginConfig = pluginConfig;
+ this.context = context;
+ this.jobId = jobId;
+
+ hiveSinkConfig = new HiveSinkConfig(this.pluginConfig);
+ fileWriter = new HdfsTxtFileWriter(this.seaTunnelRowTypeInfo,
+ hiveSinkConfig,
+ this.jobId,
+ this.context.getIndexOfSubtask());
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ fileWriter.write(element);
+ }
+
+ @Override
+ public Optional<HiveCommitInfo> prepareCommit() throws IOException {
+ fileWriter.finishAndCloseWriteFile();
+ /**
+ * We will clear the needMoveFiles in {@link #snapshotState()}, So we
need copy the needMoveFiles map here.
+ */
+ Map<String, String> commitInfoMap = new
HashMap<>(fileWriter.getNeedMoveFiles().size());
+ commitInfoMap.putAll(fileWriter.getNeedMoveFiles());
+ return Optional.of(new HiveCommitInfo(commitInfoMap));
+ }
+
+ @Override
+ public void abort() {
+ fileWriter.abort();
+ }
+
+ @Override
+ public void close() throws IOException {
+ fileWriter.finishAndCloseWriteFile();
+ }
+
+ @Override
+ public List<HiveSinkState> snapshotState() throws IOException {
+ //reset FileWrite
+ fileWriter.resetFileWriter(System.currentTimeMillis() + "");
+ return Lists.newArrayList(new HiveSinkState(hiveSinkConfig));
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
new file mode 100644
index 00000000..ef01d1e6
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/AbstractFileWriter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
+
+import lombok.NonNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractFileWriter implements FileWriter {
+ protected Map<String, String> needMoveFiles;
+ protected SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+ protected long jobId;
+ protected int subTaskIndex;
+ protected HiveSinkConfig hiveSinkConfig;
+
+ private static final String SEATUNNEL = "seatunnel";
+ private static final String NON_PARTITION = "NON_PARTITION";
+
+ protected Map<String, String> beingWrittenFile;
+
+ protected String checkpointId;
+
+ public AbstractFileWriter(@NonNull SeaTunnelRowTypeInfo
seaTunnelRowTypeInfo,
+ @NonNull HiveSinkConfig hiveSinkConfig,
+ long jobId,
+ int subTaskIndex) {
+ checkArgument(jobId > 0);
+ checkArgument(subTaskIndex > -1);
+
+ this.needMoveFiles = new HashMap<>();
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ this.jobId = jobId;
+ this.subTaskIndex = subTaskIndex;
+ this.hiveSinkConfig = hiveSinkConfig;
+
+ this.beingWrittenFile = new HashMap<>();
+ }
+
+ public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow
seaTunnelRow) {
+ String beingWrittenFileKey = getBeingWrittenFileKey(seaTunnelRow);
+ // get filePath from beingWrittenFile
+ String beingWrittenFilePath =
beingWrittenFile.get(beingWrittenFileKey);
+ if (beingWrittenFilePath != null) {
+ return beingWrittenFilePath;
+ } else {
+ StringBuilder sbf = new
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
+ sbf.append("/")
+ .append(SEATUNNEL)
+ .append("/")
+ .append(jobId)
+ .append("/")
+ .append(checkpointId)
+ .append("/")
+ .append(hiveSinkConfig.getHiveTableName())
+ .append("/")
+ .append(beingWrittenFileKey)
+ .append("/")
+ .append(jobId)
+ .append("_")
+ .append(subTaskIndex)
+ .append(".")
+ .append(getFileSuffix());
+ String newBeingWrittenFilePath = sbf.toString();
+ beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
+ return newBeingWrittenFilePath;
+ }
+ }
+
+ private String getBeingWrittenFileKey(@NonNull SeaTunnelRow seaTunnelRow) {
+ if (this.hiveSinkConfig.getPartitionFieldNames() != null &&
this.hiveSinkConfig.getPartitionFieldNames().size() > 0) {
+ List<String> collect =
this.hiveSinkConfig.getPartitionFieldNames().stream().map(partitionKey -> {
+ StringBuilder sbd = new StringBuilder(partitionKey);
+
sbd.append("=").append(seaTunnelRow.getFieldMap().get(partitionKey));
+ return sbd.toString();
+ }).collect(Collectors.toList());
+
+ String beingWrittenFileKey = String.join("/", collect);
+ return beingWrittenFileKey;
+ } else {
+ // If there is no partition field in data, We use the fixed value
NON_PARTITION as the partition directory
+ return NON_PARTITION;
+ }
+ }
+
+ /**
+ * FileWriter need return the file suffix. eg: tex, orc, parquet
+ *
+ * @return
+ */
+ @NonNull
+ public abstract String getFileSuffix();
+
+ public String getHiveLocation(@NonNull String seaTunnelFilePath) {
+ StringBuilder sbf = new
StringBuilder(hiveSinkConfig.getSinkTmpFsRootPath());
+ sbf.append("/")
+ .append(SEATUNNEL)
+ .append("/")
+ .append(jobId)
+ .append("/")
+ .append(checkpointId)
+ .append("/")
+ .append(hiveSinkConfig.getHiveTableName());
+ String seaTunnelPath = sbf.toString();
+ String tmpPath = seaTunnelFilePath.replaceAll(seaTunnelPath,
hiveSinkConfig.getHiveTableFsPath());
+ return tmpPath.replaceAll(NON_PARTITION + "/", "");
+ }
+
+ @Override
+ public void resetFileWriter(@NonNull String checkpointId) {
+ this.checkpointId = checkpointId;
+ this.needMoveFiles = new HashMap<>();
+ this.beingWrittenFile = new HashMap<>();
+ this.resetMoreFileWriter(checkpointId);
+ }
+
+ public abstract void resetMoreFileWriter(@NonNull String checkpointId);
+
+ @Override
+ public void abort() {
+ this.needMoveFiles = new HashMap<>();
+ this.beingWrittenFile = new HashMap<>();
+ this.abortMore();
+ }
+
+ public abstract void abortMore();
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
new file mode 100644
index 00000000..8ee8777a
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/FileWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+
+import java.util.Map;
+
/**
 * Contract for the per-subtask file writers used by the Hive sink: rows are written
 * to temporary files, and at checkpoint time the finished files are exposed through
 * {@link #getNeedMoveFiles()} (temporary path -> target hive path) for the committer.
 */
public interface FileWriter {

    /** Write one row to the file that belongs to the row's partition. */
    void write(@NonNull SeaTunnelRow seaTunnelRow);

    /** @return the mapping from temporary seatunnel file path to target hive file path */
    @NonNull
    Map<String, String> getNeedMoveFiles();

    /**
     * In this method we need finish write the file. The following operations are often required:
     * 1. Flush memory to disk.
     * 2. Close output stream.
     * 3. Add the mapping relationship between seatunnel file path and hive file path to needMoveFiles.
     */
    void finishAndCloseWriteFile();

    /**
     * The writer needs to be reset after each checkpoint is completed
     *
     * @param checkpointId checkpointId
     */
    void resetFileWriter(@NonNull String checkpointId);

    /** Discard everything written since the last reset (delete in-flight files, clear state). */
    void abort();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
new file mode 100644
index 00000000..4d370ff8
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsTxtFileWriter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkConfig;
+
+import lombok.Lombok;
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HdfsTxtFileWriter extends AbstractFileWriter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HdfsTxtFileWriter.class);
+ private Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+ public HdfsTxtFileWriter(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+ HiveSinkConfig hiveSinkConfig,
+ long sinkId,
+ int subTaskIndex) {
+ super(seaTunnelRowTypeInfo, hiveSinkConfig, sinkId, subTaskIndex);
+ beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ @NonNull
+ public String getFileSuffix() {
+ return "txt";
+ }
+
+ @Override
+ public void resetMoreFileWriter(@NonNull String checkpointId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void abortMore() {
+ // delete files
+ beingWrittenOutputStream.keySet().stream().forEach(file -> {
+ try {
+ boolean deleted = HdfsUtils.deleteFile(file);
+ if (!deleted) {
+ LOGGER.error("delete file {} error", file);
+ throw new IOException(String.format("delete file {}
error", file));
+ }
+ } catch (IOException e) {
+ LOGGER.error("delete file {} error", file);
+ throw new RuntimeException(e);
+ }
+ });
+
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ Lombok.checkNotNull(seaTunnelRow, "seaTunnelRow is null");
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
+ String line = transformRowToLine(seaTunnelRow);
+ try {
+ fsDataOutputStream.write(line.getBytes());
+
fsDataOutputStream.write(hiveSinkConfig.getHiveTxtFileLineDelimiter().getBytes());
+ } catch (IOException e) {
+ LOGGER.error("write data to file {} error", filePath);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @NonNull
+ @Override
+ public Map<String, String> getNeedMoveFiles() {
+ return this.needMoveFiles;
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ beingWrittenOutputStream.entrySet().forEach(entry -> {
+ try {
+ entry.getValue().flush();
+ } catch (IOException e) {
+ LOGGER.error("error when flush file {}", entry.getKey());
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ entry.getValue().close();
+ } catch (IOException e) {
+ LOGGER.error("error when close output stream {}",
entry.getKey());
+ }
+ }
+
+ needMoveFiles.put(entry.getKey(), getHiveLocation(entry.getKey()));
+ });
+ }
+
+ private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
+ if (fsDataOutputStream == null) {
+ try {
+ fsDataOutputStream = HdfsUtils.getOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+ } catch (IOException e) {
+ LOGGER.error("can not get output file stream");
+ throw new RuntimeException(e);
+ }
+ }
+ return fsDataOutputStream;
+ }
+
+ private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
+ String line = null;
+ List<String> sinkColumns = hiveSinkConfig.getSinkColumns();
+ if (sinkColumns == null || sinkColumns.size() == 0) {
+ line = Arrays.stream(seaTunnelRow.getFields())
+ .map(column -> column == null ? "" : column.toString())
+
.collect(Collectors.joining(hiveSinkConfig.getHiveTxtFileFieldDelimiter()));
+ } else {
+ line = sinkColumns.stream().map(column -> {
+ String valueStr = "";
+ Object value = seaTunnelRow.getFieldMap().get(column);
+ if (value != null) {
+ valueStr = value.toString();
+ }
+ return valueStr;
+
}).collect(Collectors.joining(hiveSinkConfig.getHiveTxtFileFieldDelimiter()));
+ }
+ return line;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
new file mode 100644
index 00000000..23b1e584
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/file/writer/HdfsUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.file.writer;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class HdfsUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HdfsUtils.class);
+
+ public static final int WRITE_BUFFER_SIZE = 2048;
+
+ public static FileSystem getHdfsFs(@NonNull String path)
+ throws IOException {
+ Configuration conf = new Configuration();
+ conf.set("fs.hdfs.impl",
"org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.set("fs.defaultFs", path);
+ return FileSystem.get(conf);
+ }
+
+ public static FSDataOutputStream getOutputStream(@NonNull String
outFilePath) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(outFilePath);
+ Path path = new Path(outFilePath);
+ FSDataOutputStream fsDataOutputStream = hdfsFs.create(path, true,
WRITE_BUFFER_SIZE);
+ return fsDataOutputStream;
+ }
+
+ public static boolean deleteFile(@NonNull String file) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(file);
+ return hdfsFs.delete(new Path(file), true);
+ }
+
+ /**
+ * rename file
+ *
+ * @param oldName old file name
+ * @param newName target file name
+ * @param rmWhenExist if this is true, we will delete the target file when
it already exists
+ * @throws IOException throw IOException
+ */
+ public static void renameFile(@NonNull String oldName, @NonNull String
newName, boolean rmWhenExist) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(newName);
+ LOGGER.info("begin rename file oldName :[" + oldName + "] to newName
:[" + newName + "]");
+
+ Path oldPath = new Path(oldName);
+ Path newPath = new Path(newName);
+ if (rmWhenExist) {
+ if (fileExist(newName) && fileExist(oldName)) {
+ hdfsFs.delete(newPath, true);
+ }
+ }
+ if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
+ createDir(newName.substring(0, newName.lastIndexOf("/")));
+ }
+ LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "]
finish");
+
+ hdfsFs.rename(oldPath, newPath);
+ }
+
+ public static boolean createDir(@NonNull String filePath)
+ throws IOException {
+
+ FileSystem hdfsFs = getHdfsFs(filePath);
+ Path dfs = new Path(filePath);
+ return hdfsFs.mkdirs(dfs);
+ }
+
+ public static boolean fileExist(@NonNull String filePath)
+ throws IOException {
+ FileSystem hdfsFs = getHdfsFs(filePath);
+ Path fileName = new Path(filePath);
+ return hdfsFs.exists(fileName);
+ }
+}