This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 275adcce13 [Fix][connector-cdc] Fix NPE when finishing snapshot split due to null splitId (#10404)
275adcce13 is described below

commit 275adcce139605bf3340dcc5e807a734fbca4b66
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Feb 5 14:14:13 2026 +0800

    [Fix][connector-cdc] Fix NPE when finishing snapshot split due to null splitId (#10404)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../reader/IncrementalSourceSplitReader.java       | 103 +++++++++++--
 .../cdc/base/source/split/ChangeEventRecords.java  |  10 ++
 .../reader/IncrementalSourceSplitReaderTest.java   | 160 +++++++++++++++++++++
 3 files changed, 262 insertions(+), 11 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index 53f9736273..d6db68e8cc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -37,10 +37,22 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Queue;
+import java.util.Set;
 
 @Slf4j
+/**
+ * Split reader for incremental source (snapshot + incremental phase).
+ *
+ * <p><b>Thread safety:</b> This class is NOT thread-safe and is expected to be used from a single
+ * thread. The {@link #fetch()} method should be called sequentially without concurrent access. The
+ * {@link #close()} method should be called from the same thread or after all fetch calls have
+ * completed.
+ *
+ * @param <C> The type of source configuration.
+ */
 public class IncrementalSourceSplitReader<C extends SourceConfig>
         implements SplitReader<SourceRecords, SourceSplitBase> {
     private final Queue<SourceSplitBase> splits;
@@ -49,6 +61,7 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
     private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;
 
     private String currentSplitId;
+    private String emittedFinishedSplitId;
     private final DataSourceDialect<C> dataSourceDialect;
     private final C sourceConfig;
     private final SchemaChangeResolver schemaChangeResolver;
@@ -70,6 +83,9 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
 
         checkSplitOrStartNext();
         checkNeedStopBinlogReader();
+        if (hasEmittedCurrentSplitFinished()) {
+            return NoSplitRecords.INSTANCE;
+        }
         Iterator<SourceRecords> dataIt = null;
         try {
             dataIt = currentFetcher.pollSplitRecords();
@@ -77,9 +93,27 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
             log.warn("fetch data failed.", e);
             throw new IOException(e);
         }
-        return dataIt == null
-                ? finishedSnapshotSplit()
-                : ChangeEventRecords.forRecords(currentSplitId, dataIt);
+        if (dataIt == null) {
+            return finishedSnapshotSplit();
+        }
+        if (currentSplitId == null) {
+            log.warn(
+                    "Invalid state: currentSplitId is null when emitting 
records. "
+                            + "emittedFinishedSplitId={}, currentFetcher={}, 
isFinished={}",
+                    emittedFinishedSplitId,
+                    currentFetcher != null ? 
currentFetcher.getClass().getSimpleName() : "null",
+                    currentFetcher != null && currentFetcher.isFinished());
+            throw new IOException(
+                    String.format(
+                            "Invalid state: currentSplitId is null when 
emitting records. "
+                                    + "emittedFinishedSplitId=%s, 
currentFetcher=%s, isFinished=%s",
+                            emittedFinishedSplitId,
+                            currentFetcher != null
+                                    ? currentFetcher.getClass().getSimpleName()
+                                    : "null",
+                            currentFetcher != null && 
currentFetcher.isFinished()));
+        }
+        return ChangeEventRecords.forRecords(currentSplitId, dataIt);
     }
 
     @Override
@@ -100,10 +134,14 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
 
     @Override
     public void close() throws Exception {
-        if (currentFetcher != null) {
-            log.info("Close current fetcher {}", 
currentFetcher.getClass().getCanonicalName());
-            currentFetcher.close();
+        try {
+            if (currentFetcher != null) {
+                log.info("Close current fetcher {}", 
currentFetcher.getClass().getCanonicalName());
+                currentFetcher.close();
+            }
+        } finally {
             currentSplitId = null;
+            emittedFinishedSplitId = null;
         }
     }
 
@@ -123,6 +161,7 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
                 throw new IOException("Cannot fetch from another split - no 
split remaining.");
             }
             currentSplitId = nextSplit.splitId();
+            emittedFinishedSplitId = null;
 
             if (nextSplit.isSnapshotSplit()) {
                 if (currentFetcher == null) {
@@ -152,10 +191,52 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
         return currentFetcher == null || currentFetcher.isFinished();
     }
 
-    private ChangeEventRecords finishedSnapshotSplit() {
-        final ChangeEventRecords finishedRecords =
-                ChangeEventRecords.forFinishedSplit(currentSplitId);
-        currentSplitId = null;
-        return finishedRecords;
+    private boolean hasEmittedCurrentSplitFinished() {
+        return currentSplitId != null && 
currentSplitId.equals(emittedFinishedSplitId);
+    }
+
+    private RecordsWithSplitIds<SourceRecords> finishedSnapshotSplit() throws 
IOException {
+        final String splitId = currentSplitId;
+        if (splitId == null) {
+            log.warn(
+                    "Invalid state: currentSplitId is null when finishing 
snapshot split. "
+                            + "emittedFinishedSplitId={}, currentFetcher={}, 
isFinished={}",
+                    emittedFinishedSplitId,
+                    currentFetcher != null ? 
currentFetcher.getClass().getSimpleName() : "null",
+                    currentFetcher != null && currentFetcher.isFinished());
+            throw new IOException(
+                    String.format(
+                            "Invalid state: currentSplitId is null when 
finishing snapshot split. "
+                                    + "emittedFinishedSplitId=%s, 
currentFetcher=%s, isFinished=%s",
+                            emittedFinishedSplitId,
+                            currentFetcher != null
+                                    ? currentFetcher.getClass().getSimpleName()
+                                    : "null",
+                            currentFetcher != null && 
currentFetcher.isFinished()));
+        }
+        if (splitId.equals(emittedFinishedSplitId)) {
+            return NoSplitRecords.INSTANCE;
+        }
+        emittedFinishedSplitId = splitId;
+        return ChangeEventRecords.forFinishedSplit(splitId);
+    }
+
+    private static final class NoSplitRecords implements 
RecordsWithSplitIds<SourceRecords> {
+        private static final NoSplitRecords INSTANCE = new NoSplitRecords();
+
+        @Override
+        public String nextSplit() {
+            return null;
+        }
+
+        @Override
+        public SourceRecords nextRecordFromSplit() {
+            throw new IllegalStateException("No split assigned");
+        }
+
+        @Override
+        public Set<String> finishedSplits() {
+            return Collections.emptySet();
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
index 44c80a5ec5..cd81883d11 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
@@ -74,7 +74,17 @@ public final class ChangeEventRecords implements 
RecordsWithSplitIds<SourceRecor
         return new ChangeEventRecords(splitId, recordsForSplit, 
Collections.emptySet());
     }
 
+    /**
+     * Creates a {@link ChangeEventRecords} that only indicates a split is finished.
+     *
+     * @param splitId the ID of the finished split, must not be null
+     * @return a new {@link ChangeEventRecords} instance
+     * @throws IllegalArgumentException if splitId is null
+     */
     public static ChangeEventRecords forFinishedSplit(final String splitId) {
+        if (splitId == null) {
+            throw new IllegalArgumentException("splitId must not be null");
+        }
         return new ChangeEventRecords(null, null, 
Collections.singleton(splitId));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
new file mode 100644
index 0000000000..11b452fd3a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.reader;
+
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
+import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+class IncrementalSourceSplitReaderTest {
+
+    @Test
+    void testFetchFinishedSnapshotSplitEmitsFinishedOnlyOnce() throws 
Exception {
+        DataSourceDialect<SourceConfig> dialect = 
Mockito.mock(DataSourceDialect.class);
+        SourceConfig config = Mockito.mock(SourceConfig.class);
+        SchemaChangeResolver resolver = 
Mockito.mock(SchemaChangeResolver.class);
+
+        IncrementalSourceSplitReader<SourceConfig> reader =
+                new IncrementalSourceSplitReader<SourceConfig>(0, dialect, 
config, resolver) {
+                    @Override
+                    protected void checkSplitOrStartNext() {}
+                };
+
+        @SuppressWarnings("unchecked")
+        Fetcher<SourceRecords, SourceSplitBase> fetcher = 
Mockito.mock(Fetcher.class);
+        Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+        setField(reader, "currentFetcher", fetcher);
+        setField(reader, "currentSplitId", "split-1");
+
+        RecordsWithSplitIds<SourceRecords> first = reader.fetch();
+        RecordsWithSplitIds<SourceRecords> second = reader.fetch();
+
+        Assertions.assertEquals(Collections.singleton("split-1"), 
first.finishedSplits());
+        Assertions.assertFalse(first.finishedSplits().contains(null));
+        Assertions.assertEquals(Collections.emptySet(), 
second.finishedSplits());
+        Assertions.assertFalse(second.finishedSplits().contains(null));
+        Mockito.verify(fetcher, Mockito.times(1)).pollSplitRecords();
+    }
+
+    @Test
+    void testFetchFinishedSnapshotSplitFailFastWhenCurrentSplitIdIsNull() 
throws Exception {
+        DataSourceDialect<SourceConfig> dialect = 
Mockito.mock(DataSourceDialect.class);
+        SourceConfig config = Mockito.mock(SourceConfig.class);
+        SchemaChangeResolver resolver = 
Mockito.mock(SchemaChangeResolver.class);
+
+        IncrementalSourceSplitReader<SourceConfig> reader =
+                new IncrementalSourceSplitReader<SourceConfig>(0, dialect, 
config, resolver) {
+                    @Override
+                    protected void checkSplitOrStartNext() {}
+                };
+
+        @SuppressWarnings("unchecked")
+        Fetcher<SourceRecords, SourceSplitBase> fetcher = 
Mockito.mock(Fetcher.class);
+        Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+        setField(reader, "currentFetcher", fetcher);
+        setField(reader, "currentSplitId", null);
+
+        Assertions.assertThrows(IOException.class, reader::fetch);
+    }
+
+    @Test
+    void testFetchFinishedSnapshotSplitSupportsNextSplitAfterIdChanges() 
throws Exception {
+        DataSourceDialect<SourceConfig> dialect = 
Mockito.mock(DataSourceDialect.class);
+        SourceConfig config = Mockito.mock(SourceConfig.class);
+        SchemaChangeResolver resolver = 
Mockito.mock(SchemaChangeResolver.class);
+
+        IncrementalSourceSplitReader<SourceConfig> reader =
+                new IncrementalSourceSplitReader<SourceConfig>(0, dialect, 
config, resolver) {
+                    @Override
+                    protected void checkSplitOrStartNext() {}
+                };
+
+        @SuppressWarnings("unchecked")
+        Fetcher<SourceRecords, SourceSplitBase> fetcher = 
Mockito.mock(Fetcher.class);
+        Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+        setField(reader, "currentFetcher", fetcher);
+        setField(reader, "currentSplitId", "split-1");
+
+        RecordsWithSplitIds<SourceRecords> first = reader.fetch();
+        RecordsWithSplitIds<SourceRecords> idle = reader.fetch();
+
+        setField(reader, "currentSplitId", "split-2");
+        RecordsWithSplitIds<SourceRecords> second = reader.fetch();
+
+        Assertions.assertEquals(Collections.singleton("split-1"), 
first.finishedSplits());
+        Assertions.assertEquals(Collections.emptySet(), idle.finishedSplits());
+        Assertions.assertEquals(Collections.singleton("split-2"), 
second.finishedSplits());
+        Mockito.verify(fetcher, Mockito.times(2)).pollSplitRecords();
+    }
+
+    @Test
+    void testCloseClearsState() throws Exception {
+        DataSourceDialect<SourceConfig> dialect = 
Mockito.mock(DataSourceDialect.class);
+        SourceConfig config = Mockito.mock(SourceConfig.class);
+        SchemaChangeResolver resolver = 
Mockito.mock(SchemaChangeResolver.class);
+
+        IncrementalSourceSplitReader<SourceConfig> reader =
+                new IncrementalSourceSplitReader<SourceConfig>(0, dialect, 
config, resolver) {
+                    @Override
+                    protected void checkSplitOrStartNext() {}
+                };
+
+        @SuppressWarnings("unchecked")
+        Fetcher<SourceRecords, SourceSplitBase> fetcher = 
Mockito.mock(Fetcher.class);
+
+        setField(reader, "currentFetcher", fetcher);
+        setField(reader, "currentSplitId", "split-1");
+        setField(reader, "emittedFinishedSplitId", "split-1");
+
+        reader.close();
+
+        Assertions.assertNull(getField(reader, "currentSplitId"));
+        Assertions.assertNull(getField(reader, "emittedFinishedSplitId"));
+        Mockito.verify(fetcher, Mockito.times(1)).close();
+    }
+
+    private static void setField(
+            IncrementalSourceSplitReader<?> reader, String fieldName, Object 
value)
+            throws Exception {
+        Field field = 
IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(reader, value);
+    }
+
+    private static Object getField(IncrementalSourceSplitReader<?> reader, 
String fieldName)
+            throws Exception {
+        Field field = 
IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(reader);
+    }
+}

Reply via email to