This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 59a3d1cea8 [Improve][connector-elasticsearch] Improved elasticsearch source enumerator splits allocation algorithm for subtasks (#10869)
59a3d1cea8 is described below

commit 59a3d1cea8ebc4addc3067fed65779bbab0748a2
Author: JeremyXin <[email protected]>
AuthorDate: Thu Jun 11 21:17:38 2026 +0800

    [Improve][connector-elasticsearch] Improved elasticsearch source enumerator splits allocation algorithm for subtasks (#10869)
---
 .../source/ElasticsearchSourceSplitEnumerator.java |  22 +++-
 .../ElasticsearchSourceSplitEnumeratorTest.java    | 134 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index ea1118734f..c62fbe09da 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -56,6 +57,7 @@ public class ElasticsearchSourceSplitEnumerator
     private final List<ElasticsearchConfig> elasticsearchConfigs;
 
     private volatile boolean shouldEnumerate;
+    private final AtomicInteger assignCount = new AtomicInteger(0);
 
     public ElasticsearchSourceSplitEnumerator(
             SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
@@ -106,15 +108,25 @@ public class ElasticsearchSourceSplitEnumerator
 
     private void addPendingSplit(Collection<ElasticsearchSourceSplit> splits) {
         int readerCount = context.currentParallelism();
-        for (ElasticsearchSourceSplit split : splits) {
-            int ownerReader = getSplitOwner(split.splitId(), readerCount);
+
+        List<ElasticsearchSourceSplit> sortedSplits =
+                splits.stream()
+                        .sorted(Comparator.comparing(ElasticsearchSourceSplit::splitId))
+                        .collect(Collectors.toList());
+
+        for (ElasticsearchSourceSplit split : sortedSplits) {
+            int ownerReader = getSplitOwner(assignCount.getAndIncrement(), readerCount);
             log.info("Assigning {} to {} reader.", split, ownerReader);
             pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
         }
     }
 
-    private static int getSplitOwner(String tp, int numReaders) {
-        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    private void addPendingSplit(Collection<ElasticsearchSourceSplit> splits, int ownerReader) {
+        pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
+    }
+
+    private static int getSplitOwner(int assignCount, int numReaders) {
+        return assignCount % numReaders;
     }
 
     private void assignSplit(Collection<Integer> readers) {
@@ -177,7 +189,7 @@ public class ElasticsearchSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<ElasticsearchSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            addPendingSplit(splits);
+            addPendingSplit(splits, subtaskId);
             assignSplit(Collections.singletonList(subtaskId));
         }
     }
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..5f17658ecd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumeratorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ElasticsearchSourceSplitEnumeratorTest {
+
+    @Test
+    public void shouldBalanceSplitsEvenlyAcrossReaders() throws Exception {
+        TestingContext context = new TestingContext(4);
+        ElasticsearchSourceSplitEnumerator enumerator =
+                new ElasticsearchSourceSplitEnumerator(context, null, Collections.emptyList());
+
+        Method addPendingSplit =
+                ElasticsearchSourceSplitEnumerator.class.getDeclaredMethod(
+                        "addPendingSplit", java.util.Collection.class);
+        addPendingSplit.setAccessible(true);
+        addPendingSplit.invoke(enumerator, buildSplits(10));
+
+        ElasticsearchSourceState state = enumerator.snapshotState(1L);
+        Map<Integer, List<ElasticsearchSourceSplit>> pendingSplits = state.getPendingSplit();
+
+        Assertions.assertEquals(4, pendingSplits.size());
+        Assertions.assertEquals(3, pendingSplits.get(0).size());
+        Assertions.assertEquals(3, pendingSplits.get(1).size());
+        Assertions.assertEquals(2, pendingSplits.get(2).size());
+        Assertions.assertEquals(2, pendingSplits.get(3).size());
+    }
+
+    @Test
+    public void shouldReassignReturnedSplitsToOriginalReader() throws Exception {
+        TestingContext context = new TestingContext(3);
+        ElasticsearchSourceSplitEnumerator enumerator =
+                new ElasticsearchSourceSplitEnumerator(context, null, Collections.emptyList());
+
+        enumerator.addSplitsBack(buildSplits(3), 2);
+
+        Assertions.assertEquals(0, context.getAssignedSplitCount(0));
+        Assertions.assertEquals(0, context.getAssignedSplitCount(1));
+        Assertions.assertEquals(3, context.getAssignedSplitCount(2));
+
+        ElasticsearchSourceState state = enumerator.snapshotState(1L);
+        Assertions.assertTrue(state.getPendingSplit().isEmpty());
+    }
+
+    private List<ElasticsearchSourceSplit> buildSplits(int size) {
+        List<ElasticsearchSourceSplit> splits = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            splits.add(new ElasticsearchSourceSplit("split-" + i, null));
+        }
+        return splits;
+    }
+
+    private static class TestingContext
+            implements SourceSplitEnumerator.Context<ElasticsearchSourceSplit> {
+        private final int parallelism;
+        private final Set<Integer> readers;
+        private final Map<Integer, List<ElasticsearchSourceSplit>> assignments = new HashMap<>();
+
+        private TestingContext(int parallelism) {
+            this.parallelism = parallelism;
+            this.readers = new LinkedHashSet<>();
+            for (int i = 0; i < parallelism; i++) {
+                readers.add(i);
+            }
+        }
+
+        @Override
+        public int currentParallelism() {
+            return parallelism;
+        }
+
+        @Override
+        public Set<Integer> registeredReaders() {
+            return readers;
+        }
+
+        @Override
+        public void assignSplit(int subtaskId, List<ElasticsearchSourceSplit> splits) {
+            assignments.computeIfAbsent(subtaskId, ignored -> new ArrayList<>()).addAll(splits);
+        }
+
+        @Override
+        public void signalNoMoreSplits(int subtask) {}
+
+        @Override
+        public void sendEventToSourceReader(int subtaskId, SourceEvent event) {}
+
+        @Override
+        public MetricsContext getMetricsContext() {
+            return null;
+        }
+
+        @Override
+        public EventListener getEventListener() {
+            return null;
+        }
+
+        private int getAssignedSplitCount(int subtaskId) {
+            return assignments.getOrDefault(subtaskId, Collections.emptyList()).size();
+        }
+    }
+}

Reply via email to