This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 631509293 [Improve][Connector-V2] Refactor the structure of file sink 
to reduce redundant codes (#2555)
631509293 is described below

commit 631509293037f3c6d7ef5ce6df53070f28e8d78f
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Aug 31 09:15:17 2022 +0800

    [Improve][Connector-V2] Refactor the structure of file sink to reduce 
redundant codes (#2555)
---
 .../seatunnel/file/config/FileFormat.java          |  36 +++
 .../seatunnel/file/sink/BaseFileSink.java          | 120 +++++++++
 .../seatunnel/file/sink/BaseFileSinkWriter.java    |  84 ++++++
 .../sink/commit/FileAggregatedCommitInfo2.java     |  43 +++
 .../file/sink/commit/FileCommitInfo2.java          |  48 ++++
 .../sink/commit/FileSinkAggregatedCommitter2.java  | 117 ++++++++
 .../file/sink/commit/FileSinkCommitter2.java       |  65 +++++
 .../file/sink/config/TextFileSinkConfig.java       |   7 +-
 .../seatunnel/file/sink/state/FileSinkState2.java  |  30 +++
 .../seatunnel/file/sink/util/FileSystemUtils.java  | 129 +++++++++
 .../file/sink/writer/AbstractWriteStrategy.java    | 295 +++++++++++++++++++++
 .../file/sink/writer/JsonWriteStrategy.java        |  98 +++++++
 .../file/sink/writer/OrcWriteStrategy.java         | 210 +++++++++++++++
 .../file/sink/writer/ParquetWriteStrategy.java     | 143 ++++++++++
 .../file/sink/writer/TextWriteStrategy.java        |  94 +++++++
 .../seatunnel/file/sink/writer/Transaction.java    |  64 +++++
 .../seatunnel/file/sink/writer/WriteStrategy.java  |  75 ++++++
 .../file/sink/writer/WriteStrategyFactory.java     |  43 +++
 18 files changed, 1698 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index a356ed0c6..4cddc2f4c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,6 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
@@ -27,30 +33,56 @@ import java.io.Serializable;
 
 public enum FileFormat implements Serializable {
     CSV("csv") {
+        @Override
+        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+            textFileSinkConfig.setFieldDelimiter(",");
+            return new TextWriteStrategy(textFileSinkConfig);
+        }
+
         @Override
         public ReadStrategy getReadStrategy() {
             return new TextReadStrategy();
         }
     },
     TEXT("txt") {
+        @Override
+        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+            return new TextWriteStrategy(textFileSinkConfig);
+        }
+
         @Override
         public ReadStrategy getReadStrategy() {
             return new TextReadStrategy();
         }
     },
     PARQUET("parquet") {
+        @Override
+        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+            return new ParquetWriteStrategy(textFileSinkConfig);
+        }
+
         @Override
         public ReadStrategy getReadStrategy() {
             return new ParquetReadStrategy();
         }
     },
     ORC("orc") {
+        @Override
+        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+            return new OrcWriteStrategy(textFileSinkConfig);
+        }
+
         @Override
         public ReadStrategy getReadStrategy() {
             return new OrcReadStrategy();
         }
     },
     JSON("json") {
+        @Override
+        public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+            return new JsonWriteStrategy(textFileSinkConfig);
+        }
+
         @Override
         public ReadStrategy getReadStrategy() {
             return new JsonReadStrategy();
@@ -70,4 +102,8 @@ public enum FileFormat implements Serializable {
     public ReadStrategy getReadStrategy() {
         return null;
     }
+
+    public WriteStrategy getWriteStrategy(TextFileSinkConfig 
textFileSinkConfig) {
+        return null;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
new file mode 100644
index 000000000..0faa8d77e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkCommitter2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public abstract class BaseFileSink implements SeaTunnelSink<SeaTunnelRow, 
FileSinkState2, FileCommitInfo2, FileAggregatedCommitInfo2> {
+    protected SeaTunnelRowType seaTunnelRowType;
+    protected Config pluginConfig;
+    protected HadoopConf hadoopConf;
+    protected TextFileSinkConfig textFileSinkConfig;
+    protected WriteStrategy writeStrategy;
+    protected SeaTunnelContext seaTunnelContext;
+    protected String jobId;
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+        this.jobId = seaTunnelContext.getJobId();
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.textFileSinkConfig = new TextFileSinkConfig(pluginConfig, 
seaTunnelRowType);
+        this.writeStrategy = 
WriteStrategyFactory.of(textFileSinkConfig.getFileFormat(), textFileSinkConfig);
+        this.writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2> 
restoreWriter(SinkWriter.Context context, List<FileSinkState2> states) throws 
IOException {
+        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, 
jobId, states);
+    }
+
+    @Override
+    public Optional<SinkCommitter<FileCommitInfo2>> createCommitter() throws 
IOException {
+        return Optional.of(new FileSinkCommitter2());
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<FileCommitInfo2, 
FileAggregatedCommitInfo2>> createAggregatedCommitter() throws IOException {
+        return Optional.of(new FileSinkAggregatedCommitter2());
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2> 
createWriter(SinkWriter.Context context) throws IOException {
+        return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, 
jobId);
+    }
+
+    @Override
+    public Optional<Serializer<FileCommitInfo2>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<FileAggregatedCommitInfo2>> 
getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<FileSinkState2>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    /**
+     * Uses the pluginConfig to perform initialization.
+     *
+     * @param pluginConfig plugin config.
+     * @throws PrepareFailException if preparing the plugin fails.
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
new file mode 100644
index 000000000..d02411d77
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
FileCommitInfo2, FileSinkState2> {
+    private final WriteStrategy writeStrategy;
+    private final HadoopConf hadoopConf;
+    private final SinkWriter.Context context;
+    private final int subTaskIndex;
+    private final String jobId;
+
+    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf 
hadoopConf, SinkWriter.Context context, String jobId, List<FileSinkState2> 
fileSinkStates) {
+        this.writeStrategy = writeStrategy;
+        this.context = context;
+        this.hadoopConf = hadoopConf;
+        this.jobId = jobId;
+        this.subTaskIndex = context.getIndexOfSubtask();
+        writeStrategy.init(hadoopConf, jobId, subTaskIndex);
+        if (!fileSinkStates.isEmpty()) {
+            List<String> transactionIds = 
writeStrategy.getTransactionIdFromStates(fileSinkStates);
+            transactionIds.forEach(writeStrategy::abortPrepare);
+            
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
+        }
+    }
+
+    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf 
hadoopConf, SinkWriter.Context context, String jobId) {
+        this(writeStrategy, hadoopConf, context, jobId, 
Collections.emptyList());
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        try {
+            writeStrategy.write(element);
+        } catch (Exception e) {
+            throw new RuntimeException("Write data error, please check", e);
+        }
+    }
+
+    @Override
+    public Optional<FileCommitInfo2> prepareCommit() throws IOException {
+        return writeStrategy.prepareCommit();
+    }
+
+    @Override
+    public void abortPrepare() {
+        writeStrategy.abortPrepare();
+    }
+
+    @Override
+    public List<FileSinkState2> snapshotState(long checkpointId) throws 
IOException {
+        return writeStrategy.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java
new file mode 100644
index 000000000..dcf4dde42
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class FileAggregatedCommitInfo2 implements Serializable {
+    /**
+     * Stores the commit info, grouped by transaction, in a map.
+     * <p>K is the transaction directory.</p>
+     * <p>V maps each file path to be moved to its target file path.</p>
+     */
+    private final Map<String, Map<String, String>> transactionMap;
+
+    /**
+     * Stores the partition information in a map.
+     * <p>K is the partition column's name.</p>
+     * <p>V is the list of partition column's values.</p>
+     */
+    private final Map<String, List<String>> partitionDirAndValuesMap;
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo2.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo2.java
new file mode 100644
index 000000000..bb7586e37
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo2.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class FileCommitInfo2 implements Serializable {
+    /**
+     * Stores the commit info in a map.
+     * <p>K is the path of the file that needs to be moved to the target dir.</p>
+     * <p>V is the target file path of the data file.</p>
+     */
+    private final Map<String, String> needMoveFiles;
+
+    /**
+     * Stores the partition information in a map.
+     * <p>K is the partition column's name.</p>
+     * <p>V is the list of partition column's values.</p>
+     */
+    private final Map<String, List<String>> partitionDirAndValuesMap;
+
+    /**
+     * Stores the transaction directory.
+     */
+    private final String transactionDir;
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter2.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter2.java
new file mode 100644
index 000000000..1366a132e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter2.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FileSinkAggregatedCommitter2 implements 
SinkAggregatedCommitter<FileCommitInfo2, FileAggregatedCommitInfo2> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSinkAggregatedCommitter2.class);
+
+    @Override
+    public List<FileAggregatedCommitInfo2> 
commit(List<FileAggregatedCommitInfo2> aggregatedCommitInfos) throws 
IOException {
+        List<FileAggregatedCommitInfo2> errorAggregatedCommitInfoList = new 
ArrayList<>();
+        aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
+            try {
+                for (Map.Entry<String, Map<String, String>> entry : 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
+                    for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
+                        // first rename temp file
+                        FileSystemUtils.renameFile(mvFileEntry.getKey(), 
mvFileEntry.getValue(), true);
+                    }
+                    // second delete transaction directory
+                    FileSystemUtils.deleteFile(entry.getKey());
+                }
+            } catch (Exception e) {
+                LOGGER.error("commit aggregatedCommitInfo error ", e);
+                errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
+            }
+        });
+        return errorAggregatedCommitInfoList;
+    }
+
+    /**
+     * Combines the commit messages of a checkpoint into one aggregated commit message.
+     *
+     * @param commitInfos The list of commit messages.
+     * @return The combined commit message, or {@code null} if {@code commitInfos} is null or empty.
+     */
+    @Override
+    public FileAggregatedCommitInfo2 combine(List<FileCommitInfo2> 
commitInfos) {
+        if (commitInfos == null || commitInfos.size() == 0) {
+            return null;
+        }
+        Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
+        Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>();
+        commitInfos.forEach(commitInfo -> {
+            Map<String, String> needMoveFileMap = 
aggregateCommitInfo.computeIfAbsent(commitInfo.getTransactionDir(), k -> new 
HashMap<>());
+            needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
+            if (commitInfo.getPartitionDirAndValuesMap() != null && 
!commitInfo.getPartitionDirAndValuesMap().isEmpty()) {
+                
partitionDirAndValuesMap.putAll(commitInfo.getPartitionDirAndValuesMap());
+            }
+        });
+        return new FileAggregatedCommitInfo2(aggregateCommitInfo, 
partitionDirAndValuesMap);
+    }
+
+    /**
+     * Called if {@link #commit(List)} fails (currently **only** on the Spark engine).
+     *
+     * @param aggregatedCommitInfos The list of combined commit messages.
+     * @throws Exception if the abort fails.
+     */
+    @Override
+    public void abort(List<FileAggregatedCommitInfo2> aggregatedCommitInfos) 
throws Exception {
+        if (aggregatedCommitInfos == null || aggregatedCommitInfos.size() == 
0) {
+            return;
+        }
+        aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
+            try {
+                for (Map.Entry<String, Map<String, String>> entry : 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
+                    // rollback the file
+                    for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
+                        if (FileSystemUtils.fileExist(mvFileEntry.getValue()) 
&& !FileSystemUtils.fileExist(mvFileEntry.getKey())) {
+                            FileSystemUtils.renameFile(mvFileEntry.getValue(), 
mvFileEntry.getKey(), true);
+                        }
+                    }
+                    // delete the transaction dir
+                    FileSystemUtils.deleteFile(entry.getKey());
+                }
+            } catch (Exception e) {
+                LOGGER.error("abort aggregatedCommitInfo error ", e);
+            }
+        });
+    }
+
+    /**
+     * Close this resource.
+     *
+     * @throws IOException if closing fails.
+     */
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter2.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter2.java
new file mode 100644
index 000000000..1a1bfd905
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter2.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileSinkCommitter2 implements SinkCommitter<FileCommitInfo2> {
+
+    @Override
+    public List<FileCommitInfo2> commit(List<FileCommitInfo2> commitInfos) 
throws IOException {
+        ArrayList<FileCommitInfo2> failedCommitInfos = new ArrayList<>();
+        for (FileCommitInfo2 commitInfo : commitInfos) {
+            Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
+            needMoveFiles.forEach((k, v) -> {
+                try {
+                    FileSystemUtils.renameFile(k, v, true);
+                } catch (IOException e) {
+                    failedCommitInfos.add(commitInfo);
+                }
+            });
+            FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+        }
+        return failedCommitInfos;
+    }
+
+    /**
+     * Aborts the transaction; called (**only** on the Spark engine) when the commit fails.
+     *
+     * @param commitInfos The list of commit messages, used to abort the commit.
+     * @throws IOException if the abort fails.
+     */
+    @Override
+    public void abort(List<FileCommitInfo2> commitInfos) throws IOException {
+        for (FileCommitInfo2 commitInfo : commitInfos) {
+            Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
+            for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
+                if (FileSystemUtils.fileExist(entry.getValue()) && 
!FileSystemUtils.fileExist(entry.getKey())) {
+                    FileSystemUtils.renameFile(entry.getValue(), 
entry.getKey(), true);
+                }
+            }
+            FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
index ce94ff847..7f5823b43 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -114,7 +115,7 @@ public class TextFileSinkConfig extends BaseTextFileConfig 
implements PartitionC
 
         // check partition field must in seaTunnelRowTypeInfo
         if (!CollectionUtils.isEmpty(this.partitionFieldList)
-            && (CollectionUtils.isEmpty(this.sinkColumnList) || 
!this.sinkColumnList.containsAll(this.partitionFieldList))) {
+            && (CollectionUtils.isEmpty(this.sinkColumnList) || !new 
HashSet<>(this.sinkColumnList).containsAll(this.partitionFieldList))) {
             throw new RuntimeException("partition fields must in sink 
columns");
         }
 
@@ -136,12 +137,12 @@ public class TextFileSinkConfig extends 
BaseTextFileConfig implements PartitionC
 
         // init sink column index and partition field index, we will use the 
column index to found the data in SeaTunnelRow
         this.sinkColumnsIndexInRow = this.sinkColumnList.stream()
-            .map(columnName -> columnsMap.get(columnName))
+            .map(columnsMap::get)
             .collect(Collectors.toList());
 
         if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
             this.partitionFieldsIndexInRow = this.partitionFieldList.stream()
-                .map(columnName -> columnsMap.get(columnName))
+                .map(columnsMap::get)
                 .collect(Collectors.toList());
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState2.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState2.java
new file mode 100644
index 000000000..1e5be30f8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState2.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Serializable state object pairing a transaction id with a checkpoint id.
+ *
+ * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and the two-argument
+ * constructor are generated by Lombok.
+ */
+@AllArgsConstructor
+@Data
+public class FileSinkState2 implements Serializable {
+    /** Id of the transaction recorded in this state. */
+    private final String transactionId;
+    /** Id of the checkpoint recorded together with the transaction id. */
+    private final Long checkpointId;
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
new file mode 100644
index 000000000..2f284791f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileSystemUtils {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSystemUtils.class);
+
+    public static final int WRITE_BUFFER_SIZE = 2048;
+
+    public static Configuration CONF;
+
+    public static FileSystem getHdfsFs(@NonNull String path) throws 
IOException {
+        return FileSystem.get(URI.create(path), CONF);
+    }
+
+    public static FSDataOutputStream getOutputStream(@NonNull String 
outFilePath) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(outFilePath);
+        Path path = new Path(outFilePath);
+        return hdfsFs.create(path, true, WRITE_BUFFER_SIZE);
+    }
+
+    public static void createFile(@NonNull String filePath) throws IOException 
{
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        Path path = new Path(filePath);
+        if (!hdfsFs.createNewFile(path)) {
+            throw new IOException("create file " + filePath + " error");
+        }
+    }
+
+    public static void deleteFile(@NonNull String file) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(file);
+        if (!hdfsFs.delete(new Path(file), true)) {
+            throw new IOException("delete file " + file + " error");
+        }
+    }
+
+    /**
+     * rename file
+     *
+     * @param oldName     old file name
+     * @param newName     target file name
+     * @param rmWhenExist if this is true, we will delete the target file when 
it already exists
+     * @throws IOException throw IOException
+     */
+    public static void renameFile(@NonNull String oldName, @NonNull String 
newName, boolean rmWhenExist) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(newName);
+        LOGGER.info("begin rename file oldName :[" + oldName + "] to newName 
:[" + newName + "]");
+
+        Path oldPath = new Path(oldName);
+        Path newPath = new Path(newName);
+        if (rmWhenExist) {
+            if (fileExist(newName) && fileExist(oldName)) {
+                hdfsFs.delete(newPath, true);
+                LOGGER.info("Delete already file: {}", newPath);
+            }
+        }
+        if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
+            createDir(newName.substring(0, newName.lastIndexOf("/")));
+        }
+
+        if (hdfsFs.rename(oldPath, newPath)) {
+            LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] 
finish");
+        } else {
+            throw new IOException("rename file :[" + oldPath + "] to [" + 
newPath + "] error");
+        }
+    }
+
+    public static void createDir(@NonNull String filePath) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        Path dfs = new Path(filePath);
+        if (!hdfsFs.mkdirs(dfs)) {
+            throw new IOException("create dir " + filePath + " error");
+        }
+    }
+
+    public static boolean fileExist(@NonNull String filePath) throws 
IOException {
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        Path fileName = new Path(filePath);
+        return hdfsFs.exists(fileName);
+    }
+
+    /**
+     * get the dir in filePath
+     */
+    public static List<Path> dirList(@NonNull String filePath) throws 
FileNotFoundException, IOException {
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        List<Path> pathList = new ArrayList<Path>();
+        Path fileName = new Path(filePath);
+        FileStatus[] status = hdfsFs.listStatus(fileName);
+        if (status != null && status.length > 0) {
+            for (FileStatus fileStatus : status) {
+                if (fileStatus.isDirectory()) {
+                    pathList.add(fileStatus.getPath());
+                }
+            }
+        }
+        return pathList;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
new file mode 100644
index 000000000..ed03c1324
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public abstract class AbstractWriteStrategy implements WriteStrategy {
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    protected final TextFileSinkConfig textFileSinkConfig;
+    protected final List<Integer> sinkColumnsIndexInRow;
+    protected String jobId;
+    protected int subTaskIndex;
+    protected HadoopConf hadoopConf;
+    protected String transactionId;
+    protected String transactionDirectory;
+    protected Map<String, String> needMoveFiles;
+    protected Map<String, String> beingWrittenFile;
+    private Map<String, List<String>> partitionDirAndValuesMap;
+    protected SeaTunnelRowType seaTunnelRowType;
+    protected Long checkpointId = 1L;
+
+    /**
+     * Binds the strategy to a sink configuration and caches its sink column indexes.
+     *
+     * @param textFileSinkConfig sink configuration this strategy reads its settings from
+     */
+    public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        this.sinkColumnsIndexInRow = textFileSinkConfig.getSinkColumnsIndexInRow();
+        this.textFileSinkConfig = textFileSinkConfig;
+    }
+
+    /**
+     * Stores the job context, publishes the Hadoop configuration to {@link FileSystemUtils}
+     * and begins the first transaction.
+     *
+     * @param conf         hadoop conf
+     * @param jobId        id of the running job
+     * @param subTaskIndex index of the sub task owning this strategy
+     */
+    @Override
+    public void init(HadoopConf conf, String jobId, int subTaskIndex) {
+        // jobId and subTaskIndex feed the transaction id built by beginTransaction below.
+        this.jobId = jobId;
+        this.subTaskIndex = subTaskIndex;
+        this.hadoopConf = conf;
+        FileSystemUtils.CONF = getConfiguration(this.hadoopConf);
+        beginTransaction(checkpointId);
+    }
+
+    /**
+     * Builds a Hadoop {@link Configuration} from the given hadoop conf.
+     *
+     * <p>The settings are read from the {@code conf} argument itself, not from the
+     * {@code hadoopConf} field, so the result always reflects what the caller passed in.
+     *
+     * @param conf hadoop conf; when {@code null} a default configuration is returned
+     * @return Configuration
+     */
+    @Override
+    public Configuration getConfiguration(HadoopConf conf) {
+        Configuration configuration = new Configuration();
+        if (conf != null) {
+            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, conf.getHdfsNameKey());
+            configuration.set("fs.hdfs.impl", conf.getFsHdfsImpl());
+            conf.setExtraOptionsForConfiguration(configuration);
+        }
+        return configuration;
+    }
+
+    /**
+     * Remembers the row type of the rows this strategy is going to write.
+     *
+     * @param seaTunnelRowType seaTunnelRowType
+     */
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * Derives the partition directory of a row from its partition field values.
+     *
+     * <p>Without partition fields the result maps {@code Constant.NON_PARTITION} to
+     * {@code null}. Otherwise the directory is either {@code field=value/} repeated for every
+     * partition field, or the configured partition dir expression resolved with the variables
+     * {@code k0, v0, k1, v1, ...} (field name / field value).
+     *
+     * @param seaTunnelRow seaTunnelRow
+     * @return a single-entry map from partition directory to the partition values
+     */
+    @Override
+    public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
+        Map<String, List<String>> dirToValues = new HashMap<>(1);
+        List<Integer> partitionIndexes = textFileSinkConfig.getPartitionFieldsIndexInRow();
+        if (CollectionUtils.isEmpty(partitionIndexes)) {
+            dirToValues.put(Constant.NON_PARTITION, null);
+            return dirToValues;
+        }
+
+        List<String> partitionFieldList = textFileSinkConfig.getPartitionFieldList();
+        String partitionDirExpression = textFileSinkConfig.getPartitionDirExpression();
+        Object[] fields = seaTunnelRow.getFields();
+        int partitionCount = partitionIndexes.size();
+        List<String> vals = new ArrayList<>(partitionCount);
+
+        String partitionDir;
+        if (StringUtils.isBlank(partitionDirExpression)) {
+            StringBuilder dirBuilder = new StringBuilder();
+            for (int i = 0; i < partitionCount; i++) {
+                String fieldValue = fields[partitionIndexes.get(i)].toString();
+                dirBuilder.append(partitionFieldList.get(i)).append("=").append(fieldValue).append("/");
+                vals.add(fieldValue);
+            }
+            partitionDir = dirBuilder.toString();
+        } else {
+            Map<String, String> valueMap = new HashMap<>(partitionFieldList.size() * 2);
+            for (int i = 0; i < partitionCount; i++) {
+                String fieldValue = fields[partitionIndexes.get(i)].toString();
+                valueMap.put("k" + i, partitionFieldList.get(i));
+                valueMap.put("v" + i, fieldValue);
+                vals.add(fieldValue);
+            }
+            partitionDir = VariablesSubstitute.substitute(partitionDirExpression, valueMap);
+        }
+
+        dirToValues.put(partitionDir, vals);
+        return dirToValues;
+    }
+
+    /**
+     * Builds the name of a data file for the given transaction.
+     *
+     * <p>Without a file name expression the name is the transaction id followed by the format
+     * suffix. Otherwise the expression is resolved through {@link VariablesSubstitute} with a
+     * random uuid, the formatted current time and the transaction id, and the suffix is
+     * appended to the result.
+     *
+     * @param transactionId transaction id
+     * @return file name
+     */
+    @Override
+    public String generateFileName(String transactionId) {
+        String fileNameExpression = textFileSinkConfig.getFileNameExpression();
+        FileFormat fileFormat = textFileSinkConfig.getFileFormat();
+        if (StringUtils.isBlank(fileNameExpression)) {
+            return transactionId + fileFormat.getSuffix();
+        }
+
+        String timeFormat = textFileSinkConfig.getFileNameTimeFormat();
+        String formattedDate = DateTimeFormatter.ofPattern(timeFormat).format(ZonedDateTime.now());
+
+        Map<String, String> valuesMap = new HashMap<>();
+        valuesMap.put(Constants.UUID, UUID.randomUUID().toString());
+        valuesMap.put(Constants.NOW, formattedDate);
+        // The time format itself is also registered as a variable name.
+        valuesMap.put(timeFormat, formattedDate);
+        valuesMap.put(Constant.TRANSACTION_EXPRESSION, transactionId);
+
+        return VariablesSubstitute.substitute(fileNameExpression, valuesMap) + fileFormat.getSuffix();
+    }
+
+    /**
+     * Closes the files being written and packages what has to be committed.
+     *
+     * <p>The commit info receives copies of the files to move and of the partition directory
+     * map, plus the current transaction directory.
+     *
+     * @return the file commit information
+     */
+    @Override
+    public Optional<FileCommitInfo2> prepareCommit() {
+        this.finishAndCloseFile();
+        Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
+        Map<String, List<String>> copyMap = new HashMap<>();
+        for (Map.Entry<String, List<String>> partitionEntry : this.partitionDirAndValuesMap.entrySet()) {
+            copyMap.put(partitionEntry.getKey(), new ArrayList<>(partitionEntry.getValue()));
+        }
+        FileCommitInfo2 commitInfo = new FileCommitInfo2(commitMap, copyMap, transactionDirectory);
+        return Optional.of(commitInfo);
+    }
+
+    /**
+     * Aborts the prepare-commit step of the current transaction.
+     */
+    @Override
+    public void abortPrepare() {
+        this.abortPrepare(this.transactionId);
+    }
+
+    /**
+     * abort prepare commit operation using transaction directory
+     * @param transactionId transaction id
+     */
+    public void abortPrepare(String transactionId) {
+        try {
+            FileSystemUtils.deleteFile(getTransactionDir(transactionId));
+        } catch (IOException e) {
+            throw new RuntimeException("abort transaction " + transactionId + 
" error.", e);
+        }
+    }
+
+    /**
+     * when a checkpoint completed, file connector should begin a new 
transaction and generate new transaction id
+     * @param checkpointId checkpoint id
+     */
+    public void beginTransaction(Long checkpointId) {
+        this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + 
Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + 
checkpointId;
+        this.transactionDirectory = getTransactionDir(this.transactionId);
+        this.needMoveFiles = new HashMap<>();
+        this.partitionDirAndValuesMap = new HashMap<>();
+        this.beingWrittenFile = new HashMap<>();
+    }
+
+    /**
+     * Lists the transaction ids found as sub-directories of this job's directory.
+     *
+     * <p>The ids are taken from the directory listing; the passed states are not consulted.
+     *
+     * @param fileStates file sink states
+     * @return transaction ids
+     * @throws RuntimeException wrapping the {@link IOException} when the listing fails
+     */
+    public List<String> getTransactionIdFromStates(List<FileSinkState2> fileStates) {
+        String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
+        String jobDir = String.join(File.separator, pathSegments) + "/";
+        try {
+            return FileSystemUtils.dirList(jobDir).stream()
+                    .map(Path::toString)
+                    // Strip the job directory literally: a path is not a regular expression, and
+                    // characters such as '+', '(' or '\' would break or mis-match with replaceAll.
+                    .map(dir -> dir.replace(jobDir, ""))
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * when a checkpoint was triggered, snapshot the state of connector
+     *
+     * @param checkpointId checkpointId
+     * @return the list of states
+     */
+    @Override
+    public List<FileSinkState2> snapshotState(long checkpointId) {
+        ArrayList<FileSinkState2> fileState = Lists.newArrayList(new 
FileSinkState2(this.transactionId, this.checkpointId));
+        this.checkpointId = checkpointId;
+        this.beginTransaction(checkpointId);
+        return fileState;
+    }
+
+    /**
+     * Builds the transaction directory for a transaction id by joining the configured tmp path,
+     * the seatunnel segment, the job id and the transaction id with the file separator.
+     *
+     * @param transactionId transaction id
+     * @return transaction directory
+     */
+    private String getTransactionDir(@NonNull String transactionId) {
+        return String.join(File.separator,
+                textFileSinkConfig.getTmpPath(), Constant.SEATUNNEL, jobId, transactionId);
+    }
+
+    /**
+     * Returns the path of the file a row has to be written to in the current transaction,
+     * registering a new file for the row's partition directory when none is open yet.
+     *
+     * @param seaTunnelRow row about to be written
+     * @return path of the file being written for the row's partition directory
+     */
+    public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) {
+        Map<String, List<String>> dataPartitionDirAndValuesMap = generatorPartitionDir(seaTunnelRow);
+        String partitionDirKey = dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
+
+        String existingFilePath = beingWrittenFile.get(partitionDirKey);
+        if (existingFilePath != null) {
+            return existingFilePath;
+        }
+
+        String newFilePath = String.join(File.separator,
+                transactionDirectory, partitionDirKey, generateFileName(transactionId));
+        beingWrittenFile.put(partitionDirKey, newFilePath);
+        // Only real partition directories are remembered for the commit info.
+        if (!Constant.NON_PARTITION.equals(partitionDirKey)) {
+            partitionDirAndValuesMap.putAll(dataPartitionDirAndValuesMap);
+        }
+        return newFilePath;
+    }
+
+    /**
+     * Maps a file inside the transaction directory to its final location under the sink path.
+     *
+     * <p>The transaction directory prefix is swapped for the configured path and the
+     * non-partition placeholder segment is removed. Both substitutions are literal: paths are
+     * not regular expressions, and on platforms where {@link File#separator} is a backslash the
+     * regex-based {@code replaceAll} would fail to compile the pattern.
+     *
+     * @param seaTunnelFilePath path of a file written under the transaction directory
+     * @return the target path of that file
+     */
+    public String getTargetLocation(@NonNull String seaTunnelFilePath) {
+        String tmpPath = seaTunnelFilePath.replace(transactionDirectory, textFileSinkConfig.getPath());
+        return tmpPath.replace(Constant.NON_PARTITION + File.separator, "");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
new file mode 100644
index 000000000..a5aed5d79
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonWriteStrategy extends AbstractWriteStrategy {
+    private final byte[] rowDelimiter;
+    private SerializationSchema serializationSchema;
+    private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+    public JsonWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        this.beingWrittenOutputStream = new HashMap<>();
+        this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
+    }
+
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
+        try {
+            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+            fsDataOutputStream.write(rowBytes);
+            fsDataOutputStream.write(rowDelimiter);
+        } catch (IOException e) {
+            log.error("write data to file {} error", filePath);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        beingWrittenOutputStream.forEach((key, value) -> {
+            try {
+                value.flush();
+            } catch (IOException e) {
+                log.error("error when flush file {}", key);
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    value.close();
+                } catch (IOException e) {
+                    log.error("error when close output stream {}", key, e);
+                }
+            }
+
+            needMoveFiles.put(key, getTargetLocation(key));
+        });
+    }
+
+    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+        FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
+        if (fsDataOutputStream == null) {
+            try {
+                fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+            } catch (IOException e) {
+                log.error("can not get output file stream");
+                throw new RuntimeException(e);
+            }
+        }
+        return fsDataOutputStream;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
new file mode 100644
index 000000000..875504e9b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OrcWriteStrategy extends AbstractWriteStrategy {
+    private final Map<String, Writer> beingWrittenWriter;
+
+    /**
+     * Creates an ORC write strategy with an empty writer cache.
+     *
+     * @param textFileSinkConfig sink configuration handed to the base strategy
+     */
+    public OrcWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        Writer writer = getOrCreateWriter(filePath);
+        TypeDescription schema = buildSchemaWithRowType();
+        VectorizedRowBatch rowBatch = schema.createRowBatch();
+        int i = 0;
+        int row = rowBatch.size++;
+        for (Integer index : sinkColumnsIndexInRow) {
+            Object value = seaTunnelRow.getField(index);
+            ColumnVector vector = rowBatch.cols[i];
+            setColumn(value, vector, row);
+            i++;
+        }
+        try {
+            writer.addRowBatch(rowBatch);
+            rowBatch.reset();
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to orc file [%s] 
error", filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] orc writer 
failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    /**
+     * Returns the cached ORC writer of a file, creating and caching it on first use.
+     *
+     * @param filePath file to write to
+     * @return the writer of the file
+     * @throws RuntimeException wrapping the {@link IOException} when the writer cannot be created
+     */
+    private Writer getOrCreateWriter(@NonNull String filePath) {
+        Writer cachedWriter = this.beingWrittenWriter.get(filePath);
+        if (cachedWriter != null) {
+            return cachedWriter;
+        }
+
+        TypeDescription schema = buildSchemaWithRowType();
+        Path path = new Path(filePath);
+        try {
+            OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf))
+                    .setSchema(schema)
+                    // temporarily used snappy
+                    .compress(CompressionKind.SNAPPY)
+                    // use orc version 0.12
+                    .version(OrcFile.Version.V_0_12)
+                    .overwrite(true);
+            Writer createdWriter = OrcFile.createWriter(path, options);
+            this.beingWrittenWriter.put(filePath, createdWriter);
+            return createdWriter;
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("Get orc writer for file [%s] error", filePath), e);
+        }
+    }
+
+    /**
+     * Maps a SeaTunnel data type to the ORC type used for its column.
+     *
+     * @param type SeaTunnel type of the field
+     * @return the matching ORC type; every type without an explicit mapping becomes a string
+     */
+    private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+        TypeDescription fieldType;
+        if (BasicType.BOOLEAN_TYPE.equals(type)) {
+            fieldType = TypeDescription.createBoolean();
+        } else if (BasicType.SHORT_TYPE.equals(type)) {
+            fieldType = TypeDescription.createShort();
+        } else if (BasicType.INT_TYPE.equals(type)) {
+            fieldType = TypeDescription.createInt();
+        } else if (BasicType.LONG_TYPE.equals(type)) {
+            fieldType = TypeDescription.createLong();
+        } else if (BasicType.FLOAT_TYPE.equals(type)) {
+            fieldType = TypeDescription.createFloat();
+        } else if (BasicType.DOUBLE_TYPE.equals(type)) {
+            fieldType = TypeDescription.createDouble();
+        } else if (BasicType.BYTE_TYPE.equals(type)) {
+            fieldType = TypeDescription.createByte();
+        } else {
+            fieldType = TypeDescription.createString();
+        }
+        return fieldType;
+    }
+
+    /**
+     * Builds the ORC struct schema from the sink columns of the current row type.
+     *
+     * @return a struct with one field per sink column, in sink column order
+     */
+    private TypeDescription buildSchemaWithRowType() {
+        TypeDescription structSchema = TypeDescription.createStruct();
+        for (Integer columnIndex : sinkColumnsIndexInRow) {
+            structSchema.addField(
+                    seaTunnelRowType.getFieldName(columnIndex),
+                    buildFieldWithRowType(seaTunnelRowType.getFieldType(columnIndex)));
+        }
+        return structSchema;
+    }
+
+    /**
+     * Stores a value in the given row of a column vector, dispatching on the vector type.
+     *
+     * @param value  value to store; {@code null} marks the row as null
+     * @param vector column vector receiving the value
+     * @param row    row position inside the vector
+     * @throws RuntimeException if the vector type is not LONG, DOUBLE or BYTES
+     */
+    private void setColumn(Object value, ColumnVector vector, int row) {
+        if (value == null) {
+            vector.isNull[row] = true;
+            vector.noNulls = false;
+            return;
+        }
+        switch (vector.type) {
+            case LONG:
+                setLongColumnVector(value, (LongColumnVector) vector, row);
+                break;
+            case DOUBLE:
+                setDoubleVector(value, (DoubleColumnVector) vector, row);
+                break;
+            case BYTES:
+                setByteColumnVector(value, (BytesColumnVector) vector, row);
+                break;
+            default:
+                throw new RuntimeException("Unexpected ColumnVector subtype");
+        }
+    }
+
+    /**
+     * Stores a boolean or integral value in a long column vector.
+     *
+     * <p>{@link Byte} and {@link Short} are accepted as well, because the schema built by this
+     * class maps the byte and short SeaTunnel types to ORC byte/short columns, which reach this
+     * method as long vectors.
+     *
+     * @param value      value to store; booleans are stored as 1 or 0
+     * @param longVector vector receiving the value
+     * @param row        row position inside the vector
+     * @throws RuntimeException if the value is neither a boolean nor a supported integral type
+     */
+    private void setLongColumnVector(Object value, LongColumnVector longVector, int row) {
+        if (value instanceof Boolean) {
+            longVector.vector[row] = ((Boolean) value) ? 1L : 0L;
+        } else if (value instanceof Byte
+                || value instanceof Short
+                || value instanceof Integer
+                || value instanceof Long
+                || value instanceof BigInteger) {
+            longVector.vector[row] = ((Number) value).longValue();
+        } else {
+            throw new RuntimeException("Long or Integer type expected for field");
+        }
+    }
+
+    /**
+     * Stores a string or byte array in a bytes column vector; strings are encoded as UTF-8.
+     *
+     * @param value          value to store
+     * @param bytesColVector vector receiving the value
+     * @param rowNum         row position inside the vector
+     * @throws RuntimeException if the value is neither a {@code byte[]} nor a {@link String}
+     */
+    private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) {
+        byte[] byteVec;
+        if (value instanceof String) {
+            byteVec = ((String) value).getBytes(StandardCharsets.UTF_8);
+        } else if (value instanceof byte[]) {
+            byteVec = (byte[]) value;
+        } else {
+            throw new RuntimeException("byte[] or String type expected for field ");
+        }
+        bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
+    }
+
+    /**
+     * Stores a floating point value in a double column vector.
+     *
+     * @param value        value to store
+     * @param doubleVector vector receiving the value
+     * @param rowNum       row position inside the vector
+     * @throws RuntimeException if the value is neither a {@link Double} nor a {@link Float}
+     */
+    private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) {
+        if (value instanceof Float) {
+            doubleVector.vector[rowNum] = ((Float) value).doubleValue();
+            return;
+        }
+        if (value instanceof Double) {
+            doubleVector.vector[rowNum] = (Double) value;
+            return;
+        }
+        throw new RuntimeException("Double or Float type expected for field ");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
new file mode 100644
index 000000000..3fccdfa01
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParquetWriteStrategy extends AbstractWriteStrategy {
+    private final Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+
+    public ParquetWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+        Schema schema = buildSchemaWithRowType();
+        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+        sinkColumnsIndexInRow.forEach(index -> 
recordBuilder.set(seaTunnelRowType.getFieldName(index), 
seaTunnelRow.getField(index)));
+        GenericData.Record record = recordBuilder.build();
+        try {
+            writer.write(record);
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to file [%s] error", 
filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] parquet 
writer failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String 
filePath) {
+        ParquetWriter<GenericRecord> writer = 
this.beingWrittenWriter.get(filePath);
+        if (writer == null) {
+            Schema schema = buildSchemaWithRowType();
+            Path path = new Path(filePath);
+            try {
+                HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, 
getConfiguration(hadoopConf));
+                ParquetWriter<GenericRecord> newWriter = 
AvroParquetWriter.<GenericRecord>builder(outputFile)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        // use parquet v1 to improve compatibility
+                        
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+                        // Temporarily use snappy compress
+                        // I think we can use the compress option in config to 
control this
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withSchema(schema)
+                        .build();
+                this.beingWrittenWriter.put(filePath, newWriter);
+                return newWriter;
+            } catch (IOException e) {
+                String errorMsg = String.format("Get parquet writer for file 
[%s] error", filePath);
+                throw new RuntimeException(errorMsg, e);
+            }
+        }
+        return writer;
+    }
+
+    private Schema buildSchemaWithRowType() {
+        ArrayList<Schema.Field> fields = new ArrayList<>();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        sinkColumnsIndexInRow.forEach(index -> {
+            if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.BOOLEAN), null, null);
+                fields.add(field);
+            } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || 
BasicType.INT_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.INT), null, null);
+                fields.add(field);
+            } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.LONG), null, null);
+                fields.add(field);
+            } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.FLOAT), null, null);
+                fields.add(field);
+            } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.DOUBLE), null, null);
+                fields.add(field);
+            } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.STRING), null, null);
+                fields.add(field);
+            } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.BYTES), null, null);
+                fields.add(field);
+            } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
+                Schema.Field field = new Schema.Field(fieldNames[index], 
Schema.create(Schema.Type.NULL), null, null);
+                fields.add(field);
+            }
+        });
+        return Schema.createRecord("SeatunnelRecord",
+                "The record generated by seatunnel file connector",
+                "org.apache.parquet.avro",
+                false,
+                fields);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
new file mode 100644
index 000000000..04fb7679c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TextWriteStrategy extends AbstractWriteStrategy {
+    private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+    private final String fieldDelimiter;
+    private final String rowDelimiter;
+
+    public TextWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        this.beingWrittenOutputStream = new HashMap<>();
+        this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter();
+        this.rowDelimiter = textFileSinkConfig.getRowDelimiter();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
+        String line = transformRowToLine(seaTunnelRow);
+        try {
+            fsDataOutputStream.write(line.getBytes());
+            fsDataOutputStream.write(rowDelimiter.getBytes());
+        } catch (IOException e) {
+            log.error("write data to file {} error", filePath);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        beingWrittenOutputStream.forEach((key, value) -> {
+            try {
+                value.flush();
+            } catch (IOException e) {
+                log.error("error when flush file {}", key);
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    value.close();
+                } catch (IOException e) {
+                    log.error("error when close output stream {}", key);
+                }
+            }
+            needMoveFiles.put(key, getTargetLocation(key));
+        });
+    }
+
+    private FSDataOutputStream getOrCreateOutputStream(@NonNull String 
filePath) {
+        FSDataOutputStream fsDataOutputStream = 
beingWrittenOutputStream.get(filePath);
+        if (fsDataOutputStream == null) {
+            try {
+                fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+                beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+            } catch (IOException e) {
+                log.error("can not get output file stream");
+                throw new RuntimeException(e);
+            }
+        }
+        return fsDataOutputStream;
+    }
+
+    private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
+        return this.sinkColumnsIndexInRow.stream().map(index -> 
seaTunnelRow.getFields()[index] == null ? "" : 
seaTunnelRow.getFields()[index].toString()).collect(Collectors.joining(fieldDelimiter));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/Transaction.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/Transaction.java
new file mode 100644
index 000000000..86f267ce6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/Transaction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Transaction hooks of the file sink: prepare and abort a commit, snapshot state on a
+ * checkpoint and begin the next transaction.
+ */
+public interface Transaction extends Serializable {
+    /**
+     * Prepares the commit of the current transaction.
+     *
+     * @return the file commit information, if any
+     */
+    Optional<FileCommitInfo2> prepareCommit();
+
+    /**
+     * Aborts the prepare-commit operation.
+     */
+    void abortPrepare();
+
+    /**
+     * Aborts the prepare-commit operation of the transaction with the given id.
+     *
+     * @param transactionId transaction id
+     */
+    void abortPrepare(String transactionId);
+
+    /**
+     * Snapshots the state of the connector when a checkpoint is triggered.
+     *
+     * @param checkpointId id of the triggered checkpoint
+     * @return the list of states
+     */
+    List<FileSinkState2> snapshotState(long checkpointId);
+
+    /**
+     * Extracts the ids of dirty transactions from the given file sink states.
+     *
+     * @param fileSinkState file sink states
+     * @return transaction ids
+     */
+    List<String> getTransactionIdFromStates(List<FileSinkState2> 
fileSinkState);
+
+    /**
+     * Begins a new transaction; called when a checkpoint is triggered.
+     *
+     * @param checkpointId checkpoint id
+     */
+    void beginTransaction(Long checkpointId);
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
new file mode 100644
index 000000000..aecc80b0d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Strategy for writing {@link SeaTunnelRow}s to files; it also carries the
+ * {@link Transaction} hooks of the sink.
+ */
+public interface WriteStrategy extends Transaction, Serializable {
+    /**
+     * Initializes the strategy with the hadoop conf, the job id and the sub task index.
+     *
+     * @param conf hadoop conf
+     * @param jobId job id (presumably the id of the running job - confirm with the caller)
+     * @param subTaskIndex sub task index (presumably the index of the writer's sub task - confirm with the caller)
+     */
+    void init(HadoopConf conf, String jobId, int subTaskIndex);
+
+    /**
+     * Generates a hadoop {@link Configuration} from the hadoop conf.
+     *
+     * @param conf hadoop conf
+     * @return Configuration
+     */
+    Configuration getConfiguration(HadoopConf conf);
+
+    /**
+     * Writes the seaTunnelRow to the target datasource.
+     *
+     * @param seaTunnelRow seaTunnelRow
+     * @throws Exception if the row cannot be written
+     */
+    void write(SeaTunnelRow seaTunnelRow) throws Exception;
+
+    /**
+     * Sets the row type information used by the writer.
+     *
+     * @param seaTunnelRowType seaTunnelRowType
+     */
+    void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+
+    /**
+     * Generates the partition directory for the given seaTunnelRow.
+     *
+     * @param seaTunnelRow seaTunnelRow
+     * @return the map of partition directory
+     */
+    Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow);
+
+    /**
+     * Generates a file name from the transaction id.
+     *
+     * @param transactionId transaction id
+     * @return file name
+     */
+    String generateFileName(String transactionId);
+
+    /**
+     * Releases resources when a transaction is triggered.
+     */
+    void finishAndCloseFile();
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
new file mode 100644
index 000000000..92467ca92
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Static factory that resolves the {@link WriteStrategy} for a file format.
+ */
+@Slf4j
+public class WriteStrategyFactory {
+
+    private WriteStrategyFactory() {}
+
+    /**
+     * Creates the write strategy for a file type given by name (case-insensitive).
+     *
+     * @param fileType name of a {@link FileFormat} constant, in any letter case
+     * @param textFileSinkConfig sink config handed to the created strategy
+     * @return the write strategy of the file format
+     * @throws NullPointerException if {@code fileType} is null
+     * @throws RuntimeException if the file type does not name a known {@link FileFormat}
+     */
+    public static WriteStrategy of(String fileType, TextFileSinkConfig textFileSinkConfig) {
+        java.util.Objects.requireNonNull(fileType, "File sink connector requires a file type, please check your config");
+        FileFormat fileFormat;
+        try {
+            // Locale.ROOT keeps the lookup independent of the JVM default locale
+            fileFormat = FileFormat.valueOf(fileType.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            String errorMsg = String.format("File sink connector not support this file type [%s], please check your config", fileType);
+            throw new RuntimeException(errorMsg, e);
+        }
+        // created outside the try block so that a failure while building the strategy
+        // is not misreported as an unsupported file type
+        return of(fileFormat, textFileSinkConfig);
+    }
+
+    /**
+     * Creates the write strategy of the given file format.
+     *
+     * @param fileFormat the file format
+     * @param textFileSinkConfig sink config handed to the created strategy
+     * @return the write strategy of the file format
+     */
+    public static WriteStrategy of(FileFormat fileFormat, TextFileSinkConfig textFileSinkConfig) {
+        return fileFormat.getWriteStrategy(textFileSinkConfig);
+    }
+}

Reply via email to