This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1e18a8c530 [Improve][Connector[File] Optimize files commit order
(#5045)
1e18a8c530 is described below
commit 1e18a8c5303fb071d6c4d5d8dc893b8882c3bc4a
Author: hailin0 <[email protected]>
AuthorDate: Mon Jul 24 15:35:27 2023 +0800
[Improve][Connector[File] Optimize files commit order (#5045)
Previously, file paths were stored in a `HashMap`, so the files of every
checkpoint were committed in arbitrary order.
This change switches to `LinkedHashMap` to ensure that files are committed in
the order in which they were generated.
---
.../seatunnel/file/sink/BaseFileSinkWriter.java | 6 +-
.../file/sink/commit/FileAggregatedCommitInfo.java | 6 +-
.../seatunnel/file/sink/commit/FileCommitInfo.java | 6 +-
.../sink/commit/FileSinkAggregatedCommitter.java | 15 +++--
.../file/sink/commit/FileSinkCommitter.java | 75 ----------------------
.../seatunnel/file/sink/state/FileSinkState.java | 6 +-
.../file/sink/writer/AbstractWriteStrategy.java | 44 ++++++++-----
.../file/sink/writer/ExcelWriteStrategy.java | 7 +-
.../file/sink/writer/JsonWriteStrategy.java | 5 +-
.../file/sink/writer/OrcWriteStrategy.java | 6 +-
.../file/sink/writer/ParquetWriteStrategy.java | 7 +-
.../file/sink/writer/TextWriteStrategy.java | 5 +-
.../seatunnel/file/sink/writer/WriteStrategy.java | 4 +-
.../commit/S3RedshiftSinkAggregatedCommitter.java | 5 +-
14 files changed, 68 insertions(+), 129 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 7102e954a4..22200249f6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -34,14 +34,14 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow,
FileCommitInfo, FileSinkState> {
- private final WriteStrategy writeStrategy;
+ protected final WriteStrategy writeStrategy;
private final FileSystemUtils fileSystemUtils;
@SuppressWarnings("checkstyle:MagicNumber")
@@ -67,7 +67,7 @@ public class BaseFileSinkWriter implements
SinkWriter<SeaTunnelRow, FileCommitIn
List<String> transactions = findTransactionList(jobId,
uuidPrefix);
FileSinkAggregatedCommitter fileSinkAggregatedCommitter =
new FileSinkAggregatedCommitter(fileSystemUtils);
- HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
+ LinkedHashMap<String, FileSinkState> fileStatesMap = new
LinkedHashMap<>();
fileSinkStates.forEach(
fileSinkState ->
fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
index 16d94a1f63..5ca3b30fad 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
@@ -21,8 +21,8 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
@Data
@AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileAggregatedCommitInfo implements Serializable
{
*
* <p>V is the target file path of the data file.
*/
- private final Map<String, Map<String, String>> transactionMap;
+ private final LinkedHashMap<String, LinkedHashMap<String, String>>
transactionMap;
/**
* Storage the partition information in map.
@@ -43,5 +43,5 @@ public class FileAggregatedCommitInfo implements Serializable
{
*
* <p>V is the list of partition column's values.
*/
- private final Map<String, List<String>> partitionDirAndValuesMap;
+ private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
index 86c433b8f5..27e74ff0a8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
@@ -21,8 +21,8 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
@Data
@AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileCommitInfo implements Serializable {
*
* <p>V is the target file path of the data file.
*/
- private final Map<String, String> needMoveFiles;
+ private final LinkedHashMap<String, String> needMoveFiles;
/**
* Storage the partition information in map.
@@ -43,7 +43,7 @@ public class FileCommitInfo implements Serializable {
*
* <p>V is the list of partition column's values.
*/
- private final Map<String, List<String>> partitionDirAndValuesMap;
+ private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
/** Storage the transaction directory */
private final String transactionDir;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index b12ef1165a..a076188e2a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -44,7 +44,7 @@ public class FileSinkAggregatedCommitter
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
- for (Map.Entry<String, Map<String, String>> entry :
+ for (Map.Entry<String, LinkedHashMap<String, String>>
entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
@@ -77,13 +77,14 @@ public class FileSinkAggregatedCommitter
if (commitInfos == null || commitInfos.size() == 0) {
return null;
}
- Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
- Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>();
+ LinkedHashMap<String, LinkedHashMap<String, String>>
aggregateCommitInfo =
+ new LinkedHashMap<>();
+ LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new
LinkedHashMap<>();
commitInfos.forEach(
commitInfo -> {
- Map<String, String> needMoveFileMap =
+ LinkedHashMap<String, String> needMoveFileMap =
aggregateCommitInfo.computeIfAbsent(
- commitInfo.getTransactionDir(), k -> new
HashMap<>());
+ commitInfo.getTransactionDir(), k -> new
LinkedHashMap<>());
needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
if (commitInfo.getPartitionDirAndValuesMap() != null
&&
!commitInfo.getPartitionDirAndValuesMap().isEmpty()) {
@@ -109,7 +110,7 @@ public class FileSinkAggregatedCommitter
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
- for (Map.Entry<String, Map<String, String>> entry :
+ for (Map.Entry<String, LinkedHashMap<String, String>>
entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
// rollback the file
for (Map.Entry<String, String> mvFileEntry :
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
deleted file mode 100644
index 6525b5e7d4..0000000000
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
-
-import org.apache.seatunnel.api.sink.SinkCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/** Deprecated interface since 2.3.0-beta, now used {@link
FileSinkAggregatedCommitter} */
-@Deprecated
-public class FileSinkCommitter implements SinkCommitter<FileCommitInfo> {
- private final FileSystemUtils fileSystemUtils;
-
- public FileSinkCommitter(FileSystemUtils fileSystemUtils) {
- this.fileSystemUtils = fileSystemUtils;
- }
-
- @Override
- public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos)
throws IOException {
- ArrayList<FileCommitInfo> failedCommitInfos = new ArrayList<>();
- for (FileCommitInfo commitInfo : commitInfos) {
- Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
- needMoveFiles.forEach(
- (k, v) -> {
- try {
- fileSystemUtils.renameFile(k, v, true);
- } catch (IOException e) {
- failedCommitInfos.add(commitInfo);
- }
- });
- fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
- }
- return failedCommitInfos;
- }
-
- /**
- * Abort the transaction, this method will be called (**Only** on Spark
engine) when the commit
- * is failed.
- *
- * @param commitInfos The list of commit message, used to abort the commit.
- * @throws IOException throw IOException when close failed.
- */
- @Override
- public void abort(List<FileCommitInfo> commitInfos) throws IOException {
- for (FileCommitInfo commitInfo : commitInfos) {
- Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
- for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
- if (fileSystemUtils.fileExist(entry.getValue())
- && !fileSystemUtils.fileExist(entry.getKey())) {
- fileSystemUtils.renameFile(entry.getValue(),
entry.getKey(), true);
- }
- }
- fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
index 7d28df2305..34ca13625f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
@@ -21,8 +21,8 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
@Data
@AllArgsConstructor
@@ -30,7 +30,7 @@ public class FileSinkState implements Serializable {
private final String transactionId;
private final String uuidPrefix;
private final Long checkpointId;
- private final Map<String, String> needMoveFiles;
- private final Map<String, List<String>> partitionDirAndValuesMap;
+ private final LinkedHashMap<String, String> needMoveFiles;
+ private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
private final String transactionDir;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 6820d28d85..f3160eec7e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -50,6 +50,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -76,9 +77,9 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
protected String uuidPrefix;
protected String transactionDirectory;
- protected Map<String, String> needMoveFiles;
- protected Map<String, String> beingWrittenFile = new HashMap<>();
- private Map<String, List<String>> partitionDirAndValuesMap;
+ protected LinkedHashMap<String, String> needMoveFiles;
+ protected LinkedHashMap<String, String> beingWrittenFile = new
LinkedHashMap<>();
+ private LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
protected SeaTunnelRowType seaTunnelRowType;
// Checkpoint id from engine is start with 1
@@ -111,13 +112,18 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
@Override
public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException
{
if (currentBatchSize >= batchSize) {
- this.partId++;
+ newFilePart();
currentBatchSize = 0;
- beingWrittenFile.clear();
}
currentBatchSize++;
}
+ public synchronized void newFilePart() {
+ this.partId++;
+ beingWrittenFile.clear();
+ log.debug("new file part: {}", partId);
+ }
+
protected SeaTunnelRowType buildSchemaWithRowType(
SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex)
{
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
@@ -177,9 +183,9 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
* @return the map of partition directory
*/
@Override
- public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow
seaTunnelRow) {
+ public LinkedHashMap<String, List<String>>
generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
List<Integer> partitionFieldsIndexInRow =
fileSinkConfig.getPartitionFieldsIndexInRow();
- Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
+ LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new
LinkedHashMap<>(1);
if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null);
return partitionDirAndValuesMap;
@@ -258,12 +264,15 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
@Override
public Optional<FileCommitInfo> prepareCommit() {
this.finishAndCloseFile();
- Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
- Map<String, List<String>> copyMap =
+ LinkedHashMap<String, String> commitMap = new
LinkedHashMap<>(this.needMoveFiles);
+ LinkedHashMap<String, List<String>> copyMap =
this.partitionDirAndValuesMap.entrySet().stream()
.collect(
Collectors.toMap(
- Map.Entry::getKey, e -> new
ArrayList<>(e.getValue())));
+ Map.Entry::getKey,
+ e -> new ArrayList<>(e.getValue()),
+ (e1, e2) -> e1,
+ LinkedHashMap::new));
return Optional.of(new FileCommitInfo(commitMap, copyMap,
transactionDirectory));
}
@@ -301,8 +310,8 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
this.checkpointId = checkpointId;
this.transactionId = getTransactionId(checkpointId);
this.transactionDirectory = getTransactionDir(this.transactionId);
- this.needMoveFiles = new HashMap<>();
- this.partitionDirAndValuesMap = new HashMap<>();
+ this.needMoveFiles = new LinkedHashMap<>();
+ this.partitionDirAndValuesMap = new LinkedHashMap<>();
}
private String getTransactionId(Long checkpointId) {
@@ -325,18 +334,21 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
*/
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
- Map<String, List<String>> commitMap =
+ LinkedHashMap<String, List<String>> commitMap =
this.partitionDirAndValuesMap.entrySet().stream()
.collect(
Collectors.toMap(
- Map.Entry::getKey, e -> new
ArrayList<>(e.getValue())));
+ Map.Entry::getKey,
+ e -> new ArrayList<>(e.getValue()),
+ (e1, e2) -> e1,
+ LinkedHashMap::new));
ArrayList<FileSinkState> fileState =
Lists.newArrayList(
new FileSinkState(
this.transactionId,
this.uuidPrefix,
this.checkpointId,
- new HashMap<>(this.needMoveFiles),
+ new LinkedHashMap<>(this.needMoveFiles),
commitMap,
this.getTransactionDir(transactionId)));
this.beingWrittenFile.clear();
@@ -363,7 +375,7 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
}
public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow
seaTunnelRow) {
- Map<String, List<String>> dataPartitionDirAndValuesMap =
+ LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
generatorPartitionDir(seaTunnelRow);
String beingWrittenFileKey =
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
// get filePath from beingWrittenFile
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
index bb8d09d30f..d5786ea2f8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
@@ -28,15 +28,14 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import lombok.NonNull;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedHashMap;
public class ExcelWriteStrategy extends AbstractWriteStrategy {
- private final Map<String, ExcelGenerator> beingWrittenWriter;
+ private final LinkedHashMap<String, ExcelGenerator> beingWrittenWriter;
public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
- this.beingWrittenWriter = new HashMap<>();
+ this.beingWrittenWriter = new LinkedHashMap<>();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index c16f613577..c72a38068d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -33,17 +33,18 @@ import lombok.NonNull;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
public class JsonWriteStrategy extends AbstractWriteStrategy {
private final byte[] rowDelimiter;
private SerializationSchema serializationSchema;
- private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+ private final LinkedHashMap<String, FSDataOutputStream>
beingWrittenOutputStream;
private final Map<String, Boolean> isFirstWrite;
public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
- this.beingWrittenOutputStream = new HashMap<>();
+ this.beingWrittenOutputStream = new LinkedHashMap<>();
this.isFirstWrite = new HashMap<>();
this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 551d02f5b9..0e55b46e26 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -55,16 +55,16 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class OrcWriteStrategy extends AbstractWriteStrategy {
- private final Map<String, Writer> beingWrittenWriter;
+ private final LinkedHashMap<String, Writer> beingWrittenWriter;
public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
- this.beingWrittenWriter = new HashMap<>();
+ this.beingWrittenWriter = new LinkedHashMap<>();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index ce104da800..8c2c938200 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -57,15 +57,14 @@ import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@SuppressWarnings("checkstyle:MagicNumber")
public class ParquetWriteStrategy extends AbstractWriteStrategy {
- private final Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+ private final LinkedHashMap<String, ParquetWriter<GenericRecord>>
beingWrittenWriter;
private AvroSchemaConverter schemaConverter;
private Schema schema;
public static final int[] PRECISION_TO_BYTE_COUNT = new int[38];
@@ -80,7 +79,7 @@ public class ParquetWriteStrategy extends
AbstractWriteStrategy {
public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
- this.beingWrittenWriter = new HashMap<>();
+ this.beingWrittenWriter = new LinkedHashMap<>();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 7e94e13c96..f309edb70f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -36,10 +36,11 @@ import lombok.NonNull;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
public class TextWriteStrategy extends AbstractWriteStrategy {
- private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+ private final LinkedHashMap<String, FSDataOutputStream>
beingWrittenOutputStream;
private final Map<String, Boolean> isFirstWrite;
private final String fieldDelimiter;
private final String rowDelimiter;
@@ -50,7 +51,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
- this.beingWrittenOutputStream = new HashMap<>();
+ this.beingWrittenOutputStream = new LinkedHashMap<>();
this.isFirstWrite = new HashMap<>();
this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
this.rowDelimiter = fileSinkConfig.getRowDelimiter();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 6d75de29c6..a64af87d06 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -27,8 +27,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.hadoop.conf.Configuration;
import java.io.Serializable;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
public interface WriteStrategy extends Transaction, Serializable {
/**
@@ -67,7 +67,7 @@ public interface WriteStrategy extends Transaction,
Serializable {
* @param seaTunnelRow seaTunnelRow
* @return the map of partition directory
*/
- Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow);
+ LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow
seaTunnelRow);
/**
* use transaction id generate file name
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index 97476fafc5..620bea134b 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -35,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +59,7 @@ public class S3RedshiftSinkAggregatedCommitter extends
FileSinkAggregatedCommitt
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
- for (Map.Entry<String, Map<String, String>> entry :
+ for (Map.Entry<String, LinkedHashMap<String, String>>
entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
for (Map.Entry<String, String> mvFileEntry :
entry.getValue().entrySet()) {
@@ -92,7 +93,7 @@ public class S3RedshiftSinkAggregatedCommitter extends
FileSinkAggregatedCommitt
aggregatedCommitInfos.forEach(
aggregatedCommitInfo -> {
try {
- for (Map.Entry<String, Map<String, String>> entry :
+ for (Map.Entry<String, LinkedHashMap<String, String>>
entry :
aggregatedCommitInfo.getTransactionMap().entrySet()) {
// delete the transaction dir
fileSystemUtils.deleteFile(entry.getKey());