This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f39e3a531 [Improve][Connector-V2][File] Support split file based on 
batch size (#3625)
f39e3a531 is described below

commit f39e3a531d22de53c42e88cbb613f8c4730d1540
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Dec 7 22:33:49 2022 +0800

    [Improve][Connector-V2][File] Support split file based on batch size (#3625)
---
 docs/en/connector-v2/sink/FtpFile.md               |  7 ++++
 docs/en/connector-v2/sink/HdfsFile.md              |  9 +++++-
 docs/en/connector-v2/sink/LocalFile.md             |  7 ++++
 docs/en/connector-v2/sink/OssFile.md               |  9 +++++-
 docs/en/connector-v2/sink/S3File.md                |  8 ++++-
 docs/en/connector-v2/sink/SftpFile.md              |  9 +++++-
 .../seatunnel/file/config/BaseSinkConfig.java      |  5 +++
 .../seatunnel/file/config/BaseTextFileConfig.java  |  5 ++-
 .../seatunnel/file/sink/BaseFileSinkWriter.java    | 34 ++++++++++++++++++--
 .../seatunnel/file/sink/state/FileSinkState.java   |  5 +++
 .../file/sink/writer/AbstractWriteStrategy.java    | 37 ++++++++++++++++++----
 .../file/sink/writer/JsonWriteStrategy.java        |  1 +
 .../file/sink/writer/OrcWriteStrategy.java         |  1 +
 .../file/sink/writer/ParquetWriteStrategy.java     |  1 +
 .../file/sink/writer/TextWriteStrategy.java        |  1 +
 .../seatunnel/file/sink/writer/WriteStrategy.java  |  3 ++
 16 files changed, 129 insertions(+), 13 deletions(-)

diff --git a/docs/en/connector-v2/sink/FtpFile.md 
b/docs/en/connector-v2/sink/FtpFile.md
index 273af49ca..637345d60 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -39,6 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no       | false                
                                     |
 | sink_columns                     | array   | no       | When this parameter 
is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                 
                                     |
+| batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
 
 ### host [string]
@@ -127,6 +128,10 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to the same file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
@@ -173,3 +178,5 @@ FtpFile {
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/HdfsFile.md 
b/docs/en/connector-v2/sink/HdfsFile.md
index 1d562d663..4af318e71 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -38,6 +38,7 @@ In order to use this connector, You must ensure your 
spark/flink cluster already
 | is_partition_field_write_in_file | boolean | no       | false                
                                     |
 | sink_columns                     | array   | no       | When this parameter 
is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                 
                                     |
+| batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
 
 ### fs.defaultFS [string]
@@ -115,6 +116,10 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to the same file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
@@ -195,4 +200,6 @@ HdfsFile {
 - [BugFix] Fixed the following bugs that failed to write data to files 
([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
-  - When restore writer from states getting transaction directly failed
\ No newline at end of file
+  - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/LocalFile.md 
b/docs/en/connector-v2/sink/LocalFile.md
index d0fa20da6..fde2d2247 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -35,6 +35,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no       | false                
                                     |
 | sink_columns                     | array   | no       | When this parameter 
is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                 
                                     |
+| batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
 
 ### path [string]
@@ -108,6 +109,10 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to the same file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
@@ -186,3 +191,5 @@ LocalFile {
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
diff --git a/docs/en/connector-v2/sink/OssFile.md 
b/docs/en/connector-v2/sink/OssFile.md
index f2db1223b..aa5a8614e 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -42,6 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no       | false                
                                     |
 | sink_columns                     | array   | no       | When this parameter 
is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                 
                                     |
+| batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
 
 ### path [string]
@@ -131,6 +132,10 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to the same file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
@@ -224,4 +229,6 @@ For orc file format
 - [BugFix] Fixed the following bugs that failed to write data to files 
([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
-  - When restore writer from states getting transaction directly failed
\ No newline at end of file
+  - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/S3File.md 
b/docs/en/connector-v2/sink/S3File.md
index 39e23fc29..444d0f672 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -43,6 +43,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no       | false                
                                     |
 | sink_columns                     | array   | no       | When this parameter 
is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                 
                                     |
+| batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
 
 ### path [string]
@@ -137,6 +138,10 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to the same file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
@@ -229,4 +234,5 @@ For orc file format
 - [Feature] Support S3A protocol 
([3632](https://github.com/apache/incubator-seatunnel/pull/3632))
   - Allow user to add additional hadoop-s3 parameters
   - Allow the use of the s3a protocol
-  - Decouple hadoop-aws dependencies
\ No newline at end of file
+  - Decouple hadoop-aws dependencies
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/SftpFile.md 
b/docs/en/connector-v2/sink/SftpFile.md
index 89c5bfadb..c08e0b93d 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -39,6 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | is_partition_field_write_in_file | boolean | no       | false                
                                     |
 | sink_columns                     | array   | no       | When this parameter 
is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                 
                                     |
+| batch_size                       | int     | no       | 1000000              
                                     |
 | common-options                   |         | no       | -                    
                                     |
 
 ### host [string]
@@ -127,6 +128,10 @@ Please note that, If `is_enable_transaction` is `true`, we 
will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to the same file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
@@ -165,4 +170,6 @@ SftpFile {
 - [BugFix] Fixed the following bugs that failed to write data to files 
([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
-  - When restore writer from states getting transaction directly failed
\ No newline at end of file
+  - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file 
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 46546a626..2caed7ffa 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -35,6 +35,7 @@ public class BaseSinkConfig {
     public static final String DEFAULT_PARTITION_DIR_EXPRESSION = 
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/";
     public static final String DEFAULT_TMP_PATH = "/tmp/seatunnel";
     public static final String DEFAULT_FILE_NAME_EXPRESSION = 
"${transactionId}";
+    public static final int DEFAULT_BATCH_SIZE = 1000000;
 
     public static final Option<String> COMPRESS_CODEC = 
Options.key("compress_codec")
             .stringType()
@@ -107,4 +108,8 @@ public class BaseSinkConfig {
             .booleanType()
             .defaultValue(true)
             .withDescription("If or not enable transaction");
+    public static final Option<Integer> BATCH_SIZE = Options.key("batch_size")
+            .intType()
+            .defaultValue(DEFAULT_BATCH_SIZE)
+            .withDescription("The batch size of each split file");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
index 9fcd01319..6c63c7262 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
@@ -40,6 +40,7 @@ public class BaseTextFileConfig implements DelimiterConfig, 
CompressConfig, Seri
     protected String compressCodec;
     protected String fieldDelimiter = 
BaseSinkConfig.FIELD_DELIMITER.defaultValue();
     protected String rowDelimiter = 
BaseSinkConfig.ROW_DELIMITER.defaultValue();
+    protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
     protected String path;
     protected String fileNameExpression;
     protected FileFormat fileFormat = FileFormat.TEXT;
@@ -52,7 +53,9 @@ public class BaseTextFileConfig implements DelimiterConfig, 
CompressConfig, Seri
             throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
                     "Compress not supported by SeaTunnel file connector now");
         }
-
+        if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
+            this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
+        }
         if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key()) &&
                 
StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key()))) 
{
             this.fieldDelimiter = 
config.getString(BaseSinkConfig.FIELD_DELIMITER.key());
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 3f8a203d2..51f566858 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -22,14 +22,21 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
FileCommitInfo, FileSinkState> {
     private final WriteStrategy writeStrategy;
@@ -46,8 +53,31 @@ public class BaseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, FileCommitIn
         this.subTaskIndex = context.getIndexOfSubtask();
         writeStrategy.init(hadoopConf, jobId, subTaskIndex);
         if (!fileSinkStates.isEmpty()) {
-            List<String> transactionIds = 
writeStrategy.getTransactionIdFromStates(fileSinkStates);
-            transactionIds.forEach(writeStrategy::abortPrepare);
+            try {
+                List<Path> paths = 
FileSystemUtils.dirList(writeStrategy.getFileSinkConfig().getTmpPath());
+                List<String> transactions = 
paths.stream().map(Path::getName).collect(Collectors.toList());
+                FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new 
FileSinkAggregatedCommitter(hadoopConf);
+                HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
+                fileSinkStates.forEach(fileSinkState ->
+                        fileStatesMap.put(fileSinkState.getTransactionId(), 
fileSinkState));
+                for (String transaction : transactions) {
+                    if (fileStatesMap.containsKey(transaction)) {
+                        // need commit
+                        FileSinkState fileSinkState = 
fileStatesMap.get(transaction);
+                        FileAggregatedCommitInfo fileCommitInfo = 
fileSinkAggregatedCommitter
+                                .combine(Collections.singletonList(new 
FileCommitInfo(fileSinkState.getNeedMoveFiles(),
+                                        
fileSinkState.getPartitionDirAndValuesMap(),
+                                        fileSinkState.getTransactionDir())));
+                        
fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo));
+                    } else {
+                        // need abort
+                        writeStrategy.abortPrepare(transaction);
+                    }
+                }
+            } catch (IOException e) {
+                String errorMsg = String.format("Try to process these 
fileStates %s failed", fileSinkStates);
+                throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
+            }
             
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
         } else {
             writeStrategy.beginTransaction(1L);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
index 3b0ed862e..5e48496d4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
@@ -21,10 +21,15 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
 
 @Data
 @AllArgsConstructor
 public class FileSinkState implements Serializable {
     private final String transactionId;
     private final Long checkpointId;
+    private final Map<String, String> needMoveFiles;
+    private final Map<String, List<String>> partitionDirAndValuesMap;
+    private final String transactionDir;
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index f6baf69cd..9ce7673aa 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import com.google.common.collect.Lists;
+import lombok.Getter;
 import lombok.NonNull;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -60,6 +61,7 @@ import java.util.stream.Collectors;
 
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    @Getter
     protected final TextFileSinkConfig textFileSinkConfig;
     protected final List<Integer> sinkColumnsIndexInRow;
     protected String jobId;
@@ -68,16 +70,20 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     protected String transactionId;
     protected String transactionDirectory;
     protected Map<String, String> needMoveFiles;
-    protected Map<String, String> beingWrittenFile;
+    protected Map<String, String> beingWrittenFile = new HashMap<>();
     private Map<String, List<String>> partitionDirAndValuesMap;
     protected SeaTunnelRowType seaTunnelRowType;
 
     // Checkpoint id from engine is start with 1
     protected Long checkpointId = 0L;
+    protected int partId = 0;
+    protected int batchSize;
+    protected int currentBatchSize = 0;
 
     public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
         this.textFileSinkConfig = textFileSinkConfig;
         this.sinkColumnsIndexInRow = 
textFileSinkConfig.getSinkColumnsIndexInRow();
+        this.batchSize = textFileSinkConfig.getBatchSize();
     }
 
     /**
@@ -93,6 +99,16 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         FileSystemUtils.CONF = getConfiguration(hadoopConf);
     }
 
+    @Override
+    public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException 
{
+        if (currentBatchSize >= batchSize) {
+            this.partId++;
+            currentBatchSize = 0;
+            beingWrittenFile.clear();
+        }
+        currentBatchSize++;
+    }
+
     /**
      * use hadoop conf generate hadoop configuration
      *
@@ -151,8 +167,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
             for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
                 stringBuilder.append(partitionFieldList.get(i))
                         .append("=")
-                        
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
-                        .append(File.separator);
+                        
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)]);
                 
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
             }
             partitionDir = stringBuilder.toString();
@@ -190,7 +205,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         valuesMap.put(Constants.NOW, formattedDate);
         valuesMap.put(timeFormat, formattedDate);
         valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
-        String substitute = VariablesSubstitute.substitute(fileNameExpression, 
valuesMap);
+        String substitute = VariablesSubstitute.substitute(fileNameExpression, 
valuesMap) + "_" + partId;
         return substitute + fileFormat.getSuffix();
     }
 
@@ -239,7 +254,6 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         this.transactionDirectory = getTransactionDir(this.transactionId);
         this.needMoveFiles = new HashMap<>();
         this.partitionDirAndValuesMap = new HashMap<>();
-        this.beingWrittenFile = new HashMap<>();
     }
 
     /**
@@ -274,7 +288,12 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      */
     @Override
     public List<FileSinkState> snapshotState(long checkpointId) {
-        ArrayList<FileSinkState> fileState = Lists.newArrayList(new 
FileSinkState(this.transactionId, this.checkpointId));
+        Map<String, List<String>> commitMap = 
this.partitionDirAndValuesMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue())));
+        ArrayList<FileSinkState> fileState = Lists.newArrayList(new 
FileSinkState(this.transactionId,
+                this.checkpointId, new HashMap<>(this.needMoveFiles),
+                commitMap, this.getTransactionDir(transactionId)));
+        this.beingWrittenFile.clear();
         this.beginTransaction(checkpointId + 1);
         return fileState;
     }
@@ -313,7 +332,13 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         return tmpPath.replaceAll(BaseSinkConfig.NON_PARTITION + 
Matcher.quoteReplacement(File.separator), "");
     }
 
+    @Override
     public long getCheckpointId() {
         return this.checkpointId;
     }
+
+    @Override
+    public TextFileSinkConfig getFileSinkConfig() {
+        return textFileSinkConfig;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index a53163c79..95585606c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -54,6 +54,7 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
 
     @Override
     public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
         FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
         try {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 116eeb5b8..a2e164d3b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -69,6 +69,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
 
     @Override
     public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
         Writer writer = getOrCreateWriter(filePath);
         TypeDescription schema = buildSchemaWithRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 0e04ce650..33cfa32e0 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -91,6 +91,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
 
     @Override
     public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
         ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
         GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 3545acc88..d3aa838e2 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -71,6 +71,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
 
     @Override
     public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
         FSDataOutputStream fsDataOutputStream = 
getOrCreateOutputStream(filePath);
         try {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index c1370345c..e6d299530 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -75,4 +76,6 @@ public interface WriteStrategy extends Transaction, 
Serializable {
     void finishAndCloseFile();
 
     long getCheckpointId();
+
+    TextFileSinkConfig getFileSinkConfig();
 }

Reply via email to