This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fe33422161 [Improve][CDC] Optimize split state memory allocation in increment phase (#6554)
fe33422161 is described below

commit fe33422161b4ad7f210b1f3d9eb3de49c4657cc3
Author: hailin0 <[email protected]>
AuthorDate: Tue Apr 2 16:09:18 2024 +0800

    [Improve][CDC] Optimize split state memory allocation in increment phase (#6554)
---
 pom.xml                                            |  14 +
 .../base/relational/JdbcSourceEventDispatcher.java |  18 +
 .../source/enumerator/HybridSplitAssigner.java     |  22 ++
 .../enumerator/IncrementalSourceEnumerator.java    |  12 +
 .../enumerator/IncrementalSplitAssigner.java       |  23 ++
 .../source/enumerator/SnapshotSplitAssigner.java   |  28 ++
 .../source/event/CompletedSnapshotPhaseEvent.java  |  34 ++
 .../source/reader/IncrementalSourceReader.java     |  15 +-
 .../reader/IncrementalSourceRecordEmitter.java     |  28 ++
 .../external/IncrementalSourceStreamFetcher.java   |  15 +-
 .../source/split/state/IncrementalSplitState.java  |  41 +++
 .../cdc/base/utils/SourceRecordUtils.java          |   7 +
 .../source/enumerator/HybridSplitAssignerTest.java | 132 ++++++++
 .../IncrementalSourceStreamFetcherTest.java        | 367 +++++++++++++++++++++
 .../split/state/IncrementalSplitStateTest.java     | 169 ++++++++++
 .../LogMinerStreamingChangeEventSource.java        |   1 +
 .../SqlServerStreamingChangeEventSource.java       |   1 +
 .../common/source/reader/SourceReaderBase.java     |   7 +-
 18 files changed, 921 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4d4a3731ae..0f59747f72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@
         <jcommander.version>1.81</jcommander.version>
         <junit4.version>4.13.2</junit4.version>
         <junit5.version>5.9.0</junit5.version>
+        <mockito.version>4.11.0</mockito.version>
         <config.version>1.3.3</config.version>
         <maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
         <maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
@@ -357,6 +358,13 @@
                 <version>${junit4.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-junit-jupiter</artifactId>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
+
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-annotations</artifactId>
@@ -521,6 +529,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
index 23dfabd9fa..90cc8126f4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
@@ -25,7 +25,9 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.Watermar
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
 import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.heartbeat.Heartbeat;
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.spi.EventMetadataProvider;
@@ -37,6 +39,8 @@ import io.debezium.schema.DatabaseSchema;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Map;
 
 /**
@@ -71,6 +75,10 @@ public class JdbcSourceEventDispatcher extends 
EventDispatcher<TableId> {
                 filter,
                 changeEventCreator,
                 metadataProvider,
+                Heartbeat.create(
+                        getHeartbeatInterval(connectorConfig),
+                        topicSelector.getHeartbeatTopic(),
+                        connectorConfig.getLogicalName()),
                 schemaNameAdjuster);
         this.queue = queue;
         this.topic = topicSelector.getPrimaryTopic();
@@ -92,4 +100,14 @@ public class JdbcSourceEventDispatcher extends 
EventDispatcher<TableId> {
                         sourcePartition, topic, sourceSplit.splitId(), 
watermarkKind, watermark);
         queue.enqueue(new DataChangeEvent(sourceRecord));
     }
+
+    private static Duration getHeartbeatInterval(CommonConnectorConfig 
connectorConfig) {
+        Configuration configuration = connectorConfig.getConfig();
+        Duration heartbeatInterval =
+                configuration.getDuration(Heartbeat.HEARTBEAT_INTERVAL, 
ChronoUnit.MILLIS);
+        if (heartbeatInterval.isZero()) {
+            return Duration.ofMillis(5000);
+        }
+        return heartbeatInterval;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
index 9070e2fb88..d6b0bdb96c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
@@ -31,9 +33,11 @@ import org.slf4j.LoggerFactory;
 import io.debezium.relational.TableId;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Predicate;
 
 /** Assigner for Hybrid split which contains snapshot splits and incremental 
splits. */
 public class HybridSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
@@ -146,4 +150,22 @@ public class HybridSplitAssigner<C extends SourceConfig> 
implements SplitAssigne
         snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
         incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
     }
+
+    @VisibleForTesting
+    IncrementalSplitAssigner<C> getIncrementalSplitAssigner() {
+        return incrementalSplitAssigner;
+    }
+
+    @VisibleForTesting
+    SnapshotSplitAssigner<C> getSnapshotSplitAssigner() {
+        return snapshotSplitAssigner;
+    }
+
+    public boolean completedSnapshotPhase(List<TableId> tableIds) {
+        return Arrays.asList(
+                        snapshotSplitAssigner.completedSnapshotPhase(tableIds),
+                        
incrementalSplitAssigner.completedSnapshotPhase(tableIds))
+                .stream()
+                .allMatch(Predicate.isEqual(true));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index 86f7ac42de..b17b910e5d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.cdc.base.source.enumerator;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsAckEvent;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
@@ -120,6 +121,17 @@ public class IncrementalSourceEnumerator
                                     .map(SnapshotSplitWatermark::getSplitId)
                                     .collect(Collectors.toList()));
             context.sendEventToSourceReader(subtaskId, ackEvent);
+        } else if (sourceEvent instanceof CompletedSnapshotPhaseEvent) {
+            LOG.debug(
+                    "The enumerator receives completed snapshot phase event {} 
from subtask {}.",
+                    sourceEvent,
+                    subtaskId);
+            CompletedSnapshotPhaseEvent event = (CompletedSnapshotPhaseEvent) 
sourceEvent;
+            if (splitAssigner instanceof HybridSplitAssigner) {
+                ((HybridSplitAssigner) 
splitAssigner).completedSnapshotPhase(event.getTableIds());
+                LOG.info(
+                        "Clean the 
SnapshotSplitAssigner#assignedSplits/splitCompletedOffsets to empty.");
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index fe8204f6cd..7b45ee1ef6 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
@@ -45,6 +47,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+
 /** Assigner for incremental split. */
 public class IncrementalSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
 
@@ -255,4 +259,23 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
                 completedSnapshotSplitInfos,
                 checkpointDataType);
     }
+
+    @VisibleForTesting
+    void setSplitAssigned(boolean assigned) {
+        this.splitAssigned = assigned;
+    }
+
+    public boolean completedSnapshotPhase(List<TableId> tableIds) {
+        checkArgument(splitAssigned && noMoreSplits());
+
+        for (String splitKey : new 
ArrayList<>(context.getAssignedSnapshotSplit().keySet())) {
+            SnapshotSplit assignedSplit = 
context.getAssignedSnapshotSplit().get(splitKey);
+            if (tableIds.contains(assignedSplit.getTableId())) {
+                context.getAssignedSnapshotSplit().remove(splitKey);
+                
context.getSplitCompletedOffsets().remove(assignedSplit.splitId());
+            }
+        }
+        return context.getAssignedSnapshotSplit().isEmpty()
+                && context.getSplitCompletedOffsets().isEmpty();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index 443343947c..c16dd81102 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -45,6 +47,8 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+
 /** Assigner for snapshot split. */
 public class SnapshotSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
     private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotSplitAssigner.class);
@@ -278,4 +282,28 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
     private boolean allSplitsCompleted() {
         return noMoreSplits() && assignedSplits.size() == 
splitCompletedOffsets.size();
     }
+
+    @VisibleForTesting
+    Map<String, SnapshotSplit> getAssignedSplits() {
+        return assignedSplits;
+    }
+
+    @VisibleForTesting
+    Map<String, SnapshotSplitWatermark> getSplitCompletedOffsets() {
+        return splitCompletedOffsets;
+    }
+
+    public boolean completedSnapshotPhase(List<TableId> tableIds) {
+        checkArgument(isCompleted() && allSplitsCompleted());
+
+        for (String splitKey : new ArrayList<>(assignedSplits.keySet())) {
+            SnapshotSplit assignedSplit = assignedSplits.get(splitKey);
+            if (tableIds.contains(assignedSplit.getTableId())) {
+                assignedSplits.remove(splitKey);
+                splitCompletedOffsets.remove(assignedSplit.splitId());
+            }
+        }
+
+        return assignedSplits.isEmpty() && splitCompletedOffsets.isEmpty();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotPhaseEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotPhaseEvent.java
new file mode 100644
index 0000000000..623bf1c1a9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotPhaseEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.event;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+
+import io.debezium.relational.TableId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class CompletedSnapshotPhaseEvent implements SourceEvent {
+    private static final long serialVersionUID = 1L;
+
+    private List<TableId> tableIds;
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 7f9d890197..829f68763d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
@@ -207,7 +208,19 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
                 debeziumDeserializationSchema.restoreCheckpointProducedType(
                         incrementalSplit.getCheckpointDataType());
             }
-            return new IncrementalSplitState(split.asIncrementalSplit());
+            IncrementalSplitState splitState = new 
IncrementalSplitState(incrementalSplit);
+            if (splitState.autoEnterPureIncrementPhaseIfAllowed()) {
+                log.info(
+                        "The incremental split[{}] startup position {} is 
equal the maxSnapshotSplitsHighWatermark {}, auto enter pure increment phase.",
+                        incrementalSplit.splitId(),
+                        splitState.getStartupOffset(),
+                        splitState.getMaxSnapshotSplitsHighWatermark());
+                log.info("Clean the 
IncrementalSplit#completedSnapshotSplitInfos to empty.");
+                CompletedSnapshotPhaseEvent event =
+                        new 
CompletedSnapshotPhaseEvent(splitState.getTableIds());
+                context.sendSourceEventToEnumerator(event);
+            }
+            return splitState;
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 65474a0d9f..a98a9d0959 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -22,9 +22,11 @@ import org.apache.seatunnel.api.event.EventListener;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
+import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import 
org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
 import 
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
@@ -65,6 +67,7 @@ public class IncrementalSourceRecordEmitter<T>
 
     protected final OffsetFactory offsetFactory;
 
+    protected final SourceReader.Context context;
     protected final Counter recordFetchDelay;
     protected final Counter recordEmitDelay;
     protected final EventListener eventListener;
@@ -76,6 +79,7 @@ public class IncrementalSourceRecordEmitter<T>
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.outputCollector = new OutputCollector<>();
         this.offsetFactory = offsetFactory;
+        this.context = context;
         this.recordFetchDelay = 
context.getMetricsContext().counter(CDC_RECORD_FETCH_DELAY);
         this.recordEmitDelay = 
context.getMetricsContext().counter(CDC_RECORD_EMIT_DELAY);
         this.eventListener = context.getEventListener();
@@ -90,6 +94,7 @@ public class IncrementalSourceRecordEmitter<T>
             SourceRecord next = elementIterator.next();
             reportMetrics(next);
             processElement(next, collector, splitState);
+            markEnterPureIncrementPhase(next, splitState);
         }
     }
 
@@ -138,6 +143,29 @@ public class IncrementalSourceRecordEmitter<T>
         }
     }
 
