This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 631509293 [Improve][Connector-V2] Refactor the structure of file sink
to reduce redundant codes (#2555)
631509293 is described below
commit 631509293037f3c6d7ef5ce6df53070f28e8d78f
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Aug 31 09:15:17 2022 +0800
[Improve][Connector-V2] Refactor the structure of file sink to reduce
redundant codes (#2555)
---
.../seatunnel/file/config/FileFormat.java | 36 +++
.../seatunnel/file/sink/BaseFileSink.java | 120 +++++++++
.../seatunnel/file/sink/BaseFileSinkWriter.java | 84 ++++++
.../sink/commit/FileAggregatedCommitInfo2.java | 43 +++
.../file/sink/commit/FileCommitInfo2.java | 48 ++++
.../sink/commit/FileSinkAggregatedCommitter2.java | 117 ++++++++
.../file/sink/commit/FileSinkCommitter2.java | 65 +++++
.../file/sink/config/TextFileSinkConfig.java | 7 +-
.../seatunnel/file/sink/state/FileSinkState2.java | 30 +++
.../seatunnel/file/sink/util/FileSystemUtils.java | 129 +++++++++
.../file/sink/writer/AbstractWriteStrategy.java | 295 +++++++++++++++++++++
.../file/sink/writer/JsonWriteStrategy.java | 98 +++++++
.../file/sink/writer/OrcWriteStrategy.java | 210 +++++++++++++++
.../file/sink/writer/ParquetWriteStrategy.java | 143 ++++++++++
.../file/sink/writer/TextWriteStrategy.java | 94 +++++++
.../seatunnel/file/sink/writer/Transaction.java | 64 +++++
.../seatunnel/file/sink/writer/WriteStrategy.java | 75 ++++++
.../file/sink/writer/WriteStrategyFactory.java | 43 +++
18 files changed, 1698 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index a356ed0c6..4cddc2f4c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,6 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
@@ -27,30 +33,56 @@ import java.io.Serializable;
public enum FileFormat implements Serializable {
CSV("csv") {
+ @Override
+ public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ textFileSinkConfig.setFieldDelimiter(",");
+ return new TextWriteStrategy(textFileSinkConfig);
+ }
+
@Override
public ReadStrategy getReadStrategy() {
return new TextReadStrategy();
}
},
TEXT("txt") {
+ @Override
+ public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ return new TextWriteStrategy(textFileSinkConfig);
+ }
+
@Override
public ReadStrategy getReadStrategy() {
return new TextReadStrategy();
}
},
PARQUET("parquet") {
+ @Override
+ public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ return new ParquetWriteStrategy(textFileSinkConfig);
+ }
+
@Override
public ReadStrategy getReadStrategy() {
return new ParquetReadStrategy();
}
},
ORC("orc") {
+ @Override
+ public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ return new OrcWriteStrategy(textFileSinkConfig);
+ }
+
@Override
public ReadStrategy getReadStrategy() {
return new OrcReadStrategy();
}
},
JSON("json") {
+ @Override
+ public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ return new JsonWriteStrategy(textFileSinkConfig);
+ }
+
@Override
public ReadStrategy getReadStrategy() {
return new JsonReadStrategy();
@@ -70,4 +102,8 @@ public enum FileFormat implements Serializable {
public ReadStrategy getReadStrategy() {
return null;
}
+
+ public WriteStrategy getWriteStrategy(TextFileSinkConfig
textFileSinkConfig) {
+ return null;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
new file mode 100644
index 000000000..0faa8d77e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkCommitter2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public abstract class BaseFileSink implements SeaTunnelSink<SeaTunnelRow,
FileSinkState2, FileCommitInfo2, FileAggregatedCommitInfo2> {
+ protected SeaTunnelRowType seaTunnelRowType;
+ protected Config pluginConfig;
+ protected HadoopConf hadoopConf;
+ protected TextFileSinkConfig textFileSinkConfig;
+ protected WriteStrategy writeStrategy;
+ protected SeaTunnelContext seaTunnelContext;
+ protected String jobId;
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ this.jobId = seaTunnelContext.getJobId();
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.textFileSinkConfig = new TextFileSinkConfig(pluginConfig,
seaTunnelRowType);
+ this.writeStrategy =
WriteStrategyFactory.of(textFileSinkConfig.getFileFormat(), textFileSinkConfig);
+ this.writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2>
restoreWriter(SinkWriter.Context context, List<FileSinkState2> states) throws
IOException {
+ return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId, states);
+ }
+
+ @Override
+ public Optional<SinkCommitter<FileCommitInfo2>> createCommitter() throws
IOException {
+ return Optional.of(new FileSinkCommitter2());
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<FileCommitInfo2,
FileAggregatedCommitInfo2>> createAggregatedCommitter() throws IOException {
+ return Optional.of(new FileSinkAggregatedCommitter2());
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, FileCommitInfo2, FileSinkState2>
createWriter(SinkWriter.Context context) throws IOException {
+ return new BaseFileSinkWriter(writeStrategy, hadoopConf, context,
jobId);
+ }
+
+ @Override
+ public Optional<Serializer<FileCommitInfo2>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<FileAggregatedCommitInfo2>>
getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<FileSinkState2>> getWriterStateSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ /**
+     * Uses the pluginConfig to do some initialization.
+ *
+ * @param pluginConfig plugin config.
+     * @throws PrepareFailException if the plugin prepare fails.
+ */
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
new file mode 100644
index 000000000..d02411d77
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow,
FileCommitInfo2, FileSinkState2> {
+ private final WriteStrategy writeStrategy;
+ private final HadoopConf hadoopConf;
+ private final SinkWriter.Context context;
+ private final int subTaskIndex;
+ private final String jobId;
+
+ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf
hadoopConf, SinkWriter.Context context, String jobId, List<FileSinkState2>
fileSinkStates) {
+ this.writeStrategy = writeStrategy;
+ this.context = context;
+ this.hadoopConf = hadoopConf;
+ this.jobId = jobId;
+ this.subTaskIndex = context.getIndexOfSubtask();
+ writeStrategy.init(hadoopConf, jobId, subTaskIndex);
+ if (!fileSinkStates.isEmpty()) {
+ List<String> transactionIds =
writeStrategy.getTransactionIdFromStates(fileSinkStates);
+ transactionIds.forEach(writeStrategy::abortPrepare);
+
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
+ }
+ }
+
+ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf
hadoopConf, SinkWriter.Context context, String jobId) {
+ this(writeStrategy, hadoopConf, context, jobId,
Collections.emptyList());
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ try {
+ writeStrategy.write(element);
+ } catch (Exception e) {
+ throw new RuntimeException("Write data error, please check", e);
+ }
+ }
+
+ @Override
+ public Optional<FileCommitInfo2> prepareCommit() throws IOException {
+ return writeStrategy.prepareCommit();
+ }
+
+ @Override
+ public void abortPrepare() {
+ writeStrategy.abortPrepare();
+ }
+
+ @Override
+ public List<FileSinkState2> snapshotState(long checkpointId) throws
IOException {
+ return writeStrategy.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java
new file mode 100644
index 000000000..dcf4dde42
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class FileAggregatedCommitInfo2 implements Serializable {
+ /**
+     * Stores the commit info in a map.
+     * <p>K is the transaction directory.</p>
+     * <p>V is a map from each data file's temporary path to its target path.</p>
+ */
+ private final Map<String, Map<String, String>> transactionMap;
+
+ /**
+     * Stores the partition information in a map.
+ * <p>K is the partition column's name.</p>
+ * <p>V is the list of partition column's values.</p>
+ */
+ private final Map<String, List<String>> partitionDirAndValuesMap;
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo2.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo2.java
new file mode 100644
index 000000000..bb7586e37
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo2.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@AllArgsConstructor
+public class FileCommitInfo2 implements Serializable {
+ /**
+     * Stores the commit info in a map.
+     * <p>K is the file path that needs to be moved to the target dir.</p>
+     * <p>V is the target file path of the data file.</p>
+ */
+ private final Map<String, String> needMoveFiles;
+
+ /**
+     * Stores the partition information in a map.
+ * <p>K is the partition column's name.</p>
+ * <p>V is the list of partition column's values.</p>
+ */
+ private final Map<String, List<String>> partitionDirAndValuesMap;
+
+ /**
+     * Stores the transaction directory.
+ */
+ private final String transactionDir;
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter2.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter2.java
new file mode 100644
index 000000000..1366a132e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter2.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FileSinkAggregatedCommitter2 implements
SinkAggregatedCommitter<FileCommitInfo2, FileAggregatedCommitInfo2> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileSinkAggregatedCommitter2.class);
+
+ @Override
+ public List<FileAggregatedCommitInfo2>
commit(List<FileAggregatedCommitInfo2> aggregatedCommitInfos) throws
IOException {
+ List<FileAggregatedCommitInfo2> errorAggregatedCommitInfoList = new
ArrayList<>();
+ aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
+ try {
+ for (Map.Entry<String, Map<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
+ for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
+ // first rename temp file
+ FileSystemUtils.renameFile(mvFileEntry.getKey(),
mvFileEntry.getValue(), true);
+ }
+ // second delete transaction directory
+ FileSystemUtils.deleteFile(entry.getKey());
+ }
+ } catch (Exception e) {
+ LOGGER.error("commit aggregatedCommitInfo error ", e);
+ errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
+ }
+ });
+ return errorAggregatedCommitInfoList;
+ }
+
+ /**
+     * The logic for combining commit messages.
+     *
+     * @param commitInfos The list of commit messages.
+     * @return The combined commit message.
+ */
+ @Override
+ public FileAggregatedCommitInfo2 combine(List<FileCommitInfo2>
commitInfos) {
+ if (commitInfos == null || commitInfos.size() == 0) {
+ return null;
+ }
+ Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
+ Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>();
+ commitInfos.forEach(commitInfo -> {
+ Map<String, String> needMoveFileMap =
aggregateCommitInfo.computeIfAbsent(commitInfo.getTransactionDir(), k -> new
HashMap<>());
+ needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
+ if (commitInfo.getPartitionDirAndValuesMap() != null &&
!commitInfo.getPartitionDirAndValuesMap().isEmpty()) {
+
partitionDirAndValuesMap.putAll(commitInfo.getPartitionDirAndValuesMap());
+ }
+ });
+ return new FileAggregatedCommitInfo2(aggregateCommitInfo,
partitionDirAndValuesMap);
+ }
+
+ /**
+     * If {@link #commit(List)} fails, this method will be called (**only** on the Spark engine for now).
+ *
+     * @param aggregatedCommitInfos The list of combined commit messages.
+ * @throws Exception throw Exception when abort failed.
+ */
+ @Override
+ public void abort(List<FileAggregatedCommitInfo2> aggregatedCommitInfos)
throws Exception {
+ if (aggregatedCommitInfos == null || aggregatedCommitInfos.size() ==
0) {
+ return;
+ }
+ aggregatedCommitInfos.forEach(aggregatedCommitInfo -> {
+ try {
+ for (Map.Entry<String, Map<String, String>> entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
+ // rollback the file
+ for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
+ if (FileSystemUtils.fileExist(mvFileEntry.getValue())
&& !FileSystemUtils.fileExist(mvFileEntry.getKey())) {
+ FileSystemUtils.renameFile(mvFileEntry.getValue(),
mvFileEntry.getKey(), true);
+ }
+ }
+ // delete the transaction dir
+ FileSystemUtils.deleteFile(entry.getKey());
+ }
+ } catch (Exception e) {
+ LOGGER.error("abort aggregatedCommitInfo error ", e);
+ }
+ });
+ }
+
+ /**
+ * Close this resource.
+ *
+ * @throws IOException throw IOException when close failed.
+ */
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter2.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter2.java
new file mode 100644
index 000000000..1a1bfd905
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter2.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileSinkCommitter2 implements SinkCommitter<FileCommitInfo2> {
+
+ @Override
+ public List<FileCommitInfo2> commit(List<FileCommitInfo2> commitInfos)
throws IOException {
+ ArrayList<FileCommitInfo2> failedCommitInfos = new ArrayList<>();
+ for (FileCommitInfo2 commitInfo : commitInfos) {
+ Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
+ needMoveFiles.forEach((k, v) -> {
+ try {
+ FileSystemUtils.renameFile(k, v, true);
+ } catch (IOException e) {
+ failedCommitInfos.add(commitInfo);
+ }
+ });
+ FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+ }
+ return failedCommitInfos;
+ }
+
+ /**
+     * Aborts the transaction. This method will be called (**only** on the Spark engine) when the commit has failed.
+ *
+ * @param commitInfos The list of commit message, used to abort the commit.
+     * @throws IOException if the abort failed.
+ */
+ @Override
+ public void abort(List<FileCommitInfo2> commitInfos) throws IOException {
+ for (FileCommitInfo2 commitInfo : commitInfos) {
+ Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
+ for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
+ if (FileSystemUtils.fileExist(entry.getValue()) &&
!FileSystemUtils.fileExist(entry.getKey())) {
+ FileSystemUtils.renameFile(entry.getValue(),
entry.getKey(), true);
+ }
+ }
+ FileSystemUtils.deleteFile(commitInfo.getTransactionDir());
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
index ce94ff847..7f5823b43 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -114,7 +115,7 @@ public class TextFileSinkConfig extends BaseTextFileConfig
implements PartitionC
// check partition field must in seaTunnelRowTypeInfo
if (!CollectionUtils.isEmpty(this.partitionFieldList)
- && (CollectionUtils.isEmpty(this.sinkColumnList) ||
!this.sinkColumnList.containsAll(this.partitionFieldList))) {
+ && (CollectionUtils.isEmpty(this.sinkColumnList) || !new
HashSet<>(this.sinkColumnList).containsAll(this.partitionFieldList))) {
throw new RuntimeException("partition fields must in sink
columns");
}
@@ -136,12 +137,12 @@ public class TextFileSinkConfig extends
BaseTextFileConfig implements PartitionC
// init sink column index and partition field index, we will use the
column index to found the data in SeaTunnelRow
this.sinkColumnsIndexInRow = this.sinkColumnList.stream()
- .map(columnName -> columnsMap.get(columnName))
+ .map(columnsMap::get)
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
this.partitionFieldsIndexInRow = this.partitionFieldList.stream()
- .map(columnName -> columnsMap.get(columnName))
+ .map(columnsMap::get)
.collect(Collectors.toList());
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState2.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState2.java
new file mode 100644
index 000000000..1e5be30f8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState2.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class FileSinkState2 implements Serializable {
+ private final String transactionId;
+ private final Long checkpointId;
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
new file mode 100644
index 000000000..2f284791f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileSystemUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileSystemUtils.class);
+
+ public static final int WRITE_BUFFER_SIZE = 2048;
+
+ public static Configuration CONF;
+
+ public static FileSystem getHdfsFs(@NonNull String path) throws
IOException {
+ return FileSystem.get(URI.create(path), CONF);
+ }
+
+ public static FSDataOutputStream getOutputStream(@NonNull String
outFilePath) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(outFilePath);
+ Path path = new Path(outFilePath);
+ return hdfsFs.create(path, true, WRITE_BUFFER_SIZE);
+ }
+
+ public static void createFile(@NonNull String filePath) throws IOException
{
+ FileSystem hdfsFs = getHdfsFs(filePath);
+ Path path = new Path(filePath);
+ if (!hdfsFs.createNewFile(path)) {
+ throw new IOException("create file " + filePath + " error");
+ }
+ }
+
+ public static void deleteFile(@NonNull String file) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(file);
+ if (!hdfsFs.delete(new Path(file), true)) {
+ throw new IOException("delete file " + file + " error");
+ }
+ }
+
+ /**
+ * rename file
+ *
+ * @param oldName old file name
+ * @param newName target file name
+ * @param rmWhenExist if this is true, we will delete the target file when
it already exists
+ * @throws IOException throw IOException
+ */
+ public static void renameFile(@NonNull String oldName, @NonNull String
newName, boolean rmWhenExist) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(newName);
+ LOGGER.info("begin rename file oldName :[" + oldName + "] to newName
:[" + newName + "]");
+
+ Path oldPath = new Path(oldName);
+ Path newPath = new Path(newName);
+ if (rmWhenExist) {
+ if (fileExist(newName) && fileExist(oldName)) {
+ hdfsFs.delete(newPath, true);
+ LOGGER.info("Delete already file: {}", newPath);
+ }
+ }
+ if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
+ createDir(newName.substring(0, newName.lastIndexOf("/")));
+ }
+
+ if (hdfsFs.rename(oldPath, newPath)) {
+ LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "]
finish");
+ } else {
+ throw new IOException("rename file :[" + oldPath + "] to [" +
newPath + "] error");
+ }
+ }
+
+ public static void createDir(@NonNull String filePath) throws IOException {
+ FileSystem hdfsFs = getHdfsFs(filePath);
+ Path dfs = new Path(filePath);
+ if (!hdfsFs.mkdirs(dfs)) {
+ throw new IOException("create dir " + filePath + " error");
+ }
+ }
+
+ public static boolean fileExist(@NonNull String filePath) throws
IOException {
+ FileSystem hdfsFs = getHdfsFs(filePath);
+ Path fileName = new Path(filePath);
+ return hdfsFs.exists(fileName);
+ }
+
+ /**
+ * get the dir in filePath
+ */
+ public static List<Path> dirList(@NonNull String filePath) throws
FileNotFoundException, IOException {
+ FileSystem hdfsFs = getHdfsFs(filePath);
+ List<Path> pathList = new ArrayList<Path>();
+ Path fileName = new Path(filePath);
+ FileStatus[] status = hdfsFs.listStatus(fileName);
+ if (status != null && status.length > 0) {
+ for (FileStatus fileStatus : status) {
+ if (fileStatus.isDirectory()) {
+ pathList.add(fileStatus.getPath());
+ }
+ }
+ }
+ return pathList;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
new file mode 100644
index 000000000..ed03c1324
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public abstract class AbstractWriteStrategy implements WriteStrategy {
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+ protected final TextFileSinkConfig textFileSinkConfig;
+ protected final List<Integer> sinkColumnsIndexInRow;
+ protected String jobId;
+ protected int subTaskIndex;
+ protected HadoopConf hadoopConf;
+ protected String transactionId;
+ protected String transactionDirectory;
+ protected Map<String, String> needMoveFiles;
+ protected Map<String, String> beingWrittenFile;
+ private Map<String, List<String>> partitionDirAndValuesMap;
+ protected SeaTunnelRowType seaTunnelRowType;
+ protected Long checkpointId = 1L;
+
+ public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ this.textFileSinkConfig = textFileSinkConfig;
+ this.sinkColumnsIndexInRow =
textFileSinkConfig.getSinkColumnsIndexInRow();
+ }
+
+ /**
+ * init hadoop conf
+ *
+ * @param conf hadoop conf
+ */
+ @Override
+ public void init(HadoopConf conf, String jobId, int subTaskIndex) {
+ this.hadoopConf = conf;
+ this.jobId = jobId;
+ this.subTaskIndex = subTaskIndex;
+ FileSystemUtils.CONF = getConfiguration(hadoopConf);
+ this.beginTransaction(this.checkpointId);
+ }
+
+ /**
+ * use hadoop conf generate hadoop configuration
+ *
+ * @param conf hadoop conf
+ * @return Configuration
+ */
+ @Override
+ public Configuration getConfiguration(HadoopConf conf) {
+ Configuration configuration = new Configuration();
+ if (hadoopConf != null) {
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
+ configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
+ hadoopConf.setExtraOptionsForConfiguration(configuration);
+ }
+ return configuration;
+ }
+
+ /**
+ * set seaTunnelRowTypeInfo in writer
+ *
+ * @param seaTunnelRowType seaTunnelRowType
+ */
+ @Override
+ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ /**
+ * use seaTunnelRow generate partition directory
+ *
+ * @param seaTunnelRow seaTunnelRow
+ * @return the map of partition directory
+ */
+ @Override
+ public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow
seaTunnelRow) {
+ List<Integer> partitionFieldsIndexInRow =
textFileSinkConfig.getPartitionFieldsIndexInRow();
+ Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
+ if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
+ partitionDirAndValuesMap.put(Constant.NON_PARTITION, null);
+ return partitionDirAndValuesMap;
+ }
+ List<String> partitionFieldList =
textFileSinkConfig.getPartitionFieldList();
+ String partitionDirExpression =
textFileSinkConfig.getPartitionDirExpression();
+ String[] keys = new String[partitionFieldList.size()];
+ String[] values = new String[partitionFieldList.size()];
+ for (int i = 0; i < partitionFieldList.size(); i++) {
+ keys[i] = "k" + i;
+ values[i] = "v" + i;
+ }
+ List<String> vals = new ArrayList<>(partitionFieldsIndexInRow.size());
+ String partitionDir;
+ if (StringUtils.isBlank(partitionDirExpression)) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
+ stringBuilder.append(partitionFieldList.get(i))
+ .append("=")
+
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
+ .append("/");
+
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+ }
+ partitionDir = stringBuilder.toString();
+ } else {
+ Map<String, String> valueMap = new
HashMap<>(partitionFieldList.size() * 2);
+ for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
+ valueMap.put(keys[i], partitionFieldList.get(i));
+ valueMap.put(values[i],
seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+ }
+ partitionDir =
VariablesSubstitute.substitute(partitionDirExpression, valueMap);
+ }
+ partitionDirAndValuesMap.put(partitionDir, vals);
+ return partitionDirAndValuesMap;
+ }
+
+ /**
+ * use transaction id generate file name
+ *
+ * @param transactionId transaction id
+ * @return file name
+ */
+ @Override
+ public String generateFileName(String transactionId) {
+ String fileNameExpression = textFileSinkConfig.getFileNameExpression();
+ FileFormat fileFormat = textFileSinkConfig.getFileFormat();
+ if (StringUtils.isBlank(fileNameExpression)) {
+ return transactionId + fileFormat.getSuffix();
+ }
+ String timeFormat = textFileSinkConfig.getFileNameTimeFormat();
+ DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
+ String formattedDate = df.format(ZonedDateTime.now());
+ Map<String, String> valuesMap = new HashMap<>();
+ valuesMap.put(Constants.UUID, UUID.randomUUID().toString());
+ valuesMap.put(Constants.NOW, formattedDate);
+ valuesMap.put(timeFormat, formattedDate);
+ valuesMap.put(Constant.TRANSACTION_EXPRESSION, transactionId);
+ String substitute = VariablesSubstitute.substitute(fileNameExpression,
valuesMap);
+ return substitute + fileFormat.getSuffix();
+ }
+
+ /**
+ * prepare commit operation
+ *
+ * @return the file commit information
+ */
+ @Override
+ public Optional<FileCommitInfo2> prepareCommit() {
+ this.finishAndCloseFile();
+ Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
+ Map<String, List<String>> copyMap =
this.partitionDirAndValuesMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> new
ArrayList<>(e.getValue())));
+ return Optional.of(new FileCommitInfo2(commitMap, copyMap,
transactionDirectory));
+ }
+
+ /**
+ * abort prepare commit operation
+ */
+ @Override
+ public void abortPrepare() {
+ abortPrepare(transactionId);
+ }
+
+ /**
+ * abort prepare commit operation using transaction directory
+ * @param transactionId transaction id
+ */
+ public void abortPrepare(String transactionId) {
+ try {
+ FileSystemUtils.deleteFile(getTransactionDir(transactionId));
+ } catch (IOException e) {
+ throw new RuntimeException("abort transaction " + transactionId +
" error.", e);
+ }
+ }
+
+ /**
+ * when a checkpoint completed, file connector should begin a new
transaction and generate new transaction id
+ * @param checkpointId checkpoint id
+ */
+ public void beginTransaction(Long checkpointId) {
+ this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId +
Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT +
checkpointId;
+ this.transactionDirectory = getTransactionDir(this.transactionId);
+ this.needMoveFiles = new HashMap<>();
+ this.partitionDirAndValuesMap = new HashMap<>();
+ this.beingWrittenFile = new HashMap<>();
+ }
+
+ /**
+ * get transaction ids from file sink states
+ * @param fileStates file sink states
+ * @return transaction ids
+ */
+ public List<String> getTransactionIdFromStates(List<FileSinkState2>
fileStates) {
+ String[] pathSegments = new String[]{textFileSinkConfig.getPath(),
Constant.SEATUNNEL, jobId};
+ String jobDir = String.join(File.separator, pathSegments) + "/";
+ try {
+ List<String> transactionDirList =
FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
+ return transactionDirList.stream().map(dir ->
dir.replaceAll(jobDir, "")).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * when a checkpoint was triggered, snapshot the state of connector
+ *
+ * @param checkpointId checkpointId
+ * @return the list of states
+ */
+ @Override
+ public List<FileSinkState2> snapshotState(long checkpointId) {
+ ArrayList<FileSinkState2> fileState = Lists.newArrayList(new
FileSinkState2(this.transactionId, this.checkpointId));
+ this.checkpointId = checkpointId;
+ this.beginTransaction(checkpointId);
+ return fileState;
+ }
+
+ /**
+ * using transaction id generate transaction directory
+ * @param transactionId transaction id
+ * @return transaction directory
+ */
+ private String getTransactionDir(@NonNull String transactionId) {
+ String[] strings = new String[]{textFileSinkConfig.getTmpPath(),
Constant.SEATUNNEL, jobId, transactionId};
+ return String.join(File.separator, strings);
+ }
+
+ public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow
seaTunnelRow) {
+ Map<String, List<String>> dataPartitionDirAndValuesMap =
generatorPartitionDir(seaTunnelRow);
+ String beingWrittenFileKey =
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
+ // get filePath from beingWrittenFile
+ String beingWrittenFilePath =
beingWrittenFile.get(beingWrittenFileKey);
+ if (beingWrittenFilePath != null) {
+ return beingWrittenFilePath;
+ } else {
+ String[] pathSegments = new String[]{transactionDirectory,
beingWrittenFileKey, generateFileName(transactionId)};
+ String newBeingWrittenFilePath = String.join(File.separator,
pathSegments);
+ beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
+ if
(!Constant.NON_PARTITION.equals(dataPartitionDirAndValuesMap.keySet().toArray()[0].toString())){
+ partitionDirAndValuesMap.putAll(dataPartitionDirAndValuesMap);
+ }
+ return newBeingWrittenFilePath;
+ }
+ }
+
+ public String getTargetLocation(@NonNull String seaTunnelFilePath) {
+ String tmpPath = seaTunnelFilePath.replaceAll(transactionDirectory,
textFileSinkConfig.getPath());
+ return tmpPath.replaceAll(Constant.NON_PARTITION + File.separator, "");
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
new file mode 100644
index 000000000..a5aed5d79
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonWriteStrategy extends AbstractWriteStrategy {
+ private final byte[] rowDelimiter;
+ private SerializationSchema serializationSchema;
+ private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+ public JsonWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ super(textFileSinkConfig);
+ this.beingWrittenOutputStream = new HashMap<>();
+ this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
+ }
+
+ @Override
+ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
+ try {
+ byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+ fsDataOutputStream.write(rowBytes);
+ fsDataOutputStream.write(rowDelimiter);
+ } catch (IOException e) {
+ log.error("write data to file {} error", filePath);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseFile() {
+ beingWrittenOutputStream.forEach((key, value) -> {
+ try {
+ value.flush();
+ } catch (IOException e) {
+ log.error("error when flush file {}", key);
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ value.close();
+ } catch (IOException e) {
+ log.error("error when close output stream {}", key, e);
+ }
+ }
+
+ needMoveFiles.put(key, getTargetLocation(key));
+ });
+ }
+
+ private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
+ if (fsDataOutputStream == null) {
+ try {
+ fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+ } catch (IOException e) {
+ log.error("can not get output file stream");
+ throw new RuntimeException(e);
+ }
+ }
+ return fsDataOutputStream;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
new file mode 100644
index 000000000..875504e9b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OrcWriteStrategy extends AbstractWriteStrategy {
+ private final Map<String, Writer> beingWrittenWriter;
+
+ public OrcWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ super(textFileSinkConfig);
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ Writer writer = getOrCreateWriter(filePath);
+ TypeDescription schema = buildSchemaWithRowType();
+ VectorizedRowBatch rowBatch = schema.createRowBatch();
+ int i = 0;
+ int row = rowBatch.size++;
+ for (Integer index : sinkColumnsIndexInRow) {
+ Object value = seaTunnelRow.getField(index);
+ ColumnVector vector = rowBatch.cols[i];
+ setColumn(value, vector, row);
+ i++;
+ }
+ try {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ } catch (IOException e) {
+ String errorMsg = String.format("Write data to orc file [%s]
error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseFile() {
+ this.beingWrittenWriter.forEach((k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ String errorMsg = String.format("Close file [%s] orc writer
failed, error msg: [%s]", k, e.getMessage());
+ throw new RuntimeException(errorMsg, e);
+ }
+ needMoveFiles.put(k, getTargetLocation(k));
+ });
+ }
+
+ private Writer getOrCreateWriter(@NonNull String filePath) {
+ Writer writer = this.beingWrittenWriter.get(filePath);
+ if (writer == null) {
+ TypeDescription schema = buildSchemaWithRowType();
+ Path path = new Path(filePath);
+ try {
+ OrcFile.WriterOptions options =
OrcFile.writerOptions(getConfiguration(hadoopConf))
+ .setSchema(schema)
+ // temporarily used snappy
+ .compress(CompressionKind.SNAPPY)
+ // use orc version 0.12
+ .version(OrcFile.Version.V_0_12)
+ .overwrite(true);
+ Writer newWriter = OrcFile.createWriter(path, options);
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg = String.format("Get orc writer for file [%s]
error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+ return writer;
+ }
+
+ private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+ if (BasicType.BOOLEAN_TYPE.equals(type)) {
+ return TypeDescription.createBoolean();
+ }
+ if (BasicType.SHORT_TYPE.equals(type)) {
+ return TypeDescription.createShort();
+ }
+ if (BasicType.INT_TYPE.equals(type)) {
+ return TypeDescription.createInt();
+ }
+ if (BasicType.LONG_TYPE.equals(type)) {
+ return TypeDescription.createLong();
+ }
+ if (BasicType.FLOAT_TYPE.equals(type)) {
+ return TypeDescription.createFloat();
+ }
+ if (BasicType.DOUBLE_TYPE.equals(type)) {
+ return TypeDescription.createDouble();
+ }
+ if (BasicType.BYTE_TYPE.equals(type)) {
+ return TypeDescription.createByte();
+ }
+ return TypeDescription.createString();
+ }
+
+ private TypeDescription buildSchemaWithRowType() {
+ TypeDescription schema = TypeDescription.createStruct();
+ for (Integer i : sinkColumnsIndexInRow) {
+ TypeDescription fieldType =
buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
+ schema.addField(seaTunnelRowType.getFieldName(i), fieldType);
+ }
+ return schema;
+ }
+
+ private void setColumn(Object value, ColumnVector vector, int row) {
+ if (value == null) {
+ vector.isNull[row] = true;
+ vector.noNulls = false;
+ } else {
+ switch (vector.type) {
+ case LONG:
+ LongColumnVector longVector = (LongColumnVector) vector;
+ setLongColumnVector(value, longVector, row);
+ break;
+ case DOUBLE:
+ DoubleColumnVector doubleColumnVector =
(DoubleColumnVector) vector;
+ setDoubleVector(value, doubleColumnVector, row);
+ break;
+ case BYTES:
+ BytesColumnVector bytesColumnVector = (BytesColumnVector)
vector;
+ setByteColumnVector(value, bytesColumnVector, row);
+ break;
+ default:
+ throw new RuntimeException("Unexpected ColumnVector
subtype");
+ }
+ }
+ }
+
+ private void setLongColumnVector(Object value, LongColumnVector
longVector, int row) {
+ if (value instanceof Boolean) {
+ Boolean bool = (Boolean) value;
+ longVector.vector[row] = (bool.equals(Boolean.TRUE)) ?
Long.valueOf(1) : Long.valueOf(0);
+ } else if (value instanceof Integer) {
+ longVector.vector[row] = ((Integer) value).longValue();
+ } else if (value instanceof Long) {
+ longVector.vector[row] = (Long) value;
+ } else if (value instanceof BigInteger) {
+ BigInteger bigInt = (BigInteger) value;
+ longVector.vector[row] = bigInt.longValue();
+ } else {
+ throw new RuntimeException("Long or Integer type expected for
field");
+ }
+ }
+
+ private void setByteColumnVector(Object value, BytesColumnVector
bytesColVector, int rowNum) {
+ if (value instanceof byte[] || value instanceof String) {
+ byte[] byteVec;
+ if (value instanceof String) {
+ String strVal = (String) value;
+ byteVec = strVal.getBytes(StandardCharsets.UTF_8);
+ } else {
+ byteVec = (byte[]) value;
+ }
+ bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
+ } else {
+ throw new RuntimeException("byte[] or String type expected for
field ");
+ }
+ }
+
+ private void setDoubleVector(Object value, DoubleColumnVector
doubleVector, int rowNum) {
+ if (value instanceof Double) {
+ doubleVector.vector[rowNum] = (Double) value;
+ } else if (value instanceof Float) {
+ Float floatValue = (Float) value;
+ doubleVector.vector[rowNum] = floatValue.doubleValue();
+ } else {
+ throw new RuntimeException("Double or Float type expected for
field ");
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
new file mode 100644
index 000000000..3fccdfa01
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.NonNull;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParquetWriteStrategy extends AbstractWriteStrategy {
+ private final Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+
+ public ParquetWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ super(textFileSinkConfig);
+ this.beingWrittenWriter = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
+ Schema schema = buildSchemaWithRowType();
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+ sinkColumnsIndexInRow.forEach(index ->
recordBuilder.set(seaTunnelRowType.getFieldName(index),
seaTunnelRow.getField(index)));
+ GenericData.Record record = recordBuilder.build();
+ try {
+ writer.write(record);
+ } catch (IOException e) {
+ String errorMsg = String.format("Write data to file [%s] error",
filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseFile() {
+ this.beingWrittenWriter.forEach((k, v) -> {
+ try {
+ v.close();
+ } catch (IOException e) {
+ String errorMsg = String.format("Close file [%s] parquet
writer failed, error msg: [%s]", k, e.getMessage());
+ throw new RuntimeException(errorMsg, e);
+ }
+ needMoveFiles.put(k, getTargetLocation(k));
+ });
+ }
+
+ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String
filePath) {
+ ParquetWriter<GenericRecord> writer =
this.beingWrittenWriter.get(filePath);
+ if (writer == null) {
+ Schema schema = buildSchemaWithRowType();
+ Path path = new Path(filePath);
+ try {
+ HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path,
getConfiguration(hadoopConf));
+ ParquetWriter<GenericRecord> newWriter =
AvroParquetWriter.<GenericRecord>builder(outputFile)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ // use parquet v1 to improve compatibility
+
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ // Temporarily use snappy compress
+ // I think we can use the compress option in config to
control this
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withSchema(schema)
+ .build();
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg = String.format("Get parquet writer for file
[%s] error", filePath);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+ return writer;
+ }
+
+ private Schema buildSchemaWithRowType() {
+ ArrayList<Schema.Field> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ sinkColumnsIndexInRow.forEach(index -> {
+ if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.BOOLEAN), null, null);
+ fields.add(field);
+ } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) ||
BasicType.INT_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.INT), null, null);
+ fields.add(field);
+ } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.LONG), null, null);
+ fields.add(field);
+ } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.FLOAT), null, null);
+ fields.add(field);
+ } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.DOUBLE), null, null);
+ fields.add(field);
+ } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.STRING), null, null);
+ fields.add(field);
+ } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.BYTES), null, null);
+ fields.add(field);
+ } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) {
+ Schema.Field field = new Schema.Field(fieldNames[index],
Schema.create(Schema.Type.NULL), null, null);
+ fields.add(field);
+ }
+ });
+ return Schema.createRecord("SeatunnelRecord",
+ "The record generated by seatunnel file connector",
+ "org.apache.parquet.avro",
+ false,
+ fields);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
new file mode 100644
index 000000000..04fb7679c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
+
+public class TextWriteStrategy extends AbstractWriteStrategy {
+ private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+ private final String fieldDelimiter;
+ private final String rowDelimiter;
+
+ public TextWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+ super(textFileSinkConfig);
+ this.beingWrittenOutputStream = new HashMap<>();
+ this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter();
+ this.rowDelimiter = textFileSinkConfig.getRowDelimiter();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
+ String line = transformRowToLine(seaTunnelRow);
+ try {
+ fsDataOutputStream.write(line.getBytes());
+ fsDataOutputStream.write(rowDelimiter.getBytes());
+ } catch (IOException e) {
+ log.error("write data to file {} error", filePath);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseFile() {
+ beingWrittenOutputStream.forEach((key, value) -> {
+ try {
+ value.flush();
+ } catch (IOException e) {
+ log.error("error when flush file {}", key);
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ value.close();
+ } catch (IOException e) {
+ log.error("error when close output stream {}", key);
+ }
+ }
+ needMoveFiles.put(key, getTargetLocation(key));
+ });
+ }
+
+ private FSDataOutputStream getOrCreateOutputStream(@NonNull String
filePath) {
+ FSDataOutputStream fsDataOutputStream =
beingWrittenOutputStream.get(filePath);
+ if (fsDataOutputStream == null) {
+ try {
+ fsDataOutputStream = FileSystemUtils.getOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, fsDataOutputStream);
+ } catch (IOException e) {
+ log.error("can not get output file stream");
+ throw new RuntimeException(e);
+ }
+ }
+ return fsDataOutputStream;
+ }
+
+ private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
+ return this.sinkColumnsIndexInRow.stream().map(index ->
seaTunnelRow.getFields()[index] == null ? "" :
seaTunnelRow.getFields()[index].toString()).collect(Collectors.joining(fieldDelimiter));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/Transaction.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/Transaction.java
new file mode 100644
index 000000000..86f267ce6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/Transaction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
/**
 * Transaction lifecycle of a file sink writer: prepare/abort a commit,
 * snapshot and restore state around checkpoints, and open new transactions.
 */
