This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d61cba233e [Improve][Connector-file-base] Improved file allocation algorithm for subtasks. (#8453)
d61cba233e is described below

commit d61cba233ee3a266f6b6aaca1635f4ca75baa48a
Author: Jeremy <[email protected]>
AuthorDate: Fri Jan 10 21:13:38 2025 +0800

    [Improve][Connector-file-base] Improved file allocation algorithm for subtasks. (#8453)
---
 .../source/split/FileSourceSplitEnumerator.java    |  29 +++--
 .../split/FileSourceSplitEnumeratorTest.java       | 108 ++++++++++++++++++
 .../seatunnel-translation-base/pom.xml             |   6 +
 .../translation/source/ParallelSourceTest.java     | 122 +++++++++++++++++++++
 4 files changed, 253 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
index 1adffd2505..544c759e21 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -25,9 +25,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class FileSourceSplitEnumerator
@@ -36,9 +39,11 @@ public class FileSourceSplitEnumerator
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSourceSplitEnumerator.class);
 
     private final Context<FileSourceSplit> context;
-    private final Set<FileSourceSplit> pendingSplit = new HashSet<>();
+    private final Set<FileSourceSplit> allSplit =
+            new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
     private Set<FileSourceSplit> assignedSplit;
     private final List<String> filePaths;
+    private final AtomicInteger assignCount = new AtomicInteger(0);
 
     public FileSourceSplitEnumerator(
             SourceSplitEnumerator.Context<FileSourceSplit> context, 
List<String> filePaths) {
@@ -57,7 +62,7 @@ public class FileSourceSplitEnumerator
 
     @Override
     public void open() {
-        this.pendingSplit.addAll(discoverySplits());
+        this.allSplit.addAll(discoverySplits());
     }
 
     @Override
@@ -82,7 +87,7 @@ public class FileSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(splits);
+            allSplit.addAll(splits);
             assignSplit(subtaskId);
         }
     }