+    private void markEnterPureIncrementPhase(
+            SourceRecord element, SourceSplitStateBase splitState) {
+        if (splitState.isIncrementalSplitState()) {
+            IncrementalSplitState incrementalSplitState = 
splitState.asIncrementalSplitState();
+            if (incrementalSplitState.isEnterPureIncrementPhase()) {
+                return;
+            }
+            Offset position = getOffsetPosition(element);
+            if 
(incrementalSplitState.markEnterPureIncrementPhaseIfNeed(position)) {
+                log.info(
+                        "The current record position {} is after the 
maxSnapshotSplitsHighWatermark {}, "
+                                + "mark enter pure increment phase.",
+                        position,
+                        
incrementalSplitState.getMaxSnapshotSplitsHighWatermark());
+                log.info("Clean the 
IncrementalSplit#completedSnapshotSplitInfos to empty.");
+
+                CompletedSnapshotPhaseEvent completedSnapshotPhaseEvent =
+                        new 
CompletedSnapshotPhaseEvent(incrementalSplitState.getTableIds());
+                
context.sendSourceEventToEnumerator(completedSnapshotPhaseEvent);
+            }
+        }
+    }
+
     private Offset getWatermark(SourceRecord watermarkEvent) {
         return getOffsetPosition(watermarkEvent.sourceOffset());
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index e34970054b..4cad739ac6 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -170,7 +170,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
      * <p>After event batch: [checkpoint-before] [SchemaChangeEvent-1, 
SchemaChangeEvent-2,
      * checkpoint-after] [a, b, c, d, e]
      */
-    private Iterator<SourceRecords> 
splitSchemaChangeStream(List<DataChangeEvent> batchEvents) {
+    Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent> 
batchEvents) {
         List<SourceRecords> sourceRecordsSet = new ArrayList<>();
 
         List<SourceRecord> sourceRecordList = new ArrayList<>();
@@ -181,11 +181,6 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
             if (!shouldEmit(currentRecord)) {
                 continue;
             }
-            if (!SourceRecordUtils.isDataChangeRecord(currentRecord)
-                    && !SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
-                sourceRecordList.add(currentRecord);
-                continue;
-            }
 
             if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
                 if (!schemaChangeResolver.support(currentRecord)) {
@@ -208,9 +203,11 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
                     sourceRecordList = new ArrayList<>();
                     sourceRecordList.add(currentRecord);
                 }
-            } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)) {
+            } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
+                    || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
                 if (previousRecord == null
-                        || 
SourceRecordUtils.isDataChangeRecord(previousRecord)) {
+                        || SourceRecordUtils.isDataChangeRecord(previousRecord)
+                        || 
SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
                     sourceRecordList.add(currentRecord);
                 } else {
                     sourceRecordList.add(
@@ -274,7 +271,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
     }
 
     /** Returns the record should emit or not. */
-    private boolean shouldEmit(SourceRecord sourceRecord) {
+    boolean shouldEmit(SourceRecord sourceRecord) {
         if (taskContext.isDataChangeRecord(sourceRecord)) {
             Offset position = taskContext.getStreamOffset(sourceRecord);
             TableId tableId = getTableId(sourceRecord);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
index 4157569766..c04026bf1e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
@@ -24,6 +24,7 @@ import io.debezium.relational.TableId;
 import lombok.Getter;
 import lombok.Setter;
 
+import java.util.Comparator;
 import java.util.List;
 
 /** The state of split to describe the change log of table(s). */
@@ -39,11 +40,27 @@ public class IncrementalSplitState extends 
SourceSplitStateBase {
     /** Obtained by configuration, may not end */
     private Offset stopOffset;
 
+    private Offset maxSnapshotSplitsHighWatermark;
+    private volatile boolean enterPureIncrementPhase;
+
     public IncrementalSplitState(IncrementalSplit split) {
         super(split);
         this.tableIds = split.getTableIds();
         this.startupOffset = split.getStartupOffset();
         this.stopOffset = split.getStopOffset();
+
+        if (split.getCompletedSnapshotSplitInfos().isEmpty()) {
+            this.maxSnapshotSplitsHighWatermark = null;
+            this.enterPureIncrementPhase = true;
+        } else {
+            this.maxSnapshotSplitsHighWatermark =
+                    split.getCompletedSnapshotSplitInfos().stream()
+                            .filter(e -> e.getWatermark() != null)
+                            .max(Comparator.comparing(o -> 
o.getWatermark().getHighWatermark()))
+                            .map(e -> e.getWatermark().getHighWatermark())
+                            .get();
+            this.enterPureIncrementPhase = false;
+        }
     }
 
     @Override
@@ -56,4 +73,28 @@ public class IncrementalSplitState extends 
SourceSplitStateBase {
                 getStopOffset(),
                 incrementalSplit.getCompletedSnapshotSplitInfos());
     }
+
+    public synchronized boolean markEnterPureIncrementPhaseIfNeed(Offset 
currentRecordPosition) {
+        if (enterPureIncrementPhase) {
+            return false;
+        }
+
+        if (currentRecordPosition.isAtOrAfter(maxSnapshotSplitsHighWatermark)) 
{
+            
split.asIncrementalSplit().getCompletedSnapshotSplitInfos().clear();
+            this.enterPureIncrementPhase = true;
+            return true;
+        }
+
+        return false;
+    }
+
+    public synchronized boolean autoEnterPureIncrementPhaseIfAllowed() {
+        if (!enterPureIncrementPhase
+                && maxSnapshotSplitsHighWatermark.compareTo(startupOffset) == 
0) {
+            
split.asIncrementalSplit().getCompletedSnapshotSplitInfos().clear();
+            enterPureIncrementPhase = true;
+            return true;
+        }
+        return false;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
index abbdb5b76a..e06213b06d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java
@@ -47,6 +47,8 @@ public class SourceRecordUtils {
 
     public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
             "io.debezium.connector.mysql.SchemaChangeKey";
+    public static final String HEARTBEAT_VALUE_SCHEMA_KEY_NAME =
+            "io.debezium.connector.common.Heartbeat";
     private static final DocumentReader DOCUMENT_READER = 
DocumentReader.defaultReader();
 
     /** Converts a {@link ResultSet} row to an array of Objects. */
@@ -105,6 +107,11 @@ public class SourceRecordUtils {
                 && value.getString(Envelope.FieldName.OPERATION) != null;
     }
 
+    public static boolean isHeartbeatRecord(SourceRecord record) {
+        Schema valueSchema = record.valueSchema();
+        return valueSchema != null && 
valueSchema.name().equals(HEARTBEAT_VALUE_SCHEMA_KEY_NAME);
+    }
+
     public static TableId getTableId(SourceRecord dataRecord) {
         Struct value = (Struct) dataRecord.value();
         Struct source = value.getStruct(Envelope.FieldName.SOURCE);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssignerTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssignerTest.java
new file mode 100644
index 0000000000..2c931eb9e4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssignerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.enumerator;
+
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
+import 
org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HybridSplitAssignerTest {
+    @Test
+    public void testCompletedSnapshotPhase() {
+        Map<String, SnapshotSplit> assignedSplits = createAssignedSplits();
+        Map<String, SnapshotSplitWatermark> splitCompletedOffsets = 
createSplitCompletedOffsets();
+        SnapshotPhaseState snapshotPhaseState =
+                new SnapshotPhaseState(
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        assignedSplits,
+                        splitCompletedOffsets,
+                        true,
+                        Collections.emptyList(),
+                        false,
+                        false);
+        HybridPendingSplitsState checkpointState =
+                new HybridPendingSplitsState(snapshotPhaseState, null);
+        SplitAssigner.Context context =
+                new SplitAssigner.Context<>(
+                        null,
+                        Collections.emptySet(),
+                        
checkpointState.getSnapshotPhaseState().getAssignedSplits(),
+                        
checkpointState.getSnapshotPhaseState().getSplitCompletedOffsets());
+        HybridSplitAssigner splitAssigner =
+                new HybridSplitAssigner<>(context, 1, 1, checkpointState, 
null, null);
+        splitAssigner.getIncrementalSplitAssigner().setSplitAssigned(true);
+
+        Assertions.assertFalse(
+                
splitAssigner.completedSnapshotPhase(Arrays.asList(TableId.parse("db1.table1"))));
+        Assertions.assertFalse(
+                
splitAssigner.getSnapshotSplitAssigner().getAssignedSplits().isEmpty());
+        Assertions.assertFalse(
+                
splitAssigner.getSnapshotSplitAssigner().getSplitCompletedOffsets().isEmpty());
+        Assertions.assertFalse(context.getAssignedSnapshotSplit().isEmpty());
+        Assertions.assertFalse(context.getSplitCompletedOffsets().isEmpty());
+
+        Assertions.assertTrue(
+                
splitAssigner.completedSnapshotPhase(Arrays.asList(TableId.parse("db1.table2"))));
+        Assertions.assertTrue(
+                
splitAssigner.getSnapshotSplitAssigner().getAssignedSplits().isEmpty());
+        Assertions.assertTrue(
+                
splitAssigner.getSnapshotSplitAssigner().getSplitCompletedOffsets().isEmpty());
+        Assertions.assertTrue(context.getAssignedSnapshotSplit().isEmpty());
+        Assertions.assertTrue(context.getSplitCompletedOffsets().isEmpty());
+    }
+
+    private static Map<String, SnapshotSplit> createAssignedSplits() {
+        return Stream.of(
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table1.1",
+                                new SnapshotSplit(
+                                        "db1.table1.1",
+                                        TableId.parse("db1.table1"),
+                                        null,
+                                        null,
+                                        null)),
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table1.2",
+                                new SnapshotSplit(
+                                        "db1.table1.2",
+                                        TableId.parse("db1.table1"),
+                                        null,
+                                        null,
+                                        null)),
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table2.1",
+                                new SnapshotSplit(
+                                        "db1.table2.1",
+                                        TableId.parse("db1.table2"),
+                                        null,
+                                        null,
+                                        null)),
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table2.2",
+                                new SnapshotSplit(
+                                        "db1.table2.2",
+                                        TableId.parse("db1.table2"),
+                                        null,
+                                        null,
+                                        null)))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
+    private static Map<String, SnapshotSplitWatermark> 
createSplitCompletedOffsets() {
+        return Stream.of(
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table1.1", new 
SnapshotSplitWatermark(null, null, null)),
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table1.2", new 
SnapshotSplitWatermark(null, null, null)),
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table2.1", new 
SnapshotSplitWatermark(null, null, null)),
+                        new AbstractMap.SimpleEntry<>(
+                                "db1.table2.2", new 
SnapshotSplitWatermark(null, null, null)))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
new file mode 100644
index 0000000000..64ac4f4a0c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.reader.external;
+
+import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import 
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
+import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.data.Envelope;
+import io.debezium.heartbeat.Heartbeat;
+import io.debezium.pipeline.DataChangeEvent;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class IncrementalSourceStreamFetcherTest {
+
+    @Test
+    public void testSplitSchemaChangeStream() throws Exception {
+        IncrementalSourceStreamFetcher fetcher = createFetcher();
+
+        List<DataChangeEvent> inputEvents = new ArrayList<>();
+        List<SourceRecords> records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        Iterator<SourceRecords> outputEvents = 
fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(1, records.size());
+        Assertions.assertEquals(2, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(1)));
+
+        inputEvents = new ArrayList<>();
+        records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(2, records.size());
+        Assertions.assertEquals(1, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertEquals(3, 
records.get(1).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(1).getSourceRecordList().get(2)));
+
+        inputEvents = new ArrayList<>();
+        records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(2, records.size());
+        Assertions.assertEquals(3, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertEquals(3, 
records.get(1).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(0).getSourceRecordList().get(2)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(1).getSourceRecordList().get(2)));
+
+        inputEvents = new ArrayList<>();
+        records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(3, records.size());
+        Assertions.assertEquals(1, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertEquals(3, 
records.get(1).getSourceRecordList().size());
+        Assertions.assertEquals(2, 
records.get(2).getSourceRecordList().size());
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(1).getSourceRecordList().get(2)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(1)));
+
+        inputEvents = new ArrayList<>();
+        records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(3, records.size());
+        Assertions.assertEquals(2, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertEquals(3, 
records.get(1).getSourceRecordList().size());
+        Assertions.assertEquals(1, 
records.get(2).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(0).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(1).getSourceRecordList().get(2)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(0)));
+
+        inputEvents = new ArrayList<>();
+        records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(4, records.size());
+        Assertions.assertEquals(2, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertEquals(2, 
records.get(1).getSourceRecordList().size());
+        Assertions.assertEquals(2, 
records.get(2).getSourceRecordList().size());
+        Assertions.assertEquals(2, 
records.get(3).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(0).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(1).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(2).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(2).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(3).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(3).getSourceRecordList().get(1)));
+
+        inputEvents = new ArrayList<>();
+        records = new ArrayList<>();
+        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createHeartbeatEvent()));
+        outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
+        outputEvents.forEachRemaining(records::add);
+
+        Assertions.assertEquals(11, records.size());
+        Assertions.assertEquals(3, 
records.get(0).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isHeartbeatRecord(records.get(0).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(0).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(0).getSourceRecordList().get(2)));
+        Assertions.assertEquals(2, 
records.get(1).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(1).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(1).getSourceRecordList().get(1)));
+        Assertions.assertEquals(2, 
records.get(2).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isHeartbeatRecord(records.get(2).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(2).getSourceRecordList().get(1)));
+        Assertions.assertEquals(2, 
records.get(3).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(3).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(3).getSourceRecordList().get(1)));
+        Assertions.assertEquals(3, 
records.get(4).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(4).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(4).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(4).getSourceRecordList().get(2)));
+        Assertions.assertEquals(2, 
records.get(5).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(5).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(5).getSourceRecordList().get(1)));
+        Assertions.assertEquals(4, 
records.get(6).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isHeartbeatRecord(records.get(6).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(6).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isHeartbeatRecord(records.get(6).getSourceRecordList().get(2)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(6).getSourceRecordList().get(3)));
+        Assertions.assertEquals(3, 
records.get(7).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(7).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(7).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(7).getSourceRecordList().get(2)));
+        Assertions.assertEquals(3, 
records.get(8).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isHeartbeatRecord(records.get(8).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(8).getSourceRecordList().get(1)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(
+                        records.get(8).getSourceRecordList().get(2)));
+        Assertions.assertEquals(2, 
records.get(9).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isSchemaChangeEvent(records.get(9).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                WatermarkEvent.isSchemaChangeAfterWatermarkEvent(
+                        records.get(9).getSourceRecordList().get(1)));
+        Assertions.assertEquals(2, 
records.get(10).getSourceRecordList().size());
+        Assertions.assertTrue(
+                
SourceRecordUtils.isDataChangeRecord(records.get(10).getSourceRecordList().get(0)));
+        Assertions.assertTrue(
+                
SourceRecordUtils.isHeartbeatRecord(records.get(10).getSourceRecordList().get(1)));
+    }
+
+    static SourceRecord createSchemaChangeEvent() {
+        Schema keySchema =
+                
SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
+        SourceRecord record =
+                new SourceRecord(
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        null,
+                        keySchema,
+                        null,
+                        null,
+                        null);
+        Assertions.assertTrue(SourceRecordUtils.isSchemaChangeEvent(record));
+        return record;
+    }
+
+    static SourceRecord createDataEvent() {
+        Schema valueSchema =
+                SchemaBuilder.struct()
+                        .field(Envelope.FieldName.OPERATION, 
Schema.STRING_SCHEMA)
+                        .build();
+        Struct value = new Struct(valueSchema);
+        value.put(valueSchema.field(Envelope.FieldName.OPERATION), "c");
+        SourceRecord record =
+                new SourceRecord(
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        null,
+                        null,
+                        null,
+                        valueSchema,
+                        value);
+        Assertions.assertTrue(SourceRecordUtils.isDataChangeRecord(record));
+        return record;
+    }
+
+    static SourceRecord createHeartbeatEvent() throws InterruptedException {
+        Heartbeat heartbeat = Heartbeat.create(Duration.ofNanos(1), "test", 
"test");
+        AtomicReference<SourceRecord> eventRef = new AtomicReference<>();
+        heartbeat.forcedBeat(
+                Collections.singletonMap("heartbeat", "heartbeat"),
+                Collections.singletonMap("heartbeat", "heartbeat"),
+                sourceRecord -> eventRef.set(sourceRecord));
+        return eventRef.get();
+    }
+
+    static IncrementalSourceStreamFetcher createFetcher() {
+        SchemaChangeResolver schemaChangeResolver = 
mock(SchemaChangeResolver.class);
+        when(schemaChangeResolver.support(any())).thenReturn(true);
+        IncrementalSourceStreamFetcher fetcher =
+                new IncrementalSourceStreamFetcher(null, 0, 
schemaChangeResolver);
+        IncrementalSourceStreamFetcher spy = spy(fetcher);
+        doReturn(true).when(spy).shouldEmit(any());
+        return spy;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitStateTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitStateTest.java
new file mode 100644
index 0000000000..4a0b40852a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitStateTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.split.state;
+
+import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+import lombok.AllArgsConstructor;
+import lombok.ToString;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class IncrementalSplitStateTest {
+
+    @Test
+    public void testMarkEnterPureIncrementPhaseIfNeed() {
+        Offset startupOffset = new TestOffset(100);
+        List<CompletedSnapshotSplitInfo> snapshotSplits = Collections.emptyList();
+        IncrementalSplit split = createIncrementalSplit(startupOffset, snapshotSplits);
+        IncrementalSplitState splitState = new IncrementalSplitState(split);
+        Assertions.assertNull(splitState.getMaxSnapshotSplitsHighWatermark());
+        Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(null));
+
+        startupOffset = new TestOffset(100);
+        snapshotSplits =
+                Stream.of(
+                                createCompletedSnapshotSplitInfo(
+                                        "test1", new TestOffset(100), new TestOffset(100)),
+                                createCompletedSnapshotSplitInfo(
+                                        "test2", new TestOffset(100), new TestOffset(100)))
+                        .collect(Collectors.toList());
+        split = createIncrementalSplit(startupOffset, snapshotSplits);
+        splitState = new IncrementalSplitState(split);
+        Assertions.assertEquals(startupOffset, splitState.getMaxSnapshotSplitsHighWatermark());
+        Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(99)));
+        Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(snapshotSplits.isEmpty());
+        Assertions.assertTrue(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(100)));
+        Assertions.assertTrue(snapshotSplits.isEmpty());
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(100)));
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(101)));
+
+        startupOffset = new TestOffset(100);
+        snapshotSplits =
+                Stream.of(
+                                createCompletedSnapshotSplitInfo(
+                                        "test1", new TestOffset(1), new TestOffset(50)),
+                                createCompletedSnapshotSplitInfo(
+                                        "test2", new TestOffset(50), new TestOffset(200)))
+                        .collect(Collectors.toList());
+        split = createIncrementalSplit(startupOffset, snapshotSplits);
+        splitState = new IncrementalSplitState(split);
+        Assertions.assertEquals(
+                new TestOffset(200), splitState.getMaxSnapshotSplitsHighWatermark());
+        Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+        Assertions.assertTrue(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(201)));
+        Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+        Assertions.assertTrue(snapshotSplits.isEmpty());
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(200)));
+        Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(201)));
+        Assertions.assertFalse(splitState.markEnterPureIncrementPhaseIfNeed(new TestOffset(202)));
+    }
+
+    @Test
+    public void testAutoEnterPureIncrementPhaseIfAllowed() {
+        Offset startupOffset = new TestOffset(100);
+        List<CompletedSnapshotSplitInfo> snapshotSplits = Collections.emptyList();
+        IncrementalSplit split = createIncrementalSplit(startupOffset, snapshotSplits);
+        IncrementalSplitState splitState = new IncrementalSplitState(split);
+        Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(splitState.autoEnterPureIncrementPhaseIfAllowed());
+
+        startupOffset = new TestOffset(100);
+        snapshotSplits =
+                Stream.of(
+                                createCompletedSnapshotSplitInfo(
+                                        "test1", new TestOffset(100), new TestOffset(100)),
+                                createCompletedSnapshotSplitInfo(
+                                        "test2", new TestOffset(100), new TestOffset(100)))
+                        .collect(Collectors.toList());
+        split = createIncrementalSplit(startupOffset, snapshotSplits);
+        splitState = new IncrementalSplitState(split);
+
+        Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+        Assertions.assertTrue(splitState.autoEnterPureIncrementPhaseIfAllowed());
+        Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(splitState.autoEnterPureIncrementPhaseIfAllowed());
+        Assertions.assertTrue(splitState.isEnterPureIncrementPhase());
+
+        startupOffset = new TestOffset(100);
+        snapshotSplits =
+                Stream.of(
+                                createCompletedSnapshotSplitInfo(
+                                        "test1", new TestOffset(100), new TestOffset(100)),
+                                createCompletedSnapshotSplitInfo(
+                                        "test2", new TestOffset(100), new TestOffset(101)))
+                        .collect(Collectors.toList());
+        split = createIncrementalSplit(startupOffset, snapshotSplits);
+        splitState = new IncrementalSplitState(split);
+        Assertions.assertFalse(splitState.isEnterPureIncrementPhase());
+        Assertions.assertFalse(splitState.autoEnterPureIncrementPhaseIfAllowed());
+    }
+
+    private static IncrementalSplit createIncrementalSplit(
+            Offset startupOffset, List<CompletedSnapshotSplitInfo> snapshotSplits) {
+        return new IncrementalSplit(
+                "test",
+                Arrays.asList(new TableId("db", "schema", "table")),
+                startupOffset,
+                null,
+                snapshotSplits,
+                null);
+    }
+
+    private static CompletedSnapshotSplitInfo createCompletedSnapshotSplitInfo(
+            String splitId, Offset lowWatermark, Offset highWatermark) {
+        return new CompletedSnapshotSplitInfo(
+                splitId,
+                new TableId("db", "schema", "table"),
+                null,
+                null,
+                null,
+                new SnapshotSplitWatermark(null, lowWatermark, highWatermark));
+    }
+
+    @ToString
+    @AllArgsConstructor
+    static class TestOffset extends Offset {
+        private int offset;
+
+        @Override
+        public int compareTo(Offset o) {
+            return Integer.compare(offset, ((TestOffset) o).offset);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof TestOffset && offset == ((TestOffset) o).offset;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index 4ce4d17a02..0e9e936865 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -241,6 +241,7 @@ public class LogMinerStreamingChangeEventSource
                             // log before proceeding.
                             if (archiveLogOnlyMode && startScn.equals(endScn)) {
                                 pauseBetweenMiningSessions();
+                                dispatcher.dispatchHeartbeatEvent(offsetContext);
                                 continue;
                             }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
index 3053598c66..05eba0d96f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java
@@ -181,6 +181,7 @@ public class SqlServerStreamingChangeEventSource
                         && shouldIncreaseFromLsn) {
                     LOGGER.debug("No change in the database");
                     metronome.pause();
+                    dispatcher.dispatchHeartbeatEvent(offsetContext);
                     continue;
                 }
 
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 668ed78dca..29dd2ff6f5 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -62,7 +63,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
     protected final SourceReader.Context context;
 
     private RecordsWithSplitIds<E> currentFetch;
-    private SplitContext<T, SplitStateT> currentSplitContext;
+    protected SplitContext<T, SplitStateT> currentSplitContext;
     private Collector<T> currentSplitOutput;
     private boolean noMoreSplitsAssignment;
 
@@ -234,9 +235,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
     protected abstract SplitT toSplitType(String splitId, SplitStateT splitState);
 
     @RequiredArgsConstructor
-    private static final class SplitContext<T, SplitStateT> {
+    protected static final class SplitContext<T, SplitStateT> {
         final String splitId;
-        final SplitStateT state;
+        @Getter final SplitStateT state;
         Collector<T> splitOutput;
 
         Collector<T> getOrCreateSplitOutput(Collector<T> output) {

Reply via email to