public interface Transaction extends Serializable {
    /**
     * Prepare the current transaction for commit.
     *
     * @return the file commit information for this transaction, if any
     */
    Optional<FileCommitInfo2> prepareCommit();

    /**
     * Abort the prepared commit of the current transaction.
     */
    void abortPrepare();

    /**
     * Abort the prepared commit of the transaction identified by the given id.
     *
     * @param transactionId transaction id
     */
    void abortPrepare(String transactionId);

    /**
     * Snapshot the state of the connector when a checkpoint is triggered.
     *
     * @param checkpointId checkpoint id
     * @return the list of states to persist
     */
    List<FileSinkState2> snapshotState(long checkpointId);

    /**
     * Extract the dirty transaction ids from the given file sink states.
     *
     * @param fileSinkState file sink states
     * @return transaction ids
     */
    List<String> getTransactionIdFromStates(List<FileSinkState2> fileSinkState);

    /**
     * Begin a new transaction when a checkpoint is triggered.
     *
     * @param checkpointId checkpoint id
     */
    void beginTransaction(Long checkpointId);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
new file mode 100644
index 000000000..aecc80b0d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
/**
 * Format-specific write behavior of a file sink (text, json, orc, parquet, ...):
 * initialization, per-row writing, file naming, and resource release, on top of
 * the {@link Transaction} lifecycle.
 */
public interface WriteStrategy extends Transaction, Serializable {
    /**
     * Initialize the strategy with the hadoop configuration and task identity.
     *
     * @param conf hadoop conf
     * @param jobId id of the job this writer belongs to
     * @param subTaskIndex index of the subtask running this writer
     */
    void init(HadoopConf conf, String jobId, int subTaskIndex);

