This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 44a12cc55c [Improve][connector-file-base] Improved multiple table file
source allocation algorithm for subtasks (#8878)
44a12cc55c is described below
commit 44a12cc55c7ca06c902d68a3df2d646f266f544c
Author: JeremyXin <[email protected]>
AuthorDate: Thu Mar 6 09:57:51 2025 +0800
[Improve][connector-file-base] Improved multiple table file source
allocation algorithm for subtasks (#8878)
---
.../MultipleTableFileSourceSplitEnumerator.java | 35 +++---
...MultipleTableFileSourceSplitEnumeratorTest.java | 124 +++++++++++++++++++++
2 files changed, 144 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index 64864dd7ac..e659109b41 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -28,10 +28,13 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Slf4j
@@ -39,9 +42,10 @@ public class MultipleTableFileSourceSplitEnumerator
implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
private final Context<FileSourceSplit> context;
- private final Set<FileSourceSplit> pendingSplit;
+ private final Set<FileSourceSplit> allSplit;
private final Set<FileSourceSplit> assignedSplit;
private final Map<String, List<String>> filePathMap;
+ private final AtomicInteger assignCount = new AtomicInteger(0);
public MultipleTableFileSourceSplitEnumerator(
Context<FileSourceSplit> context,
@@ -59,7 +63,7 @@ public class MultipleTableFileSourceSplitEnumerator
.toString(),
BaseFileSourceConfig::getFilePaths));
this.assignedSplit = new HashSet<>();
- this.pendingSplit = new HashSet<>();
+ this.allSplit = new
TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
}
public MultipleTableFileSourceSplitEnumerator(
@@ -75,13 +79,13 @@ public class MultipleTableFileSourceSplitEnumerator
if (CollectionUtils.isEmpty(splits)) {
return;
}
- pendingSplit.addAll(splits);
+ allSplit.addAll(splits);
assignSplit(subtaskId);
}
@Override
public int currentUnassignedSplitSize() {
- return pendingSplit.size();
+ return allSplit.size() - assignedSplit.size();
}
@Override
@@ -93,7 +97,7 @@ public class MultipleTableFileSourceSplitEnumerator
String tableId = filePathEntry.getKey();
List<String> filePaths = filePathEntry.getValue();
for (String filePath : filePaths) {
- pendingSplit.add(new FileSourceSplit(tableId, filePath));
+ allSplit.add(new FileSourceSplit(tableId, filePath));
}
}
assignSplit(subtaskId);
@@ -113,13 +117,14 @@ public class MultipleTableFileSourceSplitEnumerator
List<FileSourceSplit> currentTaskSplits = new ArrayList<>();
if (context.currentParallelism() == 1) {
// if parallelism == 1, we should assign all the splits to reader
- currentTaskSplits.addAll(pendingSplit);
+ currentTaskSplits.addAll(allSplit);
} else {
- // if parallelism > 1, according to hashCode of split's id to
determine whether to
+ // if parallelism > 1, according to polling strategy to determine
whether to
// allocate the current task
- for (FileSourceSplit fileSourceSplit : pendingSplit) {
+ assignCount.set(0);
+ for (FileSourceSplit fileSourceSplit : allSplit) {
int splitOwner =
- getSplitOwner(fileSourceSplit.splitId(),
context.currentParallelism());
+ getSplitOwner(assignCount.getAndIncrement(),
context.currentParallelism());
if (splitOwner == taskId) {
currentTaskSplits.add(fileSourceSplit);
}
@@ -129,19 +134,19 @@ public class MultipleTableFileSourceSplitEnumerator
context.assignSplit(taskId, currentTaskSplits);
// save the state of assigned splits
assignedSplit.addAll(currentTaskSplits);
- // remove the assigned splits from pending splits
- currentTaskSplits.forEach(pendingSplit::remove);
+
log.info(
- "SubTask {} is assigned to [{}]",
+ "SubTask {} is assigned to [{}], size {}",
taskId,
currentTaskSplits.stream()
.map(FileSourceSplit::splitId)
- .collect(Collectors.joining(",")));
+ .collect(Collectors.joining(",")),
+ currentTaskSplits.size());
context.signalNoMoreSplits(taskId);
}
- private static int getSplitOwner(String tp, int numReaders) {
- return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ private static int getSplitOwner(int assignCount, int numReaders) {
+ return assignCount % numReaders;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..80658b0576
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+/** Unit test for how {@link MultipleTableFileSourceSplitEnumerator} hands file splits to readers. */
+@Slf4j
+public class MultipleTableFileSourceSplitEnumeratorTest {
+ /** Registers 4 readers against 50 mocked file paths and checks the share each reader takes. */
+ @Test
+ void assignSplitTest() {
+ int parallelism = 4;
+ int fileSize = 50;
+
+ Map<String, List<String>> filePathMap = new HashMap<>();
+ List<String> filePaths = new ArrayList<>();
+ IntStream.range(0, fileSize).forEach(i -> filePaths.add("filePath" +
i));
+ filePathMap.put("table1", filePaths);
+ // NOTE(review): filePathMap is filled here but never read again in this test
+ BaseFileSourceConfig baseFileSourceConfig =
Mockito.mock(BaseFileSourceConfig.class);
+
+
Mockito.when(baseFileSourceConfig.getFilePaths()).thenReturn(filePaths);
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "test", "hive_table1"),
+ null,
+ Maps.newHashMap(),
+ Lists.newArrayList(),
+ null);
+
Mockito.when(baseFileSourceConfig.getCatalogTable()).thenReturn(catalogTable);
+
+ BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig =
+ Mockito.mock(BaseMultipleTableFileSourceConfig.class);
+
+ Mockito.when(baseMultipleTableFileSourceConfig.getFileSourceConfigs())
+ .thenReturn(Arrays.asList(baseFileSourceConfig));
+
+ SourceSplitEnumerator.Context<FileSourceSplit> context =
+ Mockito.mock(SourceSplitEnumerator.Context.class);
+
+ Mockito.when(context.currentParallelism()).thenReturn(parallelism);
+ MultipleTableFileSourceSplitEnumerator enumerator =
+ new MultipleTableFileSourceSplitEnumerator(
+ context, baseMultipleTableFileSourceConfig);
+
+ AtomicInteger unAssignedSplitSize = new AtomicInteger(fileSize);
+ IntStream.range(0, parallelism)
+ .forEach(
+ id -> {
+ enumerator.registerReader(id);
+ // the unassigned count must drop by exactly the share expected for this reader
+ // NOTE(review): assertEquals takes (expected, actual); the two arguments below are swapped
+ Assertions.assertEquals(
+ enumerator.currentUnassignedSplitSize(),
+ unAssignedSplitSize.get()
+ - allocateFiles(id, parallelism,
fileSize));
+
unAssignedSplitSize.set(enumerator.currentUnassignedSplitSize());
+
+ log.info(
+ "unAssigned splits => {}, allocate files
=> {}",
+ enumerator.currentUnassignedSplitSize(),
+ allocateFiles(id, parallelism, fileSize));
+ });
+
+ // once every reader has registered, no split may remain unassigned
+ Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
+ }
+
+ /**
+ * Expected number of files for reader {@code id} when the files are shared out as evenly as possible.
+ *
+ * @param id zero-based reader index; the first (fileSize % parallelism) ids receive one extra file
+ * @param parallelism number of readers the files are divided among
+ * @param fileSize total number of files to divide
+ * @return fileSize / parallelism, plus one when id is below fileSize % parallelism
+ */
+ public int allocateFiles(int id, int parallelism, int fileSize) {
+ int filesPerIteration = fileSize / parallelism;
+ int remainder = fileSize % parallelism;
+
+ if (id < remainder) {
+ return filesPerIteration + 1;
+ } else {
+ return filesPerIteration;
+ }
+ }
+}