This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 59a3d1cea8 [Improve][connector-elasticsearch] Improved elasticsearch
source enumerator splits allocation algorithm for subtasks (#10869)
59a3d1cea8 is described below
commit 59a3d1cea8ebc4addc3067fed65779bbab0748a2
Author: JeremyXin <[email protected]>
AuthorDate: Thu Jun 11 21:17:38 2026 +0800
[Improve][connector-elasticsearch] Improved elasticsearch source enumerator
splits allocation algorithm for subtasks (#10869)
---
.../source/ElasticsearchSourceSplitEnumerator.java | 22 +++-
.../ElasticsearchSourceSplitEnumeratorTest.java | 134 +++++++++++++++++++++
2 files changed, 151 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index ea1118734f..c62fbe09da 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Slf4j
@@ -56,6 +57,7 @@ public class ElasticsearchSourceSplitEnumerator
private final List<ElasticsearchConfig> elasticsearchConfigs;
private volatile boolean shouldEnumerate;
+ private final AtomicInteger assignCount = new AtomicInteger(0);
public ElasticsearchSourceSplitEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
@@ -106,15 +108,25 @@ public class ElasticsearchSourceSplitEnumerator
private void addPendingSplit(Collection<ElasticsearchSourceSplit> splits) {
int readerCount = context.currentParallelism();
- for (ElasticsearchSourceSplit split : splits) {
- int ownerReader = getSplitOwner(split.splitId(), readerCount);
+
+ List<ElasticsearchSourceSplit> sortedSplits =
+ splits.stream()
+ .sorted(Comparator.comparing(ElasticsearchSourceSplit::splitId))
+ .collect(Collectors.toList());
+
+ for (ElasticsearchSourceSplit split : sortedSplits) {
+ int ownerReader = getSplitOwner(assignCount.getAndIncrement(), readerCount);
log.info("Assigning {} to {} reader.", split, ownerReader);
pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
}
}
- private static int getSplitOwner(String tp, int numReaders) {
- return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ private void addPendingSplit(Collection<ElasticsearchSourceSplit> splits, int ownerReader) {
+ pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
+ }
+
+ private static int getSplitOwner(int assignCount, int numReaders) {
+ return assignCount % numReaders;
}
private void assignSplit(Collection<Integer> readers) {
@@ -177,7 +189,7 @@ public class ElasticsearchSourceSplitEnumerator
@Override
public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
- addPendingSplit(splits);
+ addPendingSplit(splits, subtaskId);
assignSplit(Collections.singletonList(subtaskId));
}
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..5f17658ecd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumeratorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ElasticsearchSourceSplitEnumeratorTest {
+
+ @Test
+ public void shouldBalanceSplitsEvenlyAcrossReaders() throws Exception {
+ TestingContext context = new TestingContext(4);
+ ElasticsearchSourceSplitEnumerator enumerator =
+ new ElasticsearchSourceSplitEnumerator(context, null, Collections.emptyList());
+
+ Method addPendingSplit =
+ ElasticsearchSourceSplitEnumerator.class.getDeclaredMethod(
+ "addPendingSplit", java.util.Collection.class);
+ addPendingSplit.setAccessible(true);
+ addPendingSplit.invoke(enumerator, buildSplits(10));
+
+ ElasticsearchSourceState state = enumerator.snapshotState(1L);
+ Map<Integer, List<ElasticsearchSourceSplit>> pendingSplits = state.getPendingSplit();
+
+ Assertions.assertEquals(4, pendingSplits.size());
+ Assertions.assertEquals(3, pendingSplits.get(0).size());
+ Assertions.assertEquals(3, pendingSplits.get(1).size());
+ Assertions.assertEquals(2, pendingSplits.get(2).size());
+ Assertions.assertEquals(2, pendingSplits.get(3).size());
+ }
+
+ @Test
+ public void shouldReassignReturnedSplitsToOriginalReader() throws Exception {
+ TestingContext context = new TestingContext(3);
+ ElasticsearchSourceSplitEnumerator enumerator =
+ new ElasticsearchSourceSplitEnumerator(context, null, Collections.emptyList());
+
+ enumerator.addSplitsBack(buildSplits(3), 2);
+
+ Assertions.assertEquals(0, context.getAssignedSplitCount(0));
+ Assertions.assertEquals(0, context.getAssignedSplitCount(1));
+ Assertions.assertEquals(3, context.getAssignedSplitCount(2));
+
+ ElasticsearchSourceState state = enumerator.snapshotState(1L);
+ Assertions.assertTrue(state.getPendingSplit().isEmpty());
+ }
+
+ private List<ElasticsearchSourceSplit> buildSplits(int size) {
+ List<ElasticsearchSourceSplit> splits = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ splits.add(new ElasticsearchSourceSplit("split-" + i, null));
+ }
+ return splits;
+ }
+
+ private static class TestingContext
+ implements SourceSplitEnumerator.Context<ElasticsearchSourceSplit> {
+ private final int parallelism;
+ private final Set<Integer> readers;
+ private final Map<Integer, List<ElasticsearchSourceSplit>> assignments = new HashMap<>();
+
+ private TestingContext(int parallelism) {
+ this.parallelism = parallelism;
+ this.readers = new LinkedHashSet<>();
+ for (int i = 0; i < parallelism; i++) {
+ readers.add(i);
+ }
+ }
+
+ @Override
+ public int currentParallelism() {
+ return parallelism;
+ }
+
+ @Override
+ public Set<Integer> registeredReaders() {
+ return readers;
+ }
+
+ @Override
+ public void assignSplit(int subtaskId, List<ElasticsearchSourceSplit> splits) {
+ assignments.computeIfAbsent(subtaskId, ignored -> new ArrayList<>()).addAll(splits);
+ }
+
+ @Override
+ public void signalNoMoreSplits(int subtask) {}
+
+ @Override
+ public void sendEventToSourceReader(int subtaskId, SourceEvent event) {}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return null;
+ }
+
+ private int getAssignedSplitCount(int subtaskId) {
+ return assignments.getOrDefault(subtaskId, Collections.emptyList()).size();
+ }
+ }
+}