This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1e18a8c530 [Improve][Connector[File] Optimize files commit order 
(#5045)
1e18a8c530 is described below

commit 1e18a8c5303fb071d6c4d5d8dc893b8882c3bc4a
Author: hailin0 <[email protected]>
AuthorDate: Mon Jul 24 15:35:27 2023 +0800

    [Improve][Connector[File] Optimize files commit order (#5045)
    
    Before using `HashMap` store files path, so every checkpoint file commit is 
out of order.
    
    Now switch to using `LinkedHashMap` to ensure that files are commit in the 
generated order
---
 .../seatunnel/file/sink/BaseFileSinkWriter.java    |  6 +-
 .../file/sink/commit/FileAggregatedCommitInfo.java |  6 +-
 .../seatunnel/file/sink/commit/FileCommitInfo.java |  6 +-
 .../sink/commit/FileSinkAggregatedCommitter.java   | 15 +++--
 .../file/sink/commit/FileSinkCommitter.java        | 75 ----------------------
 .../seatunnel/file/sink/state/FileSinkState.java   |  6 +-
 .../file/sink/writer/AbstractWriteStrategy.java    | 44 ++++++++-----
 .../file/sink/writer/ExcelWriteStrategy.java       |  7 +-
 .../file/sink/writer/JsonWriteStrategy.java        |  5 +-
 .../file/sink/writer/OrcWriteStrategy.java         |  6 +-
 .../file/sink/writer/ParquetWriteStrategy.java     |  7 +-
 .../file/sink/writer/TextWriteStrategy.java        |  5 +-
 .../seatunnel/file/sink/writer/WriteStrategy.java  |  4 +-
 .../commit/S3RedshiftSinkAggregatedCommitter.java  |  5 +-
 14 files changed, 68 insertions(+), 129 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 7102e954a4..22200249f6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -34,14 +34,14 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, 
FileCommitInfo, FileSinkState> {
-    private final WriteStrategy writeStrategy;
+    protected final WriteStrategy writeStrategy;
     private final FileSystemUtils fileSystemUtils;
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -67,7 +67,7 @@ public class BaseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, FileCommitIn
                 List<String> transactions = findTransactionList(jobId, 
uuidPrefix);
                 FileSinkAggregatedCommitter fileSinkAggregatedCommitter =
                         new FileSinkAggregatedCommitter(fileSystemUtils);
-                HashMap<String, FileSinkState> fileStatesMap = new HashMap<>();
+                LinkedHashMap<String, FileSinkState> fileStatesMap = new 
LinkedHashMap<>();
                 fileSinkStates.forEach(
                         fileSinkState ->
                                 
fileStatesMap.put(fileSinkState.getTransactionId(), fileSinkState));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
index 16d94a1f63..5ca3b30fad 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileAggregatedCommitInfo.java
@@ -21,8 +21,8 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 @Data
 @AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileAggregatedCommitInfo implements Serializable 
{
      *
      * <p>V is the target file path of the data file.
      */
-    private final Map<String, Map<String, String>> transactionMap;
+    private final LinkedHashMap<String, LinkedHashMap<String, String>> 
transactionMap;
 
     /**
      * Storage the partition information in map.
@@ -43,5 +43,5 @@ public class FileAggregatedCommitInfo implements Serializable 
{
      *
      * <p>V is the list of partition column's values.
      */
-    private final Map<String, List<String>> partitionDirAndValuesMap;
+    private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
index 86c433b8f5..27e74ff0a8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileCommitInfo.java
@@ -21,8 +21,8 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 @Data
 @AllArgsConstructor
@@ -34,7 +34,7 @@ public class FileCommitInfo implements Serializable {
      *
      * <p>V is the target file path of the data file.
      */
-    private final Map<String, String> needMoveFiles;
+    private final LinkedHashMap<String, String> needMoveFiles;
 
     /**
      * Storage the partition information in map.
@@ -43,7 +43,7 @@ public class FileCommitInfo implements Serializable {
      *
      * <p>V is the list of partition column's values.
      */
-    private final Map<String, List<String>> partitionDirAndValuesMap;
+    private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
 
     /** Storage the transaction directory */
     private final String transactionDir;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index b12ef1165a..a076188e2a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -44,7 +44,7 @@ public class FileSinkAggregatedCommitter
         aggregatedCommitInfos.forEach(
                 aggregatedCommitInfo -> {
                     try {
-                        for (Map.Entry<String, Map<String, String>> entry :
+                        for (Map.Entry<String, LinkedHashMap<String, String>> 
entry :
                                 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                             for (Map.Entry<String, String> mvFileEntry :
                                     entry.getValue().entrySet()) {
@@ -77,13 +77,14 @@ public class FileSinkAggregatedCommitter
         if (commitInfos == null || commitInfos.size() == 0) {
             return null;
         }
-        Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
-        Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>();
+        LinkedHashMap<String, LinkedHashMap<String, String>> 
aggregateCommitInfo =
+                new LinkedHashMap<>();
+        LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new 
LinkedHashMap<>();
         commitInfos.forEach(
                 commitInfo -> {
-                    Map<String, String> needMoveFileMap =
+                    LinkedHashMap<String, String> needMoveFileMap =
                             aggregateCommitInfo.computeIfAbsent(
-                                    commitInfo.getTransactionDir(), k -> new 
HashMap<>());
+                                    commitInfo.getTransactionDir(), k -> new 
LinkedHashMap<>());
                     needMoveFileMap.putAll(commitInfo.getNeedMoveFiles());
                     if (commitInfo.getPartitionDirAndValuesMap() != null
                             && 
!commitInfo.getPartitionDirAndValuesMap().isEmpty()) {
@@ -109,7 +110,7 @@ public class FileSinkAggregatedCommitter
         aggregatedCommitInfos.forEach(
                 aggregatedCommitInfo -> {
                     try {
-                        for (Map.Entry<String, Map<String, String>> entry :
+                        for (Map.Entry<String, LinkedHashMap<String, String>> 
entry :
                                 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                             // rollback the file
                             for (Map.Entry<String, String> mvFileEntry :
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
deleted file mode 100644
index 6525b5e7d4..0000000000
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkCommitter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
-
-import org.apache.seatunnel.api.sink.SinkCommitter;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/** Deprecated interface since 2.3.0-beta, now used {@link 
FileSinkAggregatedCommitter} */
-@Deprecated
-public class FileSinkCommitter implements SinkCommitter<FileCommitInfo> {
-    private final FileSystemUtils fileSystemUtils;
-
-    public FileSinkCommitter(FileSystemUtils fileSystemUtils) {
-        this.fileSystemUtils = fileSystemUtils;
-    }
-
-    @Override
-    public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos) 
throws IOException {
-        ArrayList<FileCommitInfo> failedCommitInfos = new ArrayList<>();
-        for (FileCommitInfo commitInfo : commitInfos) {
-            Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
-            needMoveFiles.forEach(
-                    (k, v) -> {
-                        try {
-                            fileSystemUtils.renameFile(k, v, true);
-                        } catch (IOException e) {
-                            failedCommitInfos.add(commitInfo);
-                        }
-                    });
-            fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
-        }
-        return failedCommitInfos;
-    }
-
-    /**
-     * Abort the transaction, this method will be called (**Only** on Spark 
engine) when the commit
-     * is failed.
-     *
-     * @param commitInfos The list of commit message, used to abort the commit.
-     * @throws IOException throw IOException when close failed.
-     */
-    @Override
-    public void abort(List<FileCommitInfo> commitInfos) throws IOException {
-        for (FileCommitInfo commitInfo : commitInfos) {
-            Map<String, String> needMoveFiles = commitInfo.getNeedMoveFiles();
-            for (Map.Entry<String, String> entry : needMoveFiles.entrySet()) {
-                if (fileSystemUtils.fileExist(entry.getValue())
-                        && !fileSystemUtils.fileExist(entry.getKey())) {
-                    fileSystemUtils.renameFile(entry.getValue(), 
entry.getKey(), true);
-                }
-            }
-            fileSystemUtils.deleteFile(commitInfo.getTransactionDir());
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
index 7d28df2305..34ca13625f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/state/FileSinkState.java
@@ -21,8 +21,8 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 @Data
 @AllArgsConstructor
@@ -30,7 +30,7 @@ public class FileSinkState implements Serializable {
     private final String transactionId;
     private final String uuidPrefix;
     private final Long checkpointId;
-    private final Map<String, String> needMoveFiles;
-    private final Map<String, List<String>> partitionDirAndValuesMap;
+    private final LinkedHashMap<String, String> needMoveFiles;
+    private final LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
     private final String transactionDir;
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 6820d28d85..f3160eec7e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -50,6 +50,7 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -76,9 +77,9 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     protected String uuidPrefix;
 
     protected String transactionDirectory;
-    protected Map<String, String> needMoveFiles;
-    protected Map<String, String> beingWrittenFile = new HashMap<>();
-    private Map<String, List<String>> partitionDirAndValuesMap;
+    protected LinkedHashMap<String, String> needMoveFiles;
+    protected LinkedHashMap<String, String> beingWrittenFile = new 
LinkedHashMap<>();
+    private LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
     protected SeaTunnelRowType seaTunnelRowType;
 
     // Checkpoint id from engine is start with 1
@@ -111,13 +112,18 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     @Override
     public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException 
{
         if (currentBatchSize >= batchSize) {
-            this.partId++;
+            newFilePart();
             currentBatchSize = 0;
-            beingWrittenFile.clear();
         }
         currentBatchSize++;
     }
 
+    public synchronized void newFilePart() {
+        this.partId++;
+        beingWrittenFile.clear();
+        log.debug("new file part: {}", partId);
+    }
+
     protected SeaTunnelRowType buildSchemaWithRowType(
             SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) 
{
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
@@ -177,9 +183,9 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      * @return the map of partition directory
      */
     @Override
-    public Map<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow) {
+    public LinkedHashMap<String, List<String>> 
generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
         List<Integer> partitionFieldsIndexInRow = 
fileSinkConfig.getPartitionFieldsIndexInRow();
-        Map<String, List<String>> partitionDirAndValuesMap = new HashMap<>(1);
+        LinkedHashMap<String, List<String>> partitionDirAndValuesMap = new 
LinkedHashMap<>(1);
         if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
             partitionDirAndValuesMap.put(BaseSinkConfig.NON_PARTITION, null);
             return partitionDirAndValuesMap;
@@ -258,12 +264,15 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     @Override
     public Optional<FileCommitInfo> prepareCommit() {
         this.finishAndCloseFile();
-        Map<String, String> commitMap = new HashMap<>(this.needMoveFiles);
-        Map<String, List<String>> copyMap =
+        LinkedHashMap<String, String> commitMap = new 
LinkedHashMap<>(this.needMoveFiles);
+        LinkedHashMap<String, List<String>> copyMap =
                 this.partitionDirAndValuesMap.entrySet().stream()
                         .collect(
                                 Collectors.toMap(
-                                        Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue())));
+                                        Map.Entry::getKey,
+                                        e -> new ArrayList<>(e.getValue()),
+                                        (e1, e2) -> e1,
+                                        LinkedHashMap::new));
         return Optional.of(new FileCommitInfo(commitMap, copyMap, 
transactionDirectory));
     }
 
@@ -301,8 +310,8 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         this.checkpointId = checkpointId;
         this.transactionId = getTransactionId(checkpointId);
         this.transactionDirectory = getTransactionDir(this.transactionId);
-        this.needMoveFiles = new HashMap<>();
-        this.partitionDirAndValuesMap = new HashMap<>();
+        this.needMoveFiles = new LinkedHashMap<>();
+        this.partitionDirAndValuesMap = new LinkedHashMap<>();
     }
 
     private String getTransactionId(Long checkpointId) {
@@ -325,18 +334,21 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
      */
     @Override
     public List<FileSinkState> snapshotState(long checkpointId) {
-        Map<String, List<String>> commitMap =
+        LinkedHashMap<String, List<String>> commitMap =
                 this.partitionDirAndValuesMap.entrySet().stream()
                         .collect(
                                 Collectors.toMap(
-                                        Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue())));
+                                        Map.Entry::getKey,
+                                        e -> new ArrayList<>(e.getValue()),
+                                        (e1, e2) -> e1,
+                                        LinkedHashMap::new));
         ArrayList<FileSinkState> fileState =
                 Lists.newArrayList(
                         new FileSinkState(
                                 this.transactionId,
                                 this.uuidPrefix,
                                 this.checkpointId,
-                                new HashMap<>(this.needMoveFiles),
+                                new LinkedHashMap<>(this.needMoveFiles),
                                 commitMap,
                                 this.getTransactionDir(transactionId)));
         this.beingWrittenFile.clear();
@@ -363,7 +375,7 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     }
 
     public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow 
seaTunnelRow) {
-        Map<String, List<String>> dataPartitionDirAndValuesMap =
+        LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
                 generatorPartitionDir(seaTunnelRow);
         String beingWrittenFileKey = 
dataPartitionDirAndValuesMap.keySet().toArray()[0].toString();
         // get filePath from beingWrittenFile
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
index bb8d09d30f..d5786ea2f8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
@@ -28,15 +28,14 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import lombok.NonNull;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedHashMap;
 
 public class ExcelWriteStrategy extends AbstractWriteStrategy {
-    private final Map<String, ExcelGenerator> beingWrittenWriter;
+    private final LinkedHashMap<String, ExcelGenerator> beingWrittenWriter;
 
     public ExcelWriteStrategy(FileSinkConfig fileSinkConfig) {
         super(fileSinkConfig);
-        this.beingWrittenWriter = new HashMap<>();
+        this.beingWrittenWriter = new LinkedHashMap<>();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index c16f613577..c72a38068d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -33,17 +33,18 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class JsonWriteStrategy extends AbstractWriteStrategy {
     private final byte[] rowDelimiter;
     private SerializationSchema serializationSchema;
-    private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+    private final LinkedHashMap<String, FSDataOutputStream> 
beingWrittenOutputStream;
     private final Map<String, Boolean> isFirstWrite;
 
     public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
         super(textFileSinkConfig);
-        this.beingWrittenOutputStream = new HashMap<>();
+        this.beingWrittenOutputStream = new LinkedHashMap<>();
         this.isFirstWrite = new HashMap<>();
         this.rowDelimiter = textFileSinkConfig.getRowDelimiter().getBytes();
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 551d02f5b9..0e55b46e26 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -55,16 +55,16 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoField;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 public class OrcWriteStrategy extends AbstractWriteStrategy {
-    private final Map<String, Writer> beingWrittenWriter;
+    private final LinkedHashMap<String, Writer> beingWrittenWriter;
 
     public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
         super(fileSinkConfig);
-        this.beingWrittenWriter = new HashMap<>();
+        this.beingWrittenWriter = new LinkedHashMap<>();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index ce104da800..8c2c938200 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -57,15 +57,14 @@ import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 @SuppressWarnings("checkstyle:MagicNumber")
 public class ParquetWriteStrategy extends AbstractWriteStrategy {
-    private final Map<String, ParquetWriter<GenericRecord>> beingWrittenWriter;
+    private final LinkedHashMap<String, ParquetWriter<GenericRecord>> 
beingWrittenWriter;
     private AvroSchemaConverter schemaConverter;
     private Schema schema;
     public static final int[] PRECISION_TO_BYTE_COUNT = new int[38];
@@ -80,7 +79,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
 
     public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) {
         super(fileSinkConfig);
-        this.beingWrittenWriter = new HashMap<>();
+        this.beingWrittenWriter = new LinkedHashMap<>();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 7e94e13c96..f309edb70f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -36,10 +36,11 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class TextWriteStrategy extends AbstractWriteStrategy {
-    private final Map<String, FSDataOutputStream> beingWrittenOutputStream;
+    private final LinkedHashMap<String, FSDataOutputStream> 
beingWrittenOutputStream;
     private final Map<String, Boolean> isFirstWrite;
     private final String fieldDelimiter;
     private final String rowDelimiter;
@@ -50,7 +51,7 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
 
     public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
         super(fileSinkConfig);
-        this.beingWrittenOutputStream = new HashMap<>();
+        this.beingWrittenOutputStream = new LinkedHashMap<>();
         this.isFirstWrite = new HashMap<>();
         this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
         this.rowDelimiter = fileSinkConfig.getRowDelimiter();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 6d75de29c6..a64af87d06 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -27,8 +27,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.Serializable;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 public interface WriteStrategy extends Transaction, Serializable {
     /**
@@ -67,7 +67,7 @@ public interface WriteStrategy extends Transaction, 
Serializable {
      * @param seaTunnelRow seaTunnelRow
      * @return the map of partition directory
      */
-    Map<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow);
+    LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow 
seaTunnelRow);
 
     /**
      * use transaction id generate file name
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index 97476fafc5..620bea134b 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -35,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -58,7 +59,7 @@ public class S3RedshiftSinkAggregatedCommitter extends 
FileSinkAggregatedCommitt
         aggregatedCommitInfos.forEach(
                 aggregatedCommitInfo -> {
                     try {
-                        for (Map.Entry<String, Map<String, String>> entry :
+                        for (Map.Entry<String, LinkedHashMap<String, String>> 
entry :
                                 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                             for (Map.Entry<String, String> mvFileEntry :
                                     entry.getValue().entrySet()) {
@@ -92,7 +93,7 @@ public class S3RedshiftSinkAggregatedCommitter extends 
FileSinkAggregatedCommitt
         aggregatedCommitInfos.forEach(
                 aggregatedCommitInfo -> {
                     try {
-                        for (Map.Entry<String, Map<String, String>> entry :
+                        for (Map.Entry<String, LinkedHashMap<String, String>> 
entry :
                                 
aggregatedCommitInfo.getTransactionMap().entrySet()) {
                             // delete the transaction dir
                             fileSystemUtils.deleteFile(entry.getKey());

Reply via email to