    /**
     * Build a hadoop {@link Configuration} from the connector's hadoop conf.
     *
     * @param conf hadoop conf
     * @return Configuration
     */
    Configuration getConfiguration(HadoopConf conf);

    /**
     * Write a seaTunnelRow to the target datasource.
     *
     * @param seaTunnelRow seaTunnelRow
     * @throws Exception Exceptions
     */
    void write(SeaTunnelRow seaTunnelRow) throws Exception;

    /**
     * Set the seaTunnelRowType describing the rows passed to {@link #write}.
     *
     * @param seaTunnelRowType seaTunnelRowType
     */
    void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);

    /**
     * Derive the partition directory for the given row.
     * NOTE(review): "generator" in the method name reads like a typo for
     * "generate", but renaming would break every implementation — leaving as-is.
     *
     * @param seaTunnelRow seaTunnelRow
     * @return the map of partition directory
     */
    Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow);

    /**
     * Generate a file name from the transaction id.
     *
     * @param transactionId transaction id
     * @return file name
     */
    String generateFileName(String transactionId);

    /**
     * Release open files/streams when a transaction finishes.
     */
    void finishAndCloseFile();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
new file mode 100644
index 000000000..92467ca92
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class WriteStrategyFactory {
+
+ private WriteStrategyFactory() {}
+
+ public static WriteStrategy of(String fileType, TextFileSinkConfig
textFileSinkConfig) {
+ try {
+ FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
+ return fileFormat.getWriteStrategy(textFileSinkConfig);
+ } catch (IllegalArgumentException e) {
+ String errorMsg = String.format("File sink connector not support
this file type [%s], please check your config", fileType);
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ public static WriteStrategy of(FileFormat fileFormat, TextFileSinkConfig
textFileSinkConfig) {
+ return fileFormat.getWriteStrategy(textFileSinkConfig);
+ }
+}