@@ -91,13 +96,14 @@ public class FileSourceSplitEnumerator
         ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<>();
         if (context.currentParallelism() == 1) {
             // if parallelism == 1, we should assign all the splits to reader
-            currentTaskSplits.addAll(pendingSplit);
+            currentTaskSplits.addAll(allSplit);
         } else {
-            // if parallelism > 1, according to hashCode of split's id to 
determine whether to
+            // if parallelism > 1, according to polling strategy to determine 
whether to
             // allocate the current task
-            for (FileSourceSplit fileSourceSplit : pendingSplit) {
+            assignCount.set(0);
+            for (FileSourceSplit fileSourceSplit : allSplit) {
                 int splitOwner =
-                        getSplitOwner(fileSourceSplit.splitId(), 
context.currentParallelism());
+                        getSplitOwner(assignCount.getAndIncrement(), 
context.currentParallelism());
                 if (splitOwner == taskId) {
                     currentTaskSplits.add(fileSourceSplit);
                 }
@@ -107,8 +113,7 @@ public class FileSourceSplitEnumerator
         context.assignSplit(taskId, currentTaskSplits);
         // save the state of assigned splits
         assignedSplit.addAll(currentTaskSplits);
-        // remove the assigned splits from pending splits
-        currentTaskSplits.forEach(split -> pendingSplit.remove(split));
+
         LOGGER.info(
                 "SubTask {} is assigned to [{}]",
                 taskId,
@@ -118,13 +123,13 @@ public class FileSourceSplitEnumerator
         context.signalNoMoreSplits(taskId);
     }
 
-    private static int getSplitOwner(String tp, int numReaders) {
-        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    private static int getSplitOwner(int assignCount, int numReaders) {
+        return assignCount % numReaders;
     }
 
     @Override
     public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
+        return allSplit.size() - assignedSplit.size();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumeratorTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..0c81f69c9b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumeratorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class FileSourceSplitEnumeratorTest {
+
+    @Test
+    void assignSplitRoundTest() {
+        List<String> filePaths = new ArrayList<>();
+        int fileSize = 10;
+        int parallelism = 4;
+
+        for (int i = 0; i < fileSize; i++) {
+            filePaths.add("file" + i + ".txt");
+        }
+
+        Map<Integer, List<FileSourceSplit>> assignSplitMap = new HashMap<>();
+
+        SourceSplitEnumerator.Context<FileSourceSplit> context =
+                new SourceSplitEnumerator.Context<FileSourceSplit>() {
+                    @Override
+                    public int currentParallelism() {
+                        return parallelism;
+                    }
+
+                    @Override
+                    public Set<Integer> registeredReaders() {
+                        return null;
+                    }
+
+                    @Override
+                    public void assignSplit(int subtaskId, 
List<FileSourceSplit> splits) {
+                        assignSplitMap.put(subtaskId, splits);
+                    }
+
+                    @Override
+                    public void signalNoMoreSplits(int subtask) {}
+
+                    @Override
+                    public void sendEventToSourceReader(int subtaskId, 
SourceEvent event) {}
+
+                    @Override
+                    public MetricsContext getMetricsContext() {
+                        return null;
+                    }
+
+                    @Override
+                    public EventListener getEventListener() {
+                        return null;
+                    }
+                };
+
+        FileSourceSplitEnumerator fileSourceSplitEnumerator =
+                new FileSourceSplitEnumerator(context, filePaths);
+        fileSourceSplitEnumerator.open();
+
+        fileSourceSplitEnumerator.run();
+
+        // check all files are assigned
+        
Assertions.assertEquals(fileSourceSplitEnumerator.currentUnassignedSplitSize(), 
0);
+
+        Set<FileSourceSplit> valueSet =
+                
assignSplitMap.values().stream().flatMap(List::stream).collect(Collectors.toSet());
+
+        // check no duplicated assigned split
+        Assertions.assertEquals(valueSet.size(), fileSize);
+
+        // check file allocation balance
+        for (int i = 1; i < parallelism; i++) {
+            Assertions.assertTrue(
+                    Math.abs(assignSplitMap.get(i).size() - 
assignSplitMap.get(i - 1).size()) <= 1,
+                    "The number of files assigned to adjacent subtasks is more 
than 1.");
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/pom.xml 
b/seatunnel-translation/seatunnel-translation-base/pom.xml
index 87636af0bb..ac37c22c7c 100644
--- a/seatunnel-translation/seatunnel-translation-base/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-base/pom.xml
@@ -31,5 +31,11 @@
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/test/java/org/apache/seatunnel/translation/source/ParallelSourceTest.java
 
b/seatunnel-translation/seatunnel-translation-base/src/test/java/org/apache/seatunnel/translation/source/ParallelSourceTest.java
new file mode 100644
index 0000000000..2f0b151330
--- /dev/null
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/test/java/org/apache/seatunnel/translation/source/ParallelSourceTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSourceReader;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ParallelSourceTest {
+
+    @Test
+    void testParallelSourceForPollingFileAllocation() throws Exception {
+        int fileSize = 15;
+        int parallelism = 4;
+
+        // create file source
+        BaseFileSource baseFileSource =
+                new BaseFileSource() {
+                    @Override
+                    public void prepare(Config pluginConfig) throws 
PrepareFailException {
+                        filePaths = new ArrayList<>();
+                        for (int i = 0; i < fileSize; i++) {
+                            filePaths.add("file" + i + ".txt");
+                        }
+                    }
+
+                    @Override
+                    public String getPluginName() {
+                        return FileSystemType.HDFS.getFileSystemPluginName();
+                    }
+                };
+
+        // prepare files
+        baseFileSource.prepare(null);
+
+        ParallelSource parallelSource =
+                new ParallelSource(baseFileSource, null, parallelism, 
"parallel-source-test", 0);
+        ParallelSource parallelSource2 =
+                new ParallelSource(baseFileSource, null, parallelism, 
"parallel-source-test2", 1);
+        ParallelSource parallelSource3 =
+                new ParallelSource(baseFileSource, null, parallelism, 
"parallel-source-test3", 2);
+        ParallelSource parallelSource4 =
+                new ParallelSource(baseFileSource, null, parallelism, 
"parallel-source-test4", 3);
+
+        parallelSource.open();
+        parallelSource2.open();
+        parallelSource3.open();
+        parallelSource4.open();
+
+        // execute file allocation process
+        parallelSource.splitEnumerator.run();
+        parallelSource2.splitEnumerator.run();
+        parallelSource3.splitEnumerator.run();
+        parallelSource4.splitEnumerator.run();
+
+        // Gets the splits assigned for each reader
+        List<FileSourceSplit> sourceSplits =
+                ((BaseFileSourceReader) 
parallelSource.reader).snapshotState(0);
+        List<FileSourceSplit> sourceSplits2 =
+                ((BaseFileSourceReader) 
parallelSource2.reader).snapshotState(0);
+        List<FileSourceSplit> sourceSplits3 =
+                ((BaseFileSourceReader) 
parallelSource3.reader).snapshotState(0);
+        List<FileSourceSplit> sourceSplits4 =
+                ((BaseFileSourceReader) 
parallelSource4.reader).snapshotState(0);
+
+        log.info(
+                "parallel source1 splits => {}",
+                
sourceSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
+
+        log.info(
+                "parallel source2 splits => {}",
+                
sourceSplits2.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
+
+        log.info(
+                "parallel source3 splits => {}",
+                
sourceSplits3.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
+
+        log.info(
+                "parallel source4 splits => {}",
+                
sourceSplits4.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
+
+        // check that there are no duplicate file assignments
+        Set<FileSourceSplit> splitSet = new HashSet<>();
+        splitSet.addAll(sourceSplits);
+        splitSet.addAll(sourceSplits2);
+        splitSet.addAll(sourceSplits3);
+        splitSet.addAll(sourceSplits4);
+
+        Assertions.assertEquals(splitSet.size(), fileSize);
+    }
+}

Reply via email to