This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e4052e95c9 [Improve][Connectors-v2] Add support for re-signaling NoMoreSplitsEvent after reader re-registration (#10208)
e4052e95c9 is described below

commit e4052e95c9500443da2b48532914f51351bca36d
Author: Adam Wang <[email protected]>
AuthorDate: Fri Feb 6 19:06:09 2026 +0800

    [Improve][Connectors-v2] Add support for re-signaling NoMoreSplitsEvent after reader re-registration (#10208)
    
    Co-authored-by: wangxiaogang <[email protected]>
---
 .../jdbc/source/JdbcSourceSplitEnumeratorTest.java | 214 +++++++++++++++++++++
 .../server/task/SourceSplitEnumeratorTask.java     |   9 +-
 .../context/SeaTunnelSplitEnumeratorContext.java   |   8 +
 .../server/task/SourceSplitEnumeratorTaskTest.java |  58 ++++++
 .../translation/flink/source/FlinkSource.java      |  14 +-
 .../flink/source/FlinkSourceEnumerator.java        |  12 +-
 .../source/FlinkSourceSplitEnumeratorContext.java  |  12 ++
 .../flink/source/FlinkSourceEnumeratorTest.java    |  62 ++++++
 8 files changed, 383 insertions(+), 6 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..4e90b41e70
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumeratorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class JdbcSourceSplitEnumeratorTest {
+
+    @Test
+    void testRunSignalsNoMoreSplitsOnce() throws Exception {
+        int parallelism = 1;
+        TablePath tablePath = TablePath.of("db", "schema", "table");
+
+        Map<TablePath, JdbcSourceTable> tables = new HashMap<>();
+        tables.put(tablePath, createJdbcSourceTable(tablePath));
+
+        List<Integer> assignTargets = new ArrayList<>();
+        Set<Integer> noMoreSplitsReaders = new HashSet<>();
+        AtomicInteger noMoreSplitsCallCount = new AtomicInteger();
+
+        SourceSplitEnumerator.Context<JdbcSourceSplit> context =
+                new SourceSplitEnumerator.Context<JdbcSourceSplit>() {
+                    @Override
+                    public int currentParallelism() {
+                        return parallelism;
+                    }
+
+                    @Override
+                    public Set<Integer> registeredReaders() {
+                        return Collections.singleton(0);
+                    }
+
+                    @Override
+                    public void assignSplit(int subtaskId, 
List<JdbcSourceSplit> splits) {
+                        assignTargets.add(subtaskId);
+                    }
+
+                    @Override
+                    public void signalNoMoreSplits(int subtask) {
+                        noMoreSplitsCallCount.incrementAndGet();
+                        noMoreSplitsReaders.add(subtask);
+                    }
+
+                    @Override
+                    public void sendEventToSourceReader(int subtaskId, 
SourceEvent event) {}
+
+                    @Override
+                    public MetricsContext getMetricsContext() {
+                        return null;
+                    }
+
+                    @Override
+                    public EventListener getEventListener() {
+                        return null;
+                    }
+                };
+
+        JdbcSourceConfig sourceConfig =
+                JdbcSourceConfig.builder()
+                        .jdbcConnectionConfig(
+                                JdbcConnectionConfig.builder()
+                                        .url("jdbc:generic://localhost:0/test")
+                                        .driverName("org.example.Driver")
+                                        .build())
+                        .build();
+
+        JdbcSourceSplitEnumerator enumerator =
+                new JdbcSourceSplitEnumerator(context, sourceConfig, tables, 
null);
+
+        enumerator.open();
+        enumerator.run();
+
+        Assertions.assertEquals(Collections.singletonList(0), assignTargets);
+        Assertions.assertEquals(Collections.singleton(0), noMoreSplitsReaders);
+        Assertions.assertEquals(1, noMoreSplitsCallCount.get());
+
+        // NoMoreSplitsEvent is only sent once at the end of run().
+        enumerator.addSplitsBack(Collections.emptyList(), 0);
+        enumerator.registerReader(0);
+
+        Assertions.assertEquals(1, noMoreSplitsCallCount.get());
+    }
+
+    @Test
+    void 
testRunSignalsNoMoreSplitsForAllRegisteredReadersWithHighParallelism() throws 
Exception {
+        int parallelism = 8;
+
+        Set<Integer> registeredReaders = new HashSet<>();
+        for (int i = 0; i < parallelism; i++) {
+            registeredReaders.add(i);
+        }
+
+        Map<TablePath, JdbcSourceTable> tables = new HashMap<>();
+        for (int i = 0; i < 3; i++) {
+            TablePath tablePath = TablePath.of("db", "schema", "table_" + i);
+            tables.put(tablePath, createJdbcSourceTable(tablePath));
+        }
+
+        Map<String, Integer> assignedSplitOwners = new HashMap<>();
+        Set<Integer> noMoreSplitsReaders = ConcurrentHashMap.newKeySet();
+        AtomicInteger noMoreSplitsCallCount = new AtomicInteger();
+
+        SourceSplitEnumerator.Context<JdbcSourceSplit> context =
+                new SourceSplitEnumerator.Context<JdbcSourceSplit>() {
+                    @Override
+                    public int currentParallelism() {
+                        return parallelism;
+                    }
+
+                    @Override
+                    public Set<Integer> registeredReaders() {
+                        return new HashSet<>(registeredReaders);
+                    }
+
+                    @Override
+                    public void assignSplit(int subtaskId, 
List<JdbcSourceSplit> splits) {
+                        for (JdbcSourceSplit split : splits) {
+                            assignedSplitOwners.put(split.splitId(), 
subtaskId);
+                        }
+                    }
+
+                    @Override
+                    public void signalNoMoreSplits(int subtask) {
+                        noMoreSplitsCallCount.incrementAndGet();
+                        noMoreSplitsReaders.add(subtask);
+                    }
+
+                    @Override
+                    public void sendEventToSourceReader(int subtaskId, 
SourceEvent event) {}
+
+                    @Override
+                    public MetricsContext getMetricsContext() {
+                        return null;
+                    }
+
+                    @Override
+                    public EventListener getEventListener() {
+                        return null;
+                    }
+                };
+
+        JdbcSourceConfig sourceConfig =
+                JdbcSourceConfig.builder()
+                        .jdbcConnectionConfig(
+                                JdbcConnectionConfig.builder()
+                                        .url("jdbc:generic://localhost:0/test")
+                                        .driverName("org.example.Driver")
+                                        .build())
+                        .build();
+
+        JdbcSourceSplitEnumerator enumerator =
+                new JdbcSourceSplitEnumerator(context, sourceConfig, tables, 
null);
+
+        enumerator.open();
+        enumerator.run();
+
+        Assertions.assertEquals(tables.size(), assignedSplitOwners.size());
+        assignedSplitOwners.forEach(
+                (splitId, owner) -> {
+                    int expectedOwner = (splitId.hashCode() & 
Integer.MAX_VALUE) % parallelism;
+                    Assertions.assertEquals(expectedOwner, owner);
+                });
+
+        Assertions.assertEquals(registeredReaders, noMoreSplitsReaders);
+        Assertions.assertEquals(parallelism, noMoreSplitsCallCount.get());
+        Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
+    }
+
+    private JdbcSourceTable createJdbcSourceTable(TablePath tablePath) {
+        TableIdentifier tableId = TableIdentifier.of("default", tablePath);
+        TableSchema tableSchema = 
TableSchema.builder().columns(Collections.emptyList()).build();
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        tableId, tableSchema, Collections.emptyMap(), 
Collections.emptyList(), "");
+        return 
JdbcSourceTable.builder().tablePath(tablePath).catalogTable(catalogTable).build();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 2cfd76c549..b22ca022d6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -223,9 +223,16 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         log.info("received reader register, readerID: " + readerId);
 
         SourceSplitEnumerator<SplitT, Serializable> enumerator = 
getEnumerator();
+        int readerIndex = readerId.getTaskIndex();
         this.addTaskMemberMapping(readerId, memberAddr);
         synchronized (this) {
-            enumerator.registerReader(readerId.getTaskIndex());
+            enumerator.registerReader(readerIndex);
+            if (enumeratorContext.hasNoMoreSplitsSignaled(readerIndex)) {
+                log.info(
+                        "Reader [{}] re-registered after failover. 
Re-signaling NoMoreSplitsEvent.",
+                        readerIndex);
+                enumeratorContext.signalNoMoreSplits(readerIndex);
+            }
         }
         int taskSize = taskMemberMapping.size();
         if (maxReaderSize == taskSize) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index 7b587283d5..b4e3de60fb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@@ -46,6 +47,8 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
     private final MetricsContext metricsContext;
     private final EventListener eventListener;
 
+    private final Set<Integer> noMoreSplitsSignaledReaders = 
ConcurrentHashMap.newKeySet();
+
     public SeaTunnelSplitEnumeratorContext(
             int parallelism,
             SourceSplitEnumeratorTask<SplitT> task,
@@ -88,6 +91,7 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
 
     @Override
     public void signalNoMoreSplits(int subtaskIndex) {
+        noMoreSplitsSignaledReaders.add(subtaskIndex);
         List<byte[]> emptySplits = Collections.emptyList();
         task.getExecutionContext()
                 .sendToMember(
@@ -109,4 +113,8 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
     public EventListener getEventListener() {
         return eventListener;
     }
+
+    public boolean hasNoMoreSplitsSignaled(int subtaskIndex) {
+        return noMoreSplitsSignaledReaders.contains(subtaskIndex);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
index 7beeedba53..b74c8d1dba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.server.TaskExecutionService;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import 
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class SourceSplitEnumeratorTaskTest {
 
@@ -102,4 +104,60 @@ public class SourceSplitEnumeratorTaskTest {
 
         Assertions.assertTrue(openTime.get() < registerReaderTime.get());
     }
+
+    @Test
+    void testResignalNoMoreSplitsAfterReaderReregister() throws Exception {
+        SeaTunnelSource source = Mockito.mock(SeaTunnelSource.class);
+        SourceSplitEnumerator enumerator = 
Mockito.mock(SourceSplitEnumerator.class);
+
+        AtomicReference<SeaTunnelSplitEnumeratorContext> enumeratorContextRef =
+                new AtomicReference<>();
+        Mockito.when(source.createEnumerator(Mockito.any()))
+                .thenAnswer(
+                        invocation -> {
+                            enumeratorContextRef.set(
+                                    (SeaTunnelSplitEnumeratorContext) 
invocation.getArgument(0));
+                            return enumerator;
+                        });
+
+        SourceAction action =
+                new SourceAction<>(1, "fake", source, new HashSet<>(), 
Collections.emptySet());
+        SourceSplitEnumeratorTask enumeratorTask =
+                new SourceSplitEnumeratorTask<>(
+                        1, new TaskLocation(new TaskGroupLocation(1, 1, 1), 1, 
1), action);
+
+        TaskExecutionContext context = 
Mockito.mock(TaskExecutionContext.class);
+        InvocationFuture future = Mockito.mock(InvocationFuture.class);
+        
Mockito.when(context.getOrCreateMetricsContext(Mockito.any())).thenReturn(null);
+        Mockito.when(context.sendToMaster(Mockito.any())).thenReturn(future);
+        Mockito.when(context.sendToMember(Mockito.any(), 
Mockito.any())).thenReturn(future);
+        Mockito.when(future.join()).thenReturn(null);
+        TaskExecutionService taskExecutionService = 
Mockito.mock(TaskExecutionService.class);
+        
Mockito.when(context.getTaskExecutionService()).thenReturn(taskExecutionService);
+
+        enumeratorTask.setTaskExecutionContext(context);
+        enumeratorTask.init();
+        enumeratorTask.restoreState(new ArrayList<>());
+
+        TaskLocation readerLocation = new TaskLocation(new 
TaskGroupLocation(1, 1, 1), 1, 1);
+        Address address = Address.createUnresolvedAddress("localhost", 5701);
+
+        // Initial register
+        enumeratorTask.receivedReader(readerLocation, address);
+
+        SeaTunnelSplitEnumeratorContext enumeratorContext = 
enumeratorContextRef.get();
+        Assertions.assertNotNull(enumeratorContext);
+
+        Mockito.clearInvocations(context);
+
+        // Simulate that NoMoreSplitsEvent has been signaled once.
+        enumeratorContext.signalNoMoreSplits(readerLocation.getTaskIndex());
+        Assertions.assertTrue(
+                
enumeratorContext.hasNoMoreSplitsSignaled(readerLocation.getTaskIndex()));
+
+        // Reader re-registers after failover, framework should re-signal.
+        enumeratorTask.receivedReader(readerLocation, address);
+
+        Mockito.verify(context, Mockito.times(2)).sendToMember(Mockito.any(), 
Mockito.any());
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index 12adc8b8f1..98a207f2c0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -38,6 +38,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.Serializable;
 import java.sql.DriverManager;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The source implementation of {@link Source}, used for proxy all {@link 
SeaTunnelSource} in flink.
@@ -91,21 +93,25 @@ public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializ
     @Override
     public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
             SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws 
Exception {
+        Set<Integer> noMoreSplitsSignaledReaders = 
ConcurrentHashMap.newKeySet();
         SourceSplitEnumerator.Context<SplitT> context =
-                new FlinkSourceSplitEnumeratorContext<>(enumContext);
+                new FlinkSourceSplitEnumeratorContext<>(
+                        enumContext, noMoreSplitsSignaledReaders::add);
         SourceSplitEnumerator<SplitT, EnumStateT> enumerator = 
source.createEnumerator(context);
-        return new FlinkSourceEnumerator<>(enumerator, enumContext);
+        return new FlinkSourceEnumerator<>(enumerator, enumContext, 
noMoreSplitsSignaledReaders);
     }
 
     @Override
     public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
             SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext, 
EnumStateT checkpoint)
             throws Exception {
+        Set<Integer> noMoreSplitsSignaledReaders = 
ConcurrentHashMap.newKeySet();
         FlinkSourceSplitEnumeratorContext<SplitT> context =
-                new FlinkSourceSplitEnumeratorContext<>(enumContext);
+                new FlinkSourceSplitEnumeratorContext<>(
+                        enumContext, noMoreSplitsSignaledReaders::add);
         SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
                 source.restoreEnumerator(context, checkpoint);
-        return new FlinkSourceEnumerator<>(enumerator, enumContext);
+        return new FlinkSourceEnumerator<>(enumerator, enumContext, 
noMoreSplitsSignaledReaders);
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
index c73cd863da..bd3dba9c28 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -54,6 +55,7 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
 
     private final SourceSplitEnumerator.Context<SplitT> context;
     private final int parallelism;
+    private final Set<Integer> noMoreSplitsSignaledReaders;
 
     private final Object lock = new Object();
 
@@ -63,11 +65,13 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
 
     public FlinkSourceEnumerator(
             SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
-            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
+            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext,
+            Set<Integer> noMoreSplitsSignaledReaders) {
         this.sourceSplitEnumerator = enumerator;
         this.enumeratorContext = enumContext;
         this.context = new 
FlinkSourceSplitEnumeratorContext<>(enumeratorContext);
         this.parallelism = enumeratorContext.currentParallelism();
+        this.noMoreSplitsSignaledReaders = noMoreSplitsSignaledReaders;
     }
 
     @Override
@@ -95,6 +99,12 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
         synchronized (lock) {
             sourceSplitEnumerator.registerReader(subtaskId);
             currentRegisterReaders++;
+            if (noMoreSplitsSignaledReaders.contains(subtaskId)) {
+                LOGGER.info(
+                        "Reader [{}] re-registered after failover. 
Re-signaling NoMoreSplitsEvent.",
+                        subtaskId);
+                enumeratorContext.signalNoMoreSplits(subtaskId);
+            }
         }
         if (currentRegisterReaders == parallelism && !isRun.getAndSet(true)) {
             try {
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
index f03d1e7e69..ee1c885558 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.IntConsumer;
 
 /**
  * The implementation of {@link 
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context} for
@@ -50,11 +51,19 @@ public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit>
 
     private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext;
     protected final EventListener eventListener;
+    private final IntConsumer noMoreSplitsSignalListener;
 
     public FlinkSourceSplitEnumeratorContext(
             SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
+        this(enumContext, null);
+    }
+
+    public FlinkSourceSplitEnumeratorContext(
+            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext,
+            IntConsumer noMoreSplitsSignalListener) {
         this.enumContext = enumContext;
         this.eventListener = new 
DefaultEventProcessor(getFlinkJobId(enumContext));
+        this.noMoreSplitsSignalListener = noMoreSplitsSignalListener;
     }
 
     @Override
@@ -77,6 +86,9 @@ public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit>
 
     @Override
     public void signalNoMoreSplits(int subtask) {
+        if (noMoreSplitsSignalListener != null) {
+            noMoreSplitsSignalListener.accept(subtask);
+        }
         enumContext.signalNoMoreSplits(subtask);
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java
new file mode 100644
index 0000000000..949a66f838
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+class FlinkSourceEnumeratorTest {
+
+    private static final class DummySplit implements SourceSplit {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public String splitId() {
+            return "dummy";
+        }
+    }
+
+    @Test
+    void testResignalNoMoreSplitsAfterReaderReregister() {
+        SourceSplitEnumerator<DummySplit, Serializable> sourceSplitEnumerator =
+                Mockito.mock(SourceSplitEnumerator.class);
+        SplitEnumeratorContext<SplitWrapper<DummySplit>> enumeratorContext =
+                Mockito.mock(SplitEnumeratorContext.class);
+        Mockito.when(enumeratorContext.currentParallelism()).thenReturn(2);
+
+        Set<Integer> noMoreSplitsSignaledReaders = 
ConcurrentHashMap.newKeySet();
+        noMoreSplitsSignaledReaders.add(0);
+
+        FlinkSourceEnumerator<DummySplit, Serializable> enumerator =
+                new FlinkSourceEnumerator<>(
+                        sourceSplitEnumerator, enumeratorContext, 
noMoreSplitsSignaledReaders);
+
+        enumerator.addReader(0);
+
+        Mockito.verify(enumeratorContext).signalNoMoreSplits(0);
+    }
+}

Reply via email to