This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e4052e95c9 [Improve][Connectors-v2] Add support for re-signaling
NoMoreSplitsEvent after reader re-registration (#10208)
e4052e95c9 is described below
commit e4052e95c9500443da2b48532914f51351bca36d
Author: Adam Wang <[email protected]>
AuthorDate: Fri Feb 6 19:06:09 2026 +0800
[Improve][Connectors-v2] Add support for re-signaling NoMoreSplitsEvent
after reader re-registration (#10208)
Co-authored-by: wangxiaogang <[email protected]>
---
.../jdbc/source/JdbcSourceSplitEnumeratorTest.java | 214 +++++++++++++++++++++
.../server/task/SourceSplitEnumeratorTask.java | 9 +-
.../context/SeaTunnelSplitEnumeratorContext.java | 8 +
.../server/task/SourceSplitEnumeratorTaskTest.java | 58 ++++++
.../translation/flink/source/FlinkSource.java | 14 +-
.../flink/source/FlinkSourceEnumerator.java | 12 +-
.../source/FlinkSourceSplitEnumeratorContext.java | 12 ++
.../flink/source/FlinkSourceEnumeratorTest.java | 62 ++++++
8 files changed, 383 insertions(+), 6 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..4e90b41e70
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumeratorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class JdbcSourceSplitEnumeratorTest {
+
+ @Test
+ void testRunSignalsNoMoreSplitsOnce() throws Exception {
+ int parallelism = 1;
+ TablePath tablePath = TablePath.of("db", "schema", "table");
+
+ Map<TablePath, JdbcSourceTable> tables = new HashMap<>();
+ tables.put(tablePath, createJdbcSourceTable(tablePath));
+
+ List<Integer> assignTargets = new ArrayList<>();
+ Set<Integer> noMoreSplitsReaders = new HashSet<>();
+ AtomicInteger noMoreSplitsCallCount = new AtomicInteger();
+
+ SourceSplitEnumerator.Context<JdbcSourceSplit> context =
+ new SourceSplitEnumerator.Context<JdbcSourceSplit>() {
+ @Override
+ public int currentParallelism() {
+ return parallelism;
+ }
+
+ @Override
+ public Set<Integer> registeredReaders() {
+ return Collections.singleton(0);
+ }
+
+ @Override
+ public void assignSplit(int subtaskId,
List<JdbcSourceSplit> splits) {
+ assignTargets.add(subtaskId);
+ }
+
+ @Override
+ public void signalNoMoreSplits(int subtask) {
+ noMoreSplitsCallCount.incrementAndGet();
+ noMoreSplitsReaders.add(subtask);
+ }
+
+ @Override
+ public void sendEventToSourceReader(int subtaskId,
SourceEvent event) {}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return null;
+ }
+ };
+
+ JdbcSourceConfig sourceConfig =
+ JdbcSourceConfig.builder()
+ .jdbcConnectionConfig(
+ JdbcConnectionConfig.builder()
+ .url("jdbc:generic://localhost:0/test")
+ .driverName("org.example.Driver")
+ .build())
+ .build();
+
+ JdbcSourceSplitEnumerator enumerator =
+ new JdbcSourceSplitEnumerator(context, sourceConfig, tables,
null);
+
+ enumerator.open();
+ enumerator.run();
+
+ Assertions.assertEquals(Collections.singletonList(0), assignTargets);
+ Assertions.assertEquals(Collections.singleton(0), noMoreSplitsReaders);
+ Assertions.assertEquals(1, noMoreSplitsCallCount.get());
+
+ // NoMoreSplitsEvent is only sent once at the end of run().
+ enumerator.addSplitsBack(Collections.emptyList(), 0);
+ enumerator.registerReader(0);
+
+ Assertions.assertEquals(1, noMoreSplitsCallCount.get());
+ }
+
+ @Test
+ void testRunSignalsNoMoreSplitsForAllRegisteredReadersWithHighParallelism() throws Exception {
+ int parallelism = 8;
+
+ Set<Integer> registeredReaders = new HashSet<>();
+ for (int i = 0; i < parallelism; i++) {
+ registeredReaders.add(i);
+ }
+
+ Map<TablePath, JdbcSourceTable> tables = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ TablePath tablePath = TablePath.of("db", "schema", "table_" + i);
+ tables.put(tablePath, createJdbcSourceTable(tablePath));
+ }
+
+ Map<String, Integer> assignedSplitOwners = new HashMap<>();
+ Set<Integer> noMoreSplitsReaders = ConcurrentHashMap.newKeySet();
+ AtomicInteger noMoreSplitsCallCount = new AtomicInteger();
+
+ SourceSplitEnumerator.Context<JdbcSourceSplit> context =
+ new SourceSplitEnumerator.Context<JdbcSourceSplit>() {
+ @Override
+ public int currentParallelism() {
+ return parallelism;
+ }
+
+ @Override
+ public Set<Integer> registeredReaders() {
+ return new HashSet<>(registeredReaders);
+ }
+
+ @Override
+ public void assignSplit(int subtaskId,
List<JdbcSourceSplit> splits) {
+ for (JdbcSourceSplit split : splits) {
+ assignedSplitOwners.put(split.splitId(),
subtaskId);
+ }
+ }
+
+ @Override
+ public void signalNoMoreSplits(int subtask) {
+ noMoreSplitsCallCount.incrementAndGet();
+ noMoreSplitsReaders.add(subtask);
+ }
+
+ @Override
+ public void sendEventToSourceReader(int subtaskId,
SourceEvent event) {}
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return null;
+ }
+ };
+
+ JdbcSourceConfig sourceConfig =
+ JdbcSourceConfig.builder()
+ .jdbcConnectionConfig(
+ JdbcConnectionConfig.builder()
+ .url("jdbc:generic://localhost:0/test")
+ .driverName("org.example.Driver")
+ .build())
+ .build();
+
+ JdbcSourceSplitEnumerator enumerator =
+ new JdbcSourceSplitEnumerator(context, sourceConfig, tables,
null);
+
+ enumerator.open();
+ enumerator.run();
+
+ Assertions.assertEquals(tables.size(), assignedSplitOwners.size());
+ assignedSplitOwners.forEach(
+ (splitId, owner) -> {
+ int expectedOwner = (splitId.hashCode() &
Integer.MAX_VALUE) % parallelism;
+ Assertions.assertEquals(expectedOwner, owner);
+ });
+
+ Assertions.assertEquals(registeredReaders, noMoreSplitsReaders);
+ Assertions.assertEquals(parallelism, noMoreSplitsCallCount.get());
+ Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
+ }
+
+ private JdbcSourceTable createJdbcSourceTable(TablePath tablePath) {
+ TableIdentifier tableId = TableIdentifier.of("default", tablePath);
+ TableSchema tableSchema =
TableSchema.builder().columns(Collections.emptyList()).build();
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ tableId, tableSchema, Collections.emptyMap(),
Collections.emptyList(), "");
+ return
JdbcSourceTable.builder().tablePath(tablePath).catalogTable(catalogTable).build();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 2cfd76c549..b22ca022d6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -223,9 +223,16 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
log.info("received reader register, readerID: " + readerId);
SourceSplitEnumerator<SplitT, Serializable> enumerator =
getEnumerator();
+ int readerIndex = readerId.getTaskIndex();
this.addTaskMemberMapping(readerId, memberAddr);
synchronized (this) {
- enumerator.registerReader(readerId.getTaskIndex());
+ enumerator.registerReader(readerIndex);
+ if (enumeratorContext.hasNoMoreSplitsSignaled(readerIndex)) {
+ log.info(
+ "Reader [{}] re-registered after failover. Re-signaling NoMoreSplitsEvent.",
+ readerIndex);
+ enumeratorContext.signalNoMoreSplits(readerIndex);
+ }
}
int taskSize = taskMemberMapping.size();
if (maxReaderSize == taskSize) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index 7b587283d5..b4e3de60fb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@@ -46,6 +47,8 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
private final MetricsContext metricsContext;
private final EventListener eventListener;
+ private final Set<Integer> noMoreSplitsSignaledReaders =
ConcurrentHashMap.newKeySet();
+
public SeaTunnelSplitEnumeratorContext(
int parallelism,
SourceSplitEnumeratorTask<SplitT> task,
@@ -88,6 +91,7 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
@Override
public void signalNoMoreSplits(int subtaskIndex) {
+ noMoreSplitsSignaledReaders.add(subtaskIndex);
List<byte[]> emptySplits = Collections.emptyList();
task.getExecutionContext()
.sendToMember(
@@ -109,4 +113,8 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
public EventListener getEventListener() {
return eventListener;
}
+
+ public boolean hasNoMoreSplitsSignaled(int subtaskIndex) {
+ return noMoreSplitsSignaledReaders.contains(subtaskIndex);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
index 7beeedba53..b74c8d1dba 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import
org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class SourceSplitEnumeratorTaskTest {
@@ -102,4 +104,60 @@ public class SourceSplitEnumeratorTaskTest {
Assertions.assertTrue(openTime.get() < registerReaderTime.get());
}
+
+ @Test
+ void testResignalNoMoreSplitsAfterReaderReregister() throws Exception {
+ SeaTunnelSource source = Mockito.mock(SeaTunnelSource.class);
+ SourceSplitEnumerator enumerator =
Mockito.mock(SourceSplitEnumerator.class);
+
+ AtomicReference<SeaTunnelSplitEnumeratorContext> enumeratorContextRef =
+ new AtomicReference<>();
+ Mockito.when(source.createEnumerator(Mockito.any()))
+ .thenAnswer(
+ invocation -> {
+ enumeratorContextRef.set(
+ (SeaTunnelSplitEnumeratorContext)
invocation.getArgument(0));
+ return enumerator;
+ });
+
+ SourceAction action =
+ new SourceAction<>(1, "fake", source, new HashSet<>(),
Collections.emptySet());
+ SourceSplitEnumeratorTask enumeratorTask =
+ new SourceSplitEnumeratorTask<>(
+ 1, new TaskLocation(new TaskGroupLocation(1, 1, 1), 1,
1), action);
+
+ TaskExecutionContext context =
Mockito.mock(TaskExecutionContext.class);
+ InvocationFuture future = Mockito.mock(InvocationFuture.class);
+ Mockito.when(context.getOrCreateMetricsContext(Mockito.any())).thenReturn(null);
+ Mockito.when(context.sendToMaster(Mockito.any())).thenReturn(future);
+ Mockito.when(context.sendToMember(Mockito.any(),
Mockito.any())).thenReturn(future);
+ Mockito.when(future.join()).thenReturn(null);
+ TaskExecutionService taskExecutionService =
Mockito.mock(TaskExecutionService.class);
+ Mockito.when(context.getTaskExecutionService()).thenReturn(taskExecutionService);
+
+ enumeratorTask.setTaskExecutionContext(context);
+ enumeratorTask.init();
+ enumeratorTask.restoreState(new ArrayList<>());
+
+ TaskLocation readerLocation = new TaskLocation(new
TaskGroupLocation(1, 1, 1), 1, 1);
+ Address address = Address.createUnresolvedAddress("localhost", 5701);
+
+ // Initial register
+ enumeratorTask.receivedReader(readerLocation, address);
+
+ SeaTunnelSplitEnumeratorContext enumeratorContext =
enumeratorContextRef.get();
+ Assertions.assertNotNull(enumeratorContext);
+
+ Mockito.clearInvocations(context);
+
+ // Simulate that NoMoreSplitsEvent has been signaled once.
+ enumeratorContext.signalNoMoreSplits(readerLocation.getTaskIndex());
+ Assertions.assertTrue(
+ enumeratorContext.hasNoMoreSplitsSignaled(readerLocation.getTaskIndex()));
+
+ // Reader re-registers after failover, framework should re-signal.
+ enumeratorTask.receivedReader(readerLocation, address);
+
+ Mockito.verify(context, Mockito.times(2)).sendToMember(Mockito.any(),
Mockito.any());
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index 12adc8b8f1..98a207f2c0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -38,6 +38,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.Serializable;
import java.sql.DriverManager;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* The source implementation of {@link Source}, used for proxy all {@link
SeaTunnelSource} in flink.
@@ -91,21 +93,25 @@ public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializ
@Override
public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws
Exception {
+ Set<Integer> noMoreSplitsSignaledReaders =
ConcurrentHashMap.newKeySet();
SourceSplitEnumerator.Context<SplitT> context =
- new FlinkSourceSplitEnumeratorContext<>(enumContext);
+ new FlinkSourceSplitEnumeratorContext<>(
+ enumContext, noMoreSplitsSignaledReaders::add);
SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
source.createEnumerator(context);
- return new FlinkSourceEnumerator<>(enumerator, enumContext);
+ return new FlinkSourceEnumerator<>(enumerator, enumContext,
noMoreSplitsSignaledReaders);
}
@Override
public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext,
EnumStateT checkpoint)
throws Exception {
+ Set<Integer> noMoreSplitsSignaledReaders =
ConcurrentHashMap.newKeySet();
FlinkSourceSplitEnumeratorContext<SplitT> context =
- new FlinkSourceSplitEnumeratorContext<>(enumContext);
+ new FlinkSourceSplitEnumeratorContext<>(
+ enumContext, noMoreSplitsSignaledReaders::add);
SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
source.restoreEnumerator(context, checkpoint);
- return new FlinkSourceEnumerator<>(enumerator, enumContext);
+ return new FlinkSourceEnumerator<>(enumerator, enumContext,
noMoreSplitsSignaledReaders);
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
index c73cd863da..bd3dba9c28 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -54,6 +55,7 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
private final SourceSplitEnumerator.Context<SplitT> context;
private final int parallelism;
+ private final Set<Integer> noMoreSplitsSignaledReaders;
private final Object lock = new Object();
@@ -63,11 +65,13 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
public FlinkSourceEnumerator(
SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
- SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
+ SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext,
+ Set<Integer> noMoreSplitsSignaledReaders) {
this.sourceSplitEnumerator = enumerator;
this.enumeratorContext = enumContext;
this.context = new
FlinkSourceSplitEnumeratorContext<>(enumeratorContext);
this.parallelism = enumeratorContext.currentParallelism();
+ this.noMoreSplitsSignaledReaders = noMoreSplitsSignaledReaders;
}
@Override
@@ -95,6 +99,12 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
synchronized (lock) {
sourceSplitEnumerator.registerReader(subtaskId);
currentRegisterReaders++;
+ if (noMoreSplitsSignaledReaders.contains(subtaskId)) {
+ LOGGER.info(
+ "Reader [{}] re-registered after failover. Re-signaling NoMoreSplitsEvent.",
+ subtaskId);
+ enumeratorContext.signalNoMoreSplits(subtaskId);
+ }
}
if (currentRegisterReaders == parallelism && !isRun.getAndSet(true)) {
try {
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
index f03d1e7e69..ee1c885558 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.function.IntConsumer;
/**
* The implementation of {@link
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context} for
@@ -50,11 +51,19 @@ public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit>
private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext;
protected final EventListener eventListener;
+ private final IntConsumer noMoreSplitsSignalListener;
public FlinkSourceSplitEnumeratorContext(
SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
+ this(enumContext, null);
+ }
+
+ public FlinkSourceSplitEnumeratorContext(
+ SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext,
+ IntConsumer noMoreSplitsSignalListener) {
this.enumContext = enumContext;
this.eventListener = new
DefaultEventProcessor(getFlinkJobId(enumContext));
+ this.noMoreSplitsSignalListener = noMoreSplitsSignalListener;
}
@Override
@@ -77,6 +86,9 @@ public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit>
@Override
public void signalNoMoreSplits(int subtask) {
+ if (noMoreSplitsSignalListener != null) {
+ noMoreSplitsSignalListener.accept(subtask);
+ }
enumContext.signalNoMoreSplits(subtask);
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java
new file mode 100644
index 0000000000..949a66f838
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumeratorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+class FlinkSourceEnumeratorTest {
+
+ private static final class DummySplit implements SourceSplit {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String splitId() {
+ return "dummy";
+ }
+ }
+
+ @Test
+ void testResignalNoMoreSplitsAfterReaderReregister() {
+ SourceSplitEnumerator<DummySplit, Serializable> sourceSplitEnumerator =
+ Mockito.mock(SourceSplitEnumerator.class);
+ SplitEnumeratorContext<SplitWrapper<DummySplit>> enumeratorContext =
+ Mockito.mock(SplitEnumeratorContext.class);
+ Mockito.when(enumeratorContext.currentParallelism()).thenReturn(2);
+
+ Set<Integer> noMoreSplitsSignaledReaders =
ConcurrentHashMap.newKeySet();
+ noMoreSplitsSignaledReaders.add(0);
+
+ FlinkSourceEnumerator<DummySplit, Serializable> enumerator =
+ new FlinkSourceEnumerator<>(
+ sourceSplitEnumerator, enumeratorContext,
noMoreSplitsSignaledReaders);
+
+ enumerator.addReader(0);
+
+ Mockito.verify(enumeratorContext).signalNoMoreSplits(0);
+ }
+}