This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 275adcce13 [Fix][connector-cdc] Fix NPE when finishing snapshot split due to null splitId (#10404)
275adcce13 is described below
commit 275adcce139605bf3340dcc5e807a734fbca4b66
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Feb 5 14:14:13 2026 +0800
[Fix][connector-cdc] Fix NPE when finishing snapshot split due to null splitId (#10404)
Co-authored-by: zengyi <[email protected]>
---
.../reader/IncrementalSourceSplitReader.java | 103 +++++++++++--
.../cdc/base/source/split/ChangeEventRecords.java | 10 ++
.../reader/IncrementalSourceSplitReaderTest.java | 160 +++++++++++++++++++++
3 files changed, 262 insertions(+), 11 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index 53f9736273..d6db68e8cc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -37,10 +37,22 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
+import java.util.Set;
@Slf4j
+/**
+ * Split reader for incremental source (snapshot + incremental phase).
+ *
+ * <p><b>Thread safety:</b> This class is NOT thread-safe and is expected to
be used from a single
+ * thread. The {@link #fetch()} method should be called sequentially without
concurrent access. The
+ * {@link #close()} method should be called from the same thread or after all
fetch calls have
+ * completed.
+ *
+ * @param <C> The type of source configuration.
+ */
public class IncrementalSourceSplitReader<C extends SourceConfig>
implements SplitReader<SourceRecords, SourceSplitBase> {
private final Queue<SourceSplitBase> splits;
@@ -49,6 +61,7 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;
private String currentSplitId;
+ private String emittedFinishedSplitId;
private final DataSourceDialect<C> dataSourceDialect;
private final C sourceConfig;
private final SchemaChangeResolver schemaChangeResolver;
@@ -70,6 +83,9 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
checkSplitOrStartNext();
checkNeedStopBinlogReader();
+ if (hasEmittedCurrentSplitFinished()) {
+ return NoSplitRecords.INSTANCE;
+ }
Iterator<SourceRecords> dataIt = null;
try {
dataIt = currentFetcher.pollSplitRecords();
@@ -77,9 +93,27 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
log.warn("fetch data failed.", e);
throw new IOException(e);
}
- return dataIt == null
- ? finishedSnapshotSplit()
- : ChangeEventRecords.forRecords(currentSplitId, dataIt);
+ if (dataIt == null) {
+ return finishedSnapshotSplit();
+ }
+ if (currentSplitId == null) {
+ log.warn(
+ "Invalid state: currentSplitId is null when emitting
records. "
+ + "emittedFinishedSplitId={}, currentFetcher={},
isFinished={}",
+ emittedFinishedSplitId,
+ currentFetcher != null ?
currentFetcher.getClass().getSimpleName() : "null",
+ currentFetcher != null && currentFetcher.isFinished());
+ throw new IOException(
+ String.format(
+ "Invalid state: currentSplitId is null when
emitting records. "
+ + "emittedFinishedSplitId=%s,
currentFetcher=%s, isFinished=%s",
+ emittedFinishedSplitId,
+ currentFetcher != null
+ ? currentFetcher.getClass().getSimpleName()
+ : "null",
+ currentFetcher != null &&
currentFetcher.isFinished()));
+ }
+ return ChangeEventRecords.forRecords(currentSplitId, dataIt);
}
@Override
@@ -100,10 +134,14 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
@Override
public void close() throws Exception {
- if (currentFetcher != null) {
- log.info("Close current fetcher {}",
currentFetcher.getClass().getCanonicalName());
- currentFetcher.close();
+ try {
+ if (currentFetcher != null) {
+ log.info("Close current fetcher {}",
currentFetcher.getClass().getCanonicalName());
+ currentFetcher.close();
+ }
+ } finally {
currentSplitId = null;
+ emittedFinishedSplitId = null;
}
}
@@ -123,6 +161,7 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
throw new IOException("Cannot fetch from another split - no
split remaining.");
}
currentSplitId = nextSplit.splitId();
+ emittedFinishedSplitId = null;
if (nextSplit.isSnapshotSplit()) {
if (currentFetcher == null) {
@@ -152,10 +191,52 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
return currentFetcher == null || currentFetcher.isFinished();
}
- private ChangeEventRecords finishedSnapshotSplit() {
- final ChangeEventRecords finishedRecords =
- ChangeEventRecords.forFinishedSplit(currentSplitId);
- currentSplitId = null;
- return finishedRecords;
+ private boolean hasEmittedCurrentSplitFinished() {
+ return currentSplitId != null &&
currentSplitId.equals(emittedFinishedSplitId);
+ }
+
+ private RecordsWithSplitIds<SourceRecords> finishedSnapshotSplit() throws
IOException {
+ final String splitId = currentSplitId;
+ if (splitId == null) {
+ log.warn(
+ "Invalid state: currentSplitId is null when finishing
snapshot split. "
+ + "emittedFinishedSplitId={}, currentFetcher={},
isFinished={}",
+ emittedFinishedSplitId,
+ currentFetcher != null ?
currentFetcher.getClass().getSimpleName() : "null",
+ currentFetcher != null && currentFetcher.isFinished());
+ throw new IOException(
+ String.format(
+ "Invalid state: currentSplitId is null when
finishing snapshot split. "
+ + "emittedFinishedSplitId=%s,
currentFetcher=%s, isFinished=%s",
+ emittedFinishedSplitId,
+ currentFetcher != null
+ ? currentFetcher.getClass().getSimpleName()
+ : "null",
+ currentFetcher != null &&
currentFetcher.isFinished()));
+ }
+ if (splitId.equals(emittedFinishedSplitId)) {
+ return NoSplitRecords.INSTANCE;
+ }
+ emittedFinishedSplitId = splitId;
+ return ChangeEventRecords.forFinishedSplit(splitId);
+ }
+
+ private static final class NoSplitRecords implements
RecordsWithSplitIds<SourceRecords> {
+ private static final NoSplitRecords INSTANCE = new NoSplitRecords();
+
+ @Override
+ public String nextSplit() {
+ return null;
+ }
+
+ @Override
+ public SourceRecords nextRecordFromSplit() {
+ throw new IllegalStateException("No split assigned");
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return Collections.emptySet();
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
index 44c80a5ec5..cd81883d11 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
@@ -74,7 +74,17 @@ public final class ChangeEventRecords implements
RecordsWithSplitIds<SourceRecor
return new ChangeEventRecords(splitId, recordsForSplit,
Collections.emptySet());
}
+ /**
+ * Creates a {@link ChangeEventRecords} that only indicates a split is
finished.
+ *
+ * @param splitId the ID of the finished split, must not be null
+ * @return a new {@link ChangeEventRecords} instance
+ * @throws IllegalArgumentException if splitId is null
+ */
public static ChangeEventRecords forFinishedSplit(final String splitId) {
+ if (splitId == null) {
+ throw new IllegalArgumentException("splitId must not be null");
+ }
return new ChangeEventRecords(null, null,
Collections.singleton(splitId));
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
new file mode 100644
index 0000000000..11b452fd3a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.reader;
+
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
+import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+class IncrementalSourceSplitReaderTest {
+
+ @Test
+ void testFetchFinishedSnapshotSplitEmitsFinishedOnlyOnce() throws
Exception {
+ DataSourceDialect<SourceConfig> dialect =
Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver =
Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader<SourceConfig> reader =
+ new IncrementalSourceSplitReader<SourceConfig>(0, dialect,
config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher<SourceRecords, SourceSplitBase> fetcher =
Mockito.mock(Fetcher.class);
+ Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", "split-1");
+
+ RecordsWithSplitIds<SourceRecords> first = reader.fetch();
+ RecordsWithSplitIds<SourceRecords> second = reader.fetch();
+
+ Assertions.assertEquals(Collections.singleton("split-1"),
first.finishedSplits());
+ Assertions.assertFalse(first.finishedSplits().contains(null));
+ Assertions.assertEquals(Collections.emptySet(),
second.finishedSplits());
+ Assertions.assertFalse(second.finishedSplits().contains(null));
+ Mockito.verify(fetcher, Mockito.times(1)).pollSplitRecords();
+ }
+
+ @Test
+ void testFetchFinishedSnapshotSplitFailFastWhenCurrentSplitIdIsNull()
throws Exception {
+ DataSourceDialect<SourceConfig> dialect =
Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver =
Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader<SourceConfig> reader =
+ new IncrementalSourceSplitReader<SourceConfig>(0, dialect,
config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher<SourceRecords, SourceSplitBase> fetcher =
Mockito.mock(Fetcher.class);
+ Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", null);
+
+ Assertions.assertThrows(IOException.class, reader::fetch);
+ }
+
+ @Test
+ void testFetchFinishedSnapshotSplitSupportsNextSplitAfterIdChanges()
throws Exception {
+ DataSourceDialect<SourceConfig> dialect =
Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver =
Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader<SourceConfig> reader =
+ new IncrementalSourceSplitReader<SourceConfig>(0, dialect,
config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher<SourceRecords, SourceSplitBase> fetcher =
Mockito.mock(Fetcher.class);
+ Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", "split-1");
+
+ RecordsWithSplitIds<SourceRecords> first = reader.fetch();
+ RecordsWithSplitIds<SourceRecords> idle = reader.fetch();
+
+ setField(reader, "currentSplitId", "split-2");
+ RecordsWithSplitIds<SourceRecords> second = reader.fetch();
+
+ Assertions.assertEquals(Collections.singleton("split-1"),
first.finishedSplits());
+ Assertions.assertEquals(Collections.emptySet(), idle.finishedSplits());
+ Assertions.assertEquals(Collections.singleton("split-2"),
second.finishedSplits());
+ Mockito.verify(fetcher, Mockito.times(2)).pollSplitRecords();
+ }
+
+ @Test
+ void testCloseClearsState() throws Exception {
+ DataSourceDialect<SourceConfig> dialect =
Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver =
Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader<SourceConfig> reader =
+ new IncrementalSourceSplitReader<SourceConfig>(0, dialect,
config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher<SourceRecords, SourceSplitBase> fetcher =
Mockito.mock(Fetcher.class);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", "split-1");
+ setField(reader, "emittedFinishedSplitId", "split-1");
+
+ reader.close();
+
+ Assertions.assertNull(getField(reader, "currentSplitId"));
+ Assertions.assertNull(getField(reader, "emittedFinishedSplitId"));
+ Mockito.verify(fetcher, Mockito.times(1)).close();
+ }
+
+ private static void setField(
+ IncrementalSourceSplitReader<?> reader, String fieldName, Object
value)
+ throws Exception {
+ Field field =
IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(reader, value);
+ }
+
+ private static Object getField(IncrementalSourceSplitReader<?> reader,
String fieldName)
+ throws Exception {
+ Field field =
IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(reader);
+ }
+}