This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f39e3a531 [Improve][Connector-V2][File] Support split file based on batch size (#3625)
f39e3a531 is described below
commit f39e3a531d22de53c42e88cbb613f8c4730d1540
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Dec 7 22:33:49 2022 +0800
[Improve][Connector-V2][File] Support split file based on batch size (#3625)
---
docs/en/connector-v2/sink/FtpFile.md | 7 ++++
docs/en/connector-v2/sink/HdfsFile.md | 9 +++++-
docs/en/connector-v2/sink/LocalFile.md | 7 ++++
docs/en/connector-v2/sink/OssFile.md | 9 +++++-
docs/en/connector-v2/sink/S3File.md | 8 ++++-
docs/en/connector-v2/sink/SftpFile.md | 9 +++++-
.../seatunnel/file/config/BaseSinkConfig.java | 5 +++
.../seatunnel/file/config/BaseTextFileConfig.java | 5 ++-
.../seatunnel/file/sink/BaseFileSinkWriter.java | 34 ++++++++++++++++++--
.../seatunnel/file/sink/state/FileSinkState.java | 5 +++
.../file/sink/writer/AbstractWriteStrategy.java | 37 ++++++++++++++++++----
.../file/sink/writer/JsonWriteStrategy.java | 1 +
.../file/sink/writer/OrcWriteStrategy.java | 1 +
.../file/sink/writer/ParquetWriteStrategy.java | 1 +
.../file/sink/writer/TextWriteStrategy.java | 1 +
.../seatunnel/file/sink/writer/WriteStrategy.java | 3 ++
16 files changed, 129 insertions(+), 13 deletions(-)
diff --git a/docs/en/connector-v2/sink/FtpFile.md
b/docs/en/connector-v2/sink/FtpFile.md
index 273af49ca..637345d60 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -39,6 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| is_partition_field_write_in_file | boolean | no | false
|
| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true
|
+| batch_size | int | no | 1000000
|
| common-options | | no | -
|
### host [string]
@@ -127,6 +128,10 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the row count exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint triggers.
+
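A minimal config fragment showing where the new option sits in a sink block (the host and path values are placeholders, and the other required connection options are elided):

```hocon
FtpFile {
    host = "ftp.example.com"
    path = "/data/output"
    batch_size = 500000
    # remaining required options (port, credentials, ...) omitted
}
```

Once a file's row count reaches `batch_size`, it is closed and a new part file is opened.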
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
@@ -173,3 +178,5 @@ FtpFile {
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/HdfsFile.md
b/docs/en/connector-v2/sink/HdfsFile.md
index 1d562d663..4af318e71 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -38,6 +38,7 @@ In order to use this connector, You must ensure your
spark/flink cluster already
| is_partition_field_write_in_file | boolean | no | false
|
| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true
|
+| batch_size | int | no | 1000000
|
| common-options | | no | -
|
### fs.defaultFS [string]
@@ -115,6 +116,10 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the row count exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint triggers.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
@@ -195,4 +200,6 @@ HdfsFile {
- [BugFix] Fixed the following bugs that failed to write data to files
([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- - When restore writer from states getting transaction directly failed
\ No newline at end of file
+ - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/LocalFile.md
b/docs/en/connector-v2/sink/LocalFile.md
index d0fa20da6..fde2d2247 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -35,6 +35,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| is_partition_field_write_in_file | boolean | no | false
|
| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true
|
+| batch_size | int | no | 1000000
|
| common-options | | no | -
|
### path [string]
@@ -108,6 +109,10 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the row count exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint triggers.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
@@ -186,3 +191,5 @@ LocalFile {
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
diff --git a/docs/en/connector-v2/sink/OssFile.md
b/docs/en/connector-v2/sink/OssFile.md
index f2db1223b..aa5a8614e 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -42,6 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| is_partition_field_write_in_file | boolean | no | false
|
| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true
|
+| batch_size | int | no | 1000000
|
| common-options | | no | -
|
### path [string]
@@ -131,6 +132,10 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the row count exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint triggers.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
@@ -224,4 +229,6 @@ For orc file format
- [BugFix] Fixed the following bugs that failed to write data to files
([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- - When restore writer from states getting transaction directly failed
\ No newline at end of file
+ - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/S3File.md
b/docs/en/connector-v2/sink/S3File.md
index 39e23fc29..444d0f672 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -43,6 +43,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| is_partition_field_write_in_file | boolean | no | false
|
| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true
|
+| batch_size | int | no | 1000000
|
| common-options | | no | -
|
### path [string]
@@ -137,6 +138,10 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the row count exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint triggers.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
@@ -229,4 +234,5 @@ For orc file format
- [Feature] Support S3A protocol
([3632](https://github.com/apache/incubator-seatunnel/pull/3632))
- Allow user to add additional hadoop-s3 parameters
- Allow the use of the s3a protocol
- - Decouple hadoop-aws dependencies
\ No newline at end of file
+ - Decouple hadoop-aws dependencies
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/SftpFile.md
b/docs/en/connector-v2/sink/SftpFile.md
index 89c5bfadb..c08e0b93d 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -39,6 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| is_partition_field_write_in_file | boolean | no | false
|
| sink_columns | array | no | When this parameter
is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true
|
+| batch_size | int | no | 1000000
|
| common-options | | no | -
|
### host [string]
@@ -127,6 +128,10 @@ Please note that, If `is_enable_transaction` is `true`, we
will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows written to a single file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the row count exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint triggers.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details.
@@ -165,4 +170,6 @@ SftpFile {
- [BugFix] Fixed the following bugs that failed to write data to files
([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- - When restore writer from states getting transaction directly failed
\ No newline at end of file
+ - When restore writer from states getting transaction directly failed
+
+- [Improve] Support setting batch size for every file
([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 46546a626..2caed7ffa 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -35,6 +35,7 @@ public class BaseSinkConfig {
public static final String DEFAULT_PARTITION_DIR_EXPRESSION =
"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/";
public static final String DEFAULT_TMP_PATH = "/tmp/seatunnel";
public static final String DEFAULT_FILE_NAME_EXPRESSION =
"${transactionId}";
+ public static final int DEFAULT_BATCH_SIZE = 1000000;
public static final Option<String> COMPRESS_CODEC =
Options.key("compress_codec")
.stringType()
@@ -107,4 +108,8 @@ public class BaseSinkConfig {
.booleanType()
.defaultValue(true)
.withDescription("If or not enable transaction");
+ public static final Option<Integer> BATCH_SIZE = Options.key("batch_size")
+ .intType()
+ .defaultValue(DEFAULT_BATCH_SIZE)
+ .withDescription("The batch size of each split file");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
index 9fcd01319..6c63c7262 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
@@ -40,6 +40,7 @@ public class BaseTextFileConfig implements DelimiterConfig,
CompressConfig, Seri
protected String compressCodec;
protected String fieldDelimiter =
BaseSinkConfig.FIELD_DELIMITER.defaultValue();
protected String rowDelimiter =
BaseSinkConfig.ROW_DELIMITER.defaultValue();
+ protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
protected String path;
protected String fileNameExpression;
protected FileFormat fileFormat = FileFormat.TEXT;
@@ -52,7 +53,9 @@ public class BaseTextFileConfig implements DelimiterConfig,
CompressConfig, Seri
throw new
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
"Compress not supported by SeaTunnel file connector now");
}
-
+ if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
+ this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
+ }
if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key()) &&
StringUtils.isNotEmpty(config.getString(BaseSinkConfig.FIELD_DELIMITER.key())))
{
this.fieldDelimiter =
config.getString(BaseSinkConfig.FIELD_DELIMITER.key());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 3f8a203d2..51f566858 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -22,14 +22,21 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import org.apache.hadoop.fs.Path;
+
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow,
FileCommitInfo, FileSinkState> {
private final WriteStrategy writeStrategy;
@@ -46,8 +53,31 @@ public class BaseFileSinkWriter implements
SinkWriter<SeaTunnelRow, FileCommitIn
this.subTaskIndex = context.getIndexOfSubtask();
writeStrategy.init(hadoopConf, jobId, subTaskIndex);
if (!fileSinkStates.isEmpty()) {
- List<String> transactionIds =
writeStrategy.getTransactionIdFromStates(fileSinkStates);
- transactionIds.forEach(writeStrategy::abortPrepare);
+ try {
+ List<Path> paths =
FileSystemUtils.dirList(writeStrategy.getFileSinkConfig().getTmpPath());
+ List<String> transactions =
paths.stream().map(Path::getName).collect(Collectors.toList());
+ FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new
FileSinkAggregatedCommitter(hadoopConf);
+ HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
+ fileSinkStates.forEach(fileSinkState ->
+ fileStatesMap.put(fileSinkState.getTransactionId(),
fileSinkState));
+ for (String transaction : transactions) {
+ if (fileStatesMap.containsKey(transaction)) {
+ // need commit
+ FileSinkState fileSinkState =
fileStatesMap.get(transaction);
+ FileAggregatedCommitInfo fileCommitInfo =
fileSinkAggregatedCommitter
+ .combine(Collections.singletonList(new
FileCommitInfo(fileSinkState.getNeedMoveFiles(),
+
fileSinkState.getPartitionDirAndValuesMap(),
+ fileSinkState.getTransactionDir())));
+
fileSinkAggregatedCommitter.commit(Collections.singletonList(fileCommitInfo));
+ } else {
+ // need abort
+ writeStrategy.abortPrepare(transaction);
+ }
+ }
+ } catch (IOException e) {
+                String errorMsg = String.format("Try to process these fileStates %s failed", fileSinkStates);
+                throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
+ }
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
} else {
writeStrategy.beginTransaction(1L);
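The restore branch above walks the transaction directories left in the tmp path and either finishes the commit (a recovered `FileSinkState` references the directory) or aborts it (orphan directory with no state). The decision can be sketched as follows; the class and method names are illustrative, not from the connector:

```java
import java.util.*;

// Sketch of the restore decision added in BaseFileSinkWriter: transaction
// dirs found on disk are committed if a recovered state references them,
// otherwise aborted.
class RestorePlan {
    static Map<String, List<String>> plan(List<String> dirsOnDisk,
                                          Set<String> recoveredTxnIds) {
        List<String> toCommit = new ArrayList<>();
        List<String> toAbort = new ArrayList<>();
        for (String txn : dirsOnDisk) {
            if (recoveredTxnIds.contains(txn)) {
                toCommit.add(txn);   // state exists -> finish the commit
            } else {
                toAbort.add(txn);    // orphan dir -> abort and clean up
            }
        }
        Map<String, List<String>> result = new HashMap<>();
        result.put("commit", toCommit);
        result.put("abort", toAbort);
        return result;
    }
}
```

In the actual diff, the "commit" side replays the state through `FileSinkAggregatedCommitter.combine(...)` and `commit(...)`, while the "abort" side calls `writeStrategy.abortPrepare(transaction)`.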
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
index 3b0ed862e..5e48496d4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
@@ -21,10 +21,15 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
@Data
@AllArgsConstructor
public class FileSinkState implements Serializable {
private final String transactionId;
private final Long checkpointId;
+ private final Map<String, String> needMoveFiles;
+ private final Map<String, List<String>> partitionDirAndValuesMap;
+ private final String transactionDir;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index f6baf69cd..9ce7673aa 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import com.google.common.collect.Lists;
+import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -60,6 +61,7 @@ import java.util.stream.Collectors;
public abstract class AbstractWriteStrategy implements WriteStrategy {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
+ @Getter
protected final TextFileSinkConfig textFileSinkConfig;
protected final List<Integer> sinkColumnsIndexInRow;
protected String jobId;
@@ -68,16 +70,20 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
protected String transactionId;
protected String transactionDirectory;
protected Map<String, String> needMoveFiles;
- protected Map<String, String> beingWrittenFile;
+ protected Map<String, String> beingWrittenFile = new HashMap<>();
private Map<String, List<String>> partitionDirAndValuesMap;
protected SeaTunnelRowType seaTunnelRowType;
// Checkpoint id from engine is start with 1
protected Long checkpointId = 0L;
+ protected int partId = 0;
+ protected int batchSize;
+ protected int currentBatchSize = 0;
public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
this.textFileSinkConfig = textFileSinkConfig;
this.sinkColumnsIndexInRow =
textFileSinkConfig.getSinkColumnsIndexInRow();
+ this.batchSize = textFileSinkConfig.getBatchSize();
}
/**
@@ -93,6 +99,16 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
FileSystemUtils.CONF = getConfiguration(hadoopConf);
}
+ @Override
+ public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException
{
+ if (currentBatchSize >= batchSize) {
+ this.partId++;
+ currentBatchSize = 0;
+ beingWrittenFile.clear();
+ }
+ currentBatchSize++;
+ }
+
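The new `write(...)` base implementation is a simple rollover counter: once `currentBatchSize` reaches `batchSize`, the part id is bumped and the being-written file map is cleared so the next row opens a new file. A self-contained sketch of that counter (field names mirror the diff; the wrapper class is illustrative):

```java
// Rollover counter behind AbstractWriteStrategy.write(): every batchSize
// rows, partId advances and the per-file row counter restarts.
class BatchRollover {
    private final int batchSize;
    private int currentBatchSize = 0;
    private int partId = 0;

    BatchRollover(int batchSize) { this.batchSize = batchSize; }

    void write() {
        if (currentBatchSize >= batchSize) {
            partId++;             // subsequent rows go to a new part file
            currentBatchSize = 0; // restart the per-file row counter
        }
        currentBatchSize++;
    }

    int partId() { return partId; }
}
```

With `batchSize = 3`, seven writes roll the part id over twice (after rows 3 and 6).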
/**
* use hadoop conf generate hadoop configuration
*
@@ -151,8 +167,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
for (int i = 0; i < partitionFieldsIndexInRow.size(); i++) {
stringBuilder.append(partitionFieldList.get(i))
.append("=")
-
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)])
- .append(File.separator);
+
.append(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)]);
vals.add(seaTunnelRow.getFields()[partitionFieldsIndexInRow.get(i)].toString());
}
partitionDir = stringBuilder.toString();
@@ -190,7 +205,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
valuesMap.put(Constants.NOW, formattedDate);
valuesMap.put(timeFormat, formattedDate);
valuesMap.put(BaseSinkConfig.TRANSACTION_EXPRESSION, transactionId);
- String substitute = VariablesSubstitute.substitute(fileNameExpression,
valuesMap);
+ String substitute = VariablesSubstitute.substitute(fileNameExpression,
valuesMap) + "_" + partId;
return substitute + fileFormat.getSuffix();
}
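The companion change in `getFileName` appends `"_" + partId` to the resolved file-name expression before the format suffix, so each rolled part file gets a distinct name. A sketch of the resulting naming (the resolved expression value is a placeholder):

```java
// Sketch of the file-name change: the substituted expression now carries a
// "_<partId>" suffix ahead of the format extension.
class FileNames {
    static String fileName(String resolvedExpression, int partId, String suffix) {
        return resolvedExpression + "_" + partId + suffix;
    }
}
```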
@@ -239,7 +254,6 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
this.transactionDirectory = getTransactionDir(this.transactionId);
this.needMoveFiles = new HashMap<>();
this.partitionDirAndValuesMap = new HashMap<>();
- this.beingWrittenFile = new HashMap<>();
}
/**
@@ -274,7 +288,12 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
*/
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
- ArrayList<FileSinkState> fileState = Lists.newArrayList(new
FileSinkState(this.transactionId, this.checkpointId));
+ Map<String, List<String>> commitMap =
this.partitionDirAndValuesMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> new
ArrayList<>(e.getValue())));
+ ArrayList<FileSinkState> fileState = Lists.newArrayList(new
FileSinkState(this.transactionId,
+ this.checkpointId, new HashMap<>(this.needMoveFiles),
+ commitMap, this.getTransactionDir(transactionId)));
+ this.beingWrittenFile.clear();
this.beginTransaction(checkpointId + 1);
return fileState;
}
@@ -313,7 +332,13 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
return tmpPath.replaceAll(BaseSinkConfig.NON_PARTITION +
Matcher.quoteReplacement(File.separator), "");
}
+ @Override
public long getCheckpointId() {
return this.checkpointId;
}
+
+ @Override
+ public TextFileSinkConfig getFileSinkConfig() {
+ return textFileSinkConfig;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index a53163c79..95585606c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -54,6 +54,7 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
@Override
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
try {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 116eeb5b8..a2e164d3b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -69,6 +69,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
@Override
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
Writer writer = getOrCreateWriter(filePath);
TypeDescription schema = buildSchemaWithRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 0e04ce650..33cfa32e0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -91,6 +91,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
@Override
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
ParquetWriter<GenericRecord> writer = getOrCreateWriter(filePath);
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 3545acc88..d3aa838e2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -71,6 +71,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
@Override
public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
FSDataOutputStream fsDataOutputStream =
getOrCreateOutputStream(filePath);
try {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index c1370345c..e6d299530 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import org.apache.hadoop.conf.Configuration;
@@ -75,4 +76,6 @@ public interface WriteStrategy extends Transaction,
Serializable {
void finishAndCloseFile();
long getCheckpointId();
+
+ TextFileSinkConfig getFileSinkConfig();
}