hailin0 commented on code in PR #2555:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2555#discussion_r958254940


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo2.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Data

Review Comment:
   @Data already includes the @Setter annotation.
   
   If the field values are not meant to be modified, please declare the fields as final.



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+
+import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileSystemUtils {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSystemUtils.class);
+
+    public static final int WRITE_BUFFER_SIZE = 2048;
+
+    public static Configuration CONF;
+
+    public static FileSystem getHdfsFs(@NonNull String path) throws 
IOException {
+        return FileSystem.get(URI.create(path), CONF);
+    }
+
+    public static FSDataOutputStream getOutputStream(@NonNull String 
outFilePath) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(outFilePath);
+        Path path = new Path(outFilePath);
+        return hdfsFs.create(path, true, WRITE_BUFFER_SIZE);
+    }
+
+    public static void createFile(@NonNull String filePath) throws IOException 
{
+        FileSystem hdfsFs = getHdfsFs(filePath);
+        Path path = new Path(filePath);
+        if (!hdfsFs.createNewFile(path)) {
+            throw new IOException("create file " + filePath + " error");
+        }
+    }
+
+    public static void deleteFile(@NonNull String file) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(file);
+        if (!hdfsFs.delete(new Path(file), true)) {
+            throw new IOException("delete file " + file + " error");
+        }
+    }
+
+    /**
+     * rename file
+     *
+     * @param oldName     old file name
+     * @param newName     target file name
+     * @param rmWhenExist if this is true, we will delete the target file when 
it already exists
+     * @throws IOException throw IOException
+     */
+    public static void renameFile(@NonNull String oldName, @NonNull String 
newName, boolean rmWhenExist) throws IOException {
+        FileSystem hdfsFs = getHdfsFs(newName);
+        LOGGER.info("begin rename file oldName :[" + oldName + "] to newName 
:[" + newName + "]");
+
+        Path oldPath = new Path(oldName);
+        Path newPath = new Path(newName);
+        if (rmWhenExist) {
+            if (fileExist(newName) && fileExist(oldName)) {
+                hdfsFs.delete(newPath, true);

Review Comment:
   add `log.info("Delete already existing file: {}", newPath)` so the deletion is visible in the logs



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public abstract class AbstractWriteStrategy implements WriteStrategy {
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    protected final TextFileSinkConfig textFileSinkConfig;
+    protected final List<Integer> sinkColumnsIndexInRow;
+    protected String jobId;
+    protected int subTaskIndex;
+    protected HadoopConf hadoopConf;
+    protected String transactionId;
+    protected String transactionDirectory;
+    protected Map<String, String> needMoveFiles;
+    protected Map<String, String> beingWrittenFile;
+    private Map<String, List<String>> partitionDirAndValuesMap;
+    protected SeaTunnelRowType seaTunnelRowType;
+    protected Long checkpointId = 1L;
+
+    public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        this.textFileSinkConfig = textFileSinkConfig;
+        this.sinkColumnsIndexInRow = 
textFileSinkConfig.getSinkColumnsIndexInRow();
+    }
+
+    /**
+     * init hadoop conf
+     *
+     * @param conf hadoop conf
+     */
+    @Override
+    public void init(HadoopConf conf, String jobId, int subTaskIndex) {
+        this.hadoopConf = conf;
+        this.jobId = jobId;
+        this.subTaskIndex = subTaskIndex;
+        FileSystemUtils.CONF = getConfiguration(hadoopConf);
+        this.beginTransaction(this.checkpointId);
+    }
+
+    /**
+     * use hadoop conf generate hadoop configuration
+     *
+     * @param conf hadoop conf
+     * @return Configuration
+     */
+    @Override
+    public Configuration getConfiguration(HadoopConf conf) {
+        Configuration configuration = new Configuration();
+        if (hadoopConf != null) {
+            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
+            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
+            hadoopConf.setExtraOptionsForConfiguration(configuration);
+        }
+        return configuration;
+    }
+
+    /**
+     * set seaTunnelRowTypeInfo in writer
+     *
+     * @param seaTunnelRowType seaTunnelRowType
+     */
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * use seaTunnelRow generate partition directory
+     *
+     * @param seaTunnelRow seaTunnelRow
+     * @return the map of partition directory
+     */
+    @Override
+    public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow) {
+        List<String> partitionFieldList = 
textFileSinkConfig.getPartitionFieldList();
+        List<Integer> partitionFieldsIndexInRow = 
textFileSinkConfig.getPartitionFieldsIndexInRow();
+        String partitionDirExpression = 
textFileSinkConfig.getPartitionDirExpression();
+        String[] keys = new String[partitionFieldList.size()];
+        String[] values = new String[partitionFieldList.size()];
+        for (int i = 0; i < partitionFieldList.size(); i++) {
+            keys[i] = "k" + i;
+            values[i] = "v" + i;
+        }
+        Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
+        if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
+            partitionDirAndValuesMap.put(Constant.NON_PARTITION, null);
+            return partitionDirAndValuesMap;
+        }
+        List<String> vals = new ArrayList<>(partitionFieldsIndexInRow.size());
+        String partitionDir;
+        if (StringUtils.isBlank(partitionDirExpression)) {
+            StringBuilder stringBuilder = new StringBuilder();
+            for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
+                stringBuilder.append(partitionFieldList.get(i))
+                        .append("=")
+                        
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
+                        .append("/");
+                
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+            }
+            partitionDir = stringBuilder.toString();
+        } else {
+            Map<String, String> valueMap = new 
HashMap<>(partitionFieldList.size() * 2);
+            for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
+                valueMap.put(keys[i], partitionFieldList.get(i));
+                valueMap.put(values[i], 
seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+                
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+            }
+            partitionDir = 
VariablesSubstitute.substitute(partitionDirExpression, valueMap);
+        }
+        partitionDirAndValuesMap.put(partitionDir, vals);
+        return partitionDirAndValuesMap;
+    }
+
+    /**
+     * use transaction id generate file name
+     *
+     * @param transactionId transaction id
+     * @return file name
+     */
+    @Override
+    public String generateFileName(String transactionId) {
+        String fileNameExpression = textFileSinkConfig.getFileNameExpression();
+        FileFormat fileFormat = textFileSinkConfig.getFileFormat();
+        if (StringUtils.isBlank(fileNameExpression)) {
+            return transactionId + fileFormat.getSuffix();
+        }
+        String timeFormat = textFileSinkConfig.getFileNameTimeFormat();
+        DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
+        String formattedDate = df.format(ZonedDateTime.now());
+        Map<String, String> valuesMap = new HashMap<>();
+        valuesMap.put(Constants.UUID, UUID.randomUUID().toString());
+        valuesMap.put(Constants.NOW, formattedDate);
+        valuesMap.put(timeFormat, formattedDate);
+        valuesMap.put(Constant.TRANSACTION_EXPRESSION, transactionId);
+        String substitute = VariablesSubstitute.substitute(fileNameExpression, 
valuesMap);
+        return substitute + fileFormat.getSuffix();
+    }
+
+    /**
+     * prepare commit operation
+     *
+     * @return the file commit information
+     */
+    @Override
+    public Optional<FileCommitInfo2> prepareCommit() {
+        this.finishAndCloseFile();
+        Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
+        Map<String, List<String>> copyMap = 
this.partitionDirAndValuesMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue())));
+        return Optional.of(new FileCommitInfo2(commitMap, copyMap, 
transactionDirectory));
+    }
+
+    /**
+     * abort prepare commit operation
+     */
+    @Override
+    public void abortPrepare() {
+        abortPrepare(transactionId);
+    }
+
+    /**
+     * abort prepare commit operation using transaction directory
+     * @param transactionId transaction id
+     */
+    public void abortPrepare(String transactionId) {
+        try {
+            FileSystemUtils.deleteFile(getTransactionDir(transactionId));
+        } catch (IOException e) {
+            throw new RuntimeException("abort transaction " + transactionId + 
" error.", e);
+        }
+    }
+
+    /**
+     * when a checkpoint completed, file connector should begin a new 
transaction and generate new transaction id
+     * @param checkpointId checkpoint id
+     */
+    public void beginTransaction(Long checkpointId) {
+        this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + 
Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + 
checkpointId;
+        this.transactionDirectory = getTransactionDir(this.transactionId);
+        this.needMoveFiles = new HashMap<>();
+        this.partitionDirAndValuesMap = new HashMap<>();
+        this.beingWrittenFile = new HashMap<>();
+    }
+
+    /**
+     * get transaction ids from file sink states
+     * @param fileStates file sink states
+     * @return transaction ids
+     */
+    public List<String> getTransactionIdFromStates(List<FileSinkState2> 
fileStates) {
+        String[] strings = new String[]{textFileSinkConfig.getPath(), 
Constant.SEATUNNEL, jobId};

Review Comment:
   Rename `strings` to `pathSegments`.
   
   Please use understandable, descriptive variable names.



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public abstract class AbstractWriteStrategy implements WriteStrategy {
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    protected final TextFileSinkConfig textFileSinkConfig;
+    protected final List<Integer> sinkColumnsIndexInRow;
+    protected String jobId;
+    protected int subTaskIndex;
+    protected HadoopConf hadoopConf;
+    protected String transactionId;
+    protected String transactionDirectory;
+    protected Map<String, String> needMoveFiles;
+    protected Map<String, String> beingWrittenFile;
+    private Map<String, List<String>> partitionDirAndValuesMap;
+    protected SeaTunnelRowType seaTunnelRowType;
+    protected Long checkpointId = 1L;
+
+    public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        this.textFileSinkConfig = textFileSinkConfig;
+        this.sinkColumnsIndexInRow = 
textFileSinkConfig.getSinkColumnsIndexInRow();
+    }
+
+    /**
+     * init hadoop conf
+     *
+     * @param conf hadoop conf
+     */
+    @Override
+    public void init(HadoopConf conf, String jobId, int subTaskIndex) {
+        this.hadoopConf = conf;
+        this.jobId = jobId;
+        this.subTaskIndex = subTaskIndex;
+        FileSystemUtils.CONF = getConfiguration(hadoopConf);
+        this.beginTransaction(this.checkpointId);
+    }
+
+    /**
+     * use hadoop conf generate hadoop configuration
+     *
+     * @param conf hadoop conf
+     * @return Configuration
+     */
+    @Override
+    public Configuration getConfiguration(HadoopConf conf) {
+        Configuration configuration = new Configuration();
+        if (hadoopConf != null) {
+            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
+            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
+            hadoopConf.setExtraOptionsForConfiguration(configuration);
+        }
+        return configuration;
+    }
+
+    /**
+     * set seaTunnelRowTypeInfo in writer
+     *
+     * @param seaTunnelRowType seaTunnelRowType
+     */
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * use seaTunnelRow generate partition directory
+     *
+     * @param seaTunnelRow seaTunnelRow
+     * @return the map of partition directory
+     */
+    @Override
+    public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow) {
+        List<String> partitionFieldList = 
textFileSinkConfig.getPartitionFieldList();
+        List<Integer> partitionFieldsIndexInRow = 
textFileSinkConfig.getPartitionFieldsIndexInRow();
+        String partitionDirExpression = 
textFileSinkConfig.getPartitionDirExpression();
+        String[] keys = new String[partitionFieldList.size()];
+        String[] values = new String[partitionFieldList.size()];
+        for (int i = 0; i < partitionFieldList.size(); i++) {
+            keys[i] = "k" + i;
+            values[i] = "v" + i;
+        }

Review Comment:
   Move this block to after line #136, so the key/value arrays are not built when the early return for non-partitioned data is taken.



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OrcWriteStrategy extends AbstractWriteStrategy {
+    private final Map<String, Writer> beingWrittenWriter;
+
+    public OrcWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        Writer writer = getOrCreateWriter(filePath);
+        TypeDescription schema = buildSchemaWithRowType();
+        VectorizedRowBatch rowBatch = schema.createRowBatch();
+        int i = 0;
+        int row = rowBatch.size++;
+        for (Integer index : sinkColumnsIndexInRow) {
+            Object value = seaTunnelRow.getField(index);
+            ColumnVector vector = rowBatch.cols[i];
+            setColumn(value, vector, row);
+            i++;
+        }
+        try {
+            writer.addRowBatch(rowBatch);
+            rowBatch.reset();
+        } catch (IOException e) {
+            String errorMsg = String.format("Write data to orc file [%s] 
error", filePath);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        this.beingWrittenWriter.forEach((k, v) -> {
+            try {
+                v.close();
+            } catch (IOException e) {
+                String errorMsg = String.format("Close file [%s] orc writer 
failed, error msg: [%s]", k, e.getMessage());
+                throw new RuntimeException(errorMsg, e);
+            }
+            needMoveFiles.put(k, getTargetLocation(k));
+        });
+    }
+
+    private Writer getOrCreateWriter(@NonNull String filePath) {
+        Writer writer = this.beingWrittenWriter.get(filePath);
+        if (writer == null) {
+            TypeDescription schema = buildSchemaWithRowType();
+            Path path = new Path(filePath);
+            try {
+                OrcFile.WriterOptions options = 
OrcFile.writerOptions(getConfiguration(hadoopConf))
+                        .setSchema(schema)
+                        // temporarily used snappy
+                        .compress(CompressionKind.SNAPPY)
+                        // use orc version 0.12
+                        .version(OrcFile.Version.V_0_12)
+                        .overwrite(true);
+                Writer newWriter = OrcFile.createWriter(path, options);
+                this.beingWrittenWriter.put(filePath, newWriter);
+                return newWriter;
+            } catch (IOException e) {
+                String errorMsg = String.format("Get orc writer for file [%s] 
error", filePath);
+                throw new RuntimeException(errorMsg, e);
+            }
+        }
+        return writer;
+    }
+
+    private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+        if (BasicType.BOOLEAN_TYPE.equals(type)) {
+            return TypeDescription.createBoolean();
+        }
+        if (BasicType.SHORT_TYPE.equals(type)) {
+            return TypeDescription.createShort();
+        }
+        if (BasicType.INT_TYPE.equals(type)) {
+            return TypeDescription.createInt();
+        }
+        if (BasicType.LONG_TYPE.equals(type)) {
+            return TypeDescription.createLong();
+        }
+        if (BasicType.FLOAT_TYPE.equals(type)) {
+            return TypeDescription.createFloat();
+        }
+        if (BasicType.DOUBLE_TYPE.equals(type)) {
+            return TypeDescription.createDouble();
+        }
+        if (BasicType.BYTE_TYPE.equals(type)) {
+            return TypeDescription.createByte();
+        }
+        return TypeDescription.createString();
+    }
+
+    private TypeDescription buildSchemaWithRowType() {
+        TypeDescription schema = TypeDescription.createStruct();
+        for (Integer i : sinkColumnsIndexInRow) {
+            TypeDescription fieldType = 
buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
+            schema.addField(seaTunnelRowType.getFieldName(i), fieldType);
+        }
+        return schema;
+    }
+
+    private void setColumn(Object value, ColumnVector vector, int row) {
+        if (value == null) {
+            vector.isNull[row] = true;
+            vector.noNulls = false;
+        } else {
+            switch (vector.type) {
+                case LONG:
+                    LongColumnVector longVector = (LongColumnVector) vector;
+                    setLongColumnVector(value, longVector, row);
+                    break;
+                case DOUBLE:
+                    DoubleColumnVector doubleColumnVector = 
(DoubleColumnVector) vector;
+                    setDoubleVector(value, doubleColumnVector, row);
+                    break;
+                case BYTES:
+                    BytesColumnVector bytesColumnVector = (BytesColumnVector) 
vector;
+                    setByteColumnVector(value, bytesColumnVector, row);
+                    break;
+                default:
+                    throw new RuntimeException("Unexpected ColumnVector 
subtype");
+            }
+        }
+    }
+
+    private void setLongColumnVector(Object value, LongColumnVector 
longVector, int row) {
+        if (value instanceof Boolean) {
+            Boolean bool = (Boolean) value;
+            longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? 
Long.valueOf(1) : Long.valueOf(0);
+        }  else if (value instanceof Integer) {
+            longVector.vector[row] = (Integer) value;

Review Comment:
   `(Integer) value` -> `((Long) value).longValue()`



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonWriteStrategy extends AbstractWriteStrategy {
+    private final byte[] rowDelimiter;
+    private SerializationSchema serializationSchema;
+    private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+
+    public JsonWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        this.beingWrittenOutputStream = new HashMap<>();
+        this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
+    }
+
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+    }
+
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
+        try {
+            byte[] rowBytes = serializationSchema.serialize(seaTunnelRow);
+            fsDataOutputStream.write(rowBytes);
+            fsDataOutputStream.write(rowDelimiter);
+        } catch (IOException e) {
+            log.error("write data to file {} error", filePath);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void finishAndCloseFile() {
+        beingWrittenOutputStream.forEach((key, value) -> {
+            try {
+                value.flush();
+            } catch (IOException e) {
+                log.error("error when flush file {}", key);
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    value.close();
+                } catch (IOException e) {
+                    log.error("error when close output stream {}", key);

Review Comment:
   Use `log.error(..., key, e)` so that the exception stack trace is included in the log output?



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class WriteStrategyFactory {
+
+    private WriteStrategyFactory() {}
+
+    public static WriteStrategy of(String fileType, TextFileSinkConfig 
textFileSinkConfig) {
+        try {
+            FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
+            return fileFormat.getWriteStrategy(textFileSinkConfig);
+        } catch (IllegalArgumentException e) {
+            String errorMsg = String.format("File sink connector not support 
this file type [%s], please check your config", fileType);
+            throw new RuntimeException(errorMsg);

Review Comment:
   use `throw new RuntimeException(..., e)`



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState2;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import com.google.common.collect.Lists;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
public abstract class AbstractWriteStrategy implements WriteStrategy {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    // Immutable sink configuration shared by all concrete strategies.
    protected final TextFileSinkConfig textFileSinkConfig;
    // Indexes (within the incoming row) of the columns to be written to the sink.
    protected final List<Integer> sinkColumnsIndexInRow;
    protected String jobId;
    protected int subTaskIndex;
    protected HadoopConf hadoopConf;
    // Current transaction id and its staging directory; renewed by beginTransaction().
    protected String transactionId;
    protected String transactionDirectory;
    // Files to relocate on commit — presumably staging path -> target path; TODO confirm with committer.
    protected Map<String, String> needMoveFiles;
    // Partition key -> file path currently being written for that partition.
    protected Map<String, String> beingWrittenFile;
    // Partition directory -> partition values, copied into the commit info.
    private Map<String, List<String>> partitionDirAndValuesMap;
    protected SeaTunnelRowType seaTunnelRowType;
    // Checkpoint counter; starts at 1 and is advanced in snapshotState().
    protected Long checkpointId = 1L;

    /**
     * @param textFileSinkConfig sink configuration
     */
    public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
        this.textFileSinkConfig = textFileSinkConfig;
        this.sinkColumnsIndexInRow = textFileSinkConfig.getSinkColumnsIndexInRow();
    }
+
    /**
     * Initialize the strategy with the Hadoop configuration and the task's
     * identity, then open the first transaction (checkpointId starts at 1).
     *
     * @param conf         hadoop conf
     * @param jobId        job id, used in transaction ids and directory layout
     * @param subTaskIndex index of this sink subtask
     */
    @Override
    public void init(HadoopConf conf, String jobId, int subTaskIndex) {
        this.hadoopConf = conf;
        this.jobId = jobId;
        this.subTaskIndex = subTaskIndex;
        // NOTE(review): assigns a global static — every strategy instance in this JVM
        // shares FileSystemUtils.CONF and the last writer wins; confirm this is intended.
        FileSystemUtils.CONF = getConfiguration(hadoopConf);
        this.beginTransaction(this.checkpointId);
    }
+
+    /**
+     * use hadoop conf generate hadoop configuration
+     *
+     * @param conf hadoop conf
+     * @return Configuration
+     */
+    @Override
+    public Configuration getConfiguration(HadoopConf conf) {
+        Configuration configuration = new Configuration();
+        if (hadoopConf != null) {
+            configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
+            configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
+            hadoopConf.setExtraOptionsForConfiguration(configuration);
+        }
+        return configuration;
+    }
+
+    /**
+     * set seaTunnelRowTypeInfo in writer
+     *
+     * @param seaTunnelRowType seaTunnelRowType
+     */
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * use seaTunnelRow generate partition directory
+     *
+     * @param seaTunnelRow seaTunnelRow
+     * @return the map of partition directory
+     */
+    @Override
+    public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow) {
+        List<String> partitionFieldList = 
textFileSinkConfig.getPartitionFieldList();
+        List<Integer> partitionFieldsIndexInRow = 
textFileSinkConfig.getPartitionFieldsIndexInRow();
+        String partitionDirExpression = 
textFileSinkConfig.getPartitionDirExpression();
+        String[] keys = new String[partitionFieldList.size()];
+        String[] values = new String[partitionFieldList.size()];
+        for (int i = 0; i < partitionFieldList.size(); i++) {
+            keys[i] = "k" + i;
+            values[i] = "v" + i;
+        }
+        Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
+        if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
+            partitionDirAndValuesMap.put(Constant.NON_PARTITION, null);
+            return partitionDirAndValuesMap;
+        }
+        List<String> vals = new ArrayList<>(partitionFieldsIndexInRow.size());
+        String partitionDir;
+        if (StringUtils.isBlank(partitionDirExpression)) {
+            StringBuilder stringBuilder = new StringBuilder();
+            for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
+                stringBuilder.append(partitionFieldList.get(i))
+                        .append("=")
+                        
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
+                        .append("/");
+                
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+            }
+            partitionDir = stringBuilder.toString();
+        } else {
+            Map<String, String> valueMap = new 
HashMap<>(partitionFieldList.size() * 2);
+            for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
+                valueMap.put(keys[i], partitionFieldList.get(i));
+                valueMap.put(values[i], 
seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+                
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
+            }
+            partitionDir = 
VariablesSubstitute.substitute(partitionDirExpression, valueMap);
+        }
+        partitionDirAndValuesMap.put(partitionDir, vals);
+        return partitionDirAndValuesMap;
+    }
+
+    /**
+     * use transaction id generate file name
+     *
+     * @param transactionId transaction id
+     * @return file name
+     */
+    @Override
+    public String generateFileName(String transactionId) {
+        String fileNameExpression = textFileSinkConfig.getFileNameExpression();
+        FileFormat fileFormat = textFileSinkConfig.getFileFormat();
+        if (StringUtils.isBlank(fileNameExpression)) {
+            return transactionId + fileFormat.getSuffix();
+        }
+        String timeFormat = textFileSinkConfig.getFileNameTimeFormat();
+        DateTimeFormatter df = DateTimeFormatter.ofPattern(timeFormat);
+        String formattedDate = df.format(ZonedDateTime.now());
+        Map<String, String> valuesMap = new HashMap<>();
+        valuesMap.put(Constants.UUID, UUID.randomUUID().toString());
+        valuesMap.put(Constants.NOW, formattedDate);
+        valuesMap.put(timeFormat, formattedDate);
+        valuesMap.put(Constant.TRANSACTION_EXPRESSION, transactionId);
+        String substitute = VariablesSubstitute.substitute(fileNameExpression, 
valuesMap);
+        return substitute + fileFormat.getSuffix();
+    }
+
+    /**
+     * prepare commit operation
+     *
+     * @return the file commit information
+     */
+    @Override
+    public Optional<FileCommitInfo2> prepareCommit() {
+        this.finishAndCloseFile();
+        Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
+        Map<String, List<String>> copyMap = 
this.partitionDirAndValuesMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue())));
+        return Optional.of(new FileCommitInfo2(commitMap, copyMap, 
transactionDirectory));
+    }
+
    /**
     * Abort the prepare-commit operation by deleting the staging directory of
     * the current transaction.
     */
    @Override
    public void abortPrepare() {
        abortPrepare(transactionId);
    }
+
+    /**
+     * abort prepare commit operation using transaction directory
+     * @param transactionId transaction id
+     */
+    public void abortPrepare(String transactionId) {
+        try {
+            FileSystemUtils.deleteFile(getTransactionDir(transactionId));
+        } catch (IOException e) {
+            throw new RuntimeException("abort transaction " + transactionId + 
" error.", e);
+        }
+    }
+
+    /**
+     * when a checkpoint completed, file connector should begin a new 
transaction and generate new transaction id
+     * @param checkpointId checkpoint id
+     */
+    public void beginTransaction(Long checkpointId) {
+        this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + 
Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + 
checkpointId;
+        this.transactionDirectory = getTransactionDir(this.transactionId);
+        this.needMoveFiles = new HashMap<>();
+        this.partitionDirAndValuesMap = new HashMap<>();
+        this.beingWrittenFile = new HashMap<>();
+    }
+
+    /**
+     * get transaction ids from file sink states
+     * @param fileStates file sink states
+     * @return transaction ids
+     */
+    public List<String> getTransactionIdFromStates(List<FileSinkState2> 
fileStates) {
+        String[] strings = new String[]{textFileSinkConfig.getPath(), 
Constant.SEATUNNEL, jobId};
+        String jobDir = String.join(File.separator, strings) + "/";
+        try {
+            List<String> transactionDirList = 
FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
+            return transactionDirList.stream().map(dir -> 
dir.replaceAll(jobDir, "")).collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
    /**
     * Snapshot the state of the connector when a checkpoint is triggered, then
     * roll forward to a new transaction for that checkpoint.
     *
     * <p>Ordering matters here: the state captures the current transactionId
     * together with the PREVIOUS checkpointId before both are advanced.
     *
     * @param checkpointId checkpointId
     * @return the list of states
     */
    @Override
    public List<FileSinkState2> snapshotState(long checkpointId) {
        ArrayList<FileSinkState2> fileState = Lists.newArrayList(new FileSinkState2(this.transactionId, this.checkpointId));
        this.checkpointId = checkpointId;
        this.beginTransaction(checkpointId);
        return fileState;
    }
+
+    /**
+     * using transaction id generate transaction directory
+     * @param transactionId transaction id
+     * @return transaction directory
+     */
+    private String getTransactionDir(@NonNull String transactionId) {
+        String[] strings = new String[]{textFileSinkConfig.getTmpPath(), 
Constant.SEATUNNEL, jobId, transactionId};
+        return String.join(File.separator, strings);
+    }
+
+    public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow 
seaTunnelRow) {
+        Map<String, List<String>> dataPartitionDirAndValuesMap = 
generatorPartitionDir(seaTunnelRow);
+        String beingWrittenFileKey = 
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
+        // get filePath from beingWrittenFile
+        String beingWrittenFilePath = 
beingWrittenFile.get(beingWrittenFileKey);
+        if (beingWrittenFilePath != null) {
+            return beingWrittenFilePath;
+        } else {
+            String[] strings = new String[]{transactionDirectory, 
beingWrittenFileKey, generateFileName(transactionId)};

Review Comment:
   as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to