This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9b1821f47 [feature][connector][cdc] add CDC enumerator base classes 
(#3419)
9b1821f47 is described below

commit 9b1821f476e69b77ef958204afcc528c633d5738
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Nov 14 14:03:10 2022 +0800

    [feature][connector][cdc] add CDC enumerator base classes (#3419)
    
    * [feature][connector][cdc] add CDC enumerator base classes
    
    * [chore] stream split replace to incremental split
---
 .../cdc/base/config/BaseSourceConfig.java          |  31 +--
 .../connectors/cdc/base/config/SourceConfig.java   |   6 +-
 .../connectors/cdc/base/config/StartupConfig.java  |  53 +++++
 .../connectors/cdc/base/config/StopConfig.java     |  54 +++++
 .../cdc/base/dialect/DataSourceDialect.java        |  23 +-
 .../connectors/cdc/base/option/SourceOptions.java  |  81 +++++--
 .../cdc/base/source/IncrementalSource.java         | 147 ++++++++----
 .../source/enumerator/HybridSplitAssigner.java     | 155 ++++++++++++
 .../enumerator/IncrementalSourceEnumerator.java    | 164 +++++++++++++
 .../enumerator/IncrementalSplitAssigner.java       | 219 +++++++++++++++++
 .../source/enumerator/SnapshotSplitAssigner.java   | 264 +++++++++++++++++++++
 .../cdc/base/source/enumerator/SplitAssigner.java  | 121 ++++++++++
 .../source/enumerator/splitter/ChunkRange.java     |  61 +++++
 .../splitter/ChunkSplitter.java}                   |  18 +-
 .../splitter/JdbcSourceChunkSplitter.java          | 144 +++++++++++
 .../state/HybridPendingSplitsState.java}           |  13 +-
 .../state/IncrementalPhaseState.java}              |  13 +-
 .../state/PendingSplitsState.java}                 |  17 +-
 .../enumerator/state/SnapshotPhaseState.java       |  91 +++++++
 ...t.java => CompletedSnapshotSplitsAckEvent.java} |  13 +-
 ...ava => CompletedSnapshotSplitsReportEvent.java} |   2 +-
 .../cdc/base/source/offset/OffsetFactory.java      |   2 +
 .../source/reader/IncrementalSourceReader.java     |  30 +--
 .../reader/IncrementalSourceRecordEmitter.java     |   6 +-
 .../reader/IncrementalSourceSplitReader.java       |   4 +-
 .../cdc/base/source/reader/external/Fetcher.java   |   4 +-
 .../external/IncrementalSourceStreamFetcher.java   |  24 +-
 .../source/split/CompletedSnapshotSplitInfo.java   |   9 +
 .../split/{LogSplit.java => IncrementalSplit.java} |  15 +-
 .../cdc/base/source/split/SourceSplitBase.java     |  12 +-
 ...gSplitState.java => IncrementalSplitState.java} |  18 +-
 .../source/split/state/SourceSplitStateBase.java   |  12 +-
 32 files changed, 1612 insertions(+), 214 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
index 6fc5f33a9..dfadfdf3e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
@@ -20,7 +20,6 @@ package org.seatunnel.connectors.cdc.base.config;
 import io.debezium.config.Configuration;
 import lombok.Getter;
 import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
 
 import java.util.Properties;
 
@@ -31,10 +30,13 @@ public abstract class BaseSourceConfig implements 
SourceConfig {
 
     private static final long serialVersionUID = 1L;
 
-    protected final Offset startupOffset;
+    @Getter
+    protected final StartupConfig startupConfig;
 
-    protected final Offset stopOffset;
+    @Getter
+    protected final StopConfig stopConfig;
 
+    @Getter
     protected final int splitSize;
 
     @Getter
@@ -48,35 +50,20 @@ public abstract class BaseSourceConfig implements 
SourceConfig {
     protected final Properties dbzProperties;
 
     public BaseSourceConfig(
-        Offset startupOffset,
-        Offset stopOffset,
+        StartupConfig startupConfig,
+        StopConfig stopConfig,
         int splitSize,
         double distributionFactorUpper,
         double distributionFactorLower,
         Properties dbzProperties) {
-        this.startupOffset = startupOffset;
-        this.stopOffset = stopOffset;
+        this.startupConfig = startupConfig;
+        this.stopConfig = stopConfig;
         this.splitSize = splitSize;
         this.distributionFactorUpper = distributionFactorUpper;
         this.distributionFactorLower = distributionFactorLower;
         this.dbzProperties = dbzProperties;
     }
 
-    @Override
-    public Offset getStartupOffset() {
-        return this.startupOffset;
-    }
-
-    @Override
-    public Offset getStopOffset() {
-        return this.stopOffset;
-    }
-
-    @Override
-    public int getSplitSize() {
-        return splitSize;
-    }
-
     public Configuration getDbzConfiguration() {
         return Configuration.from(dbzProperties);
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
index 6d943dac0..96ce851ed 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
@@ -17,16 +17,14 @@
 
 package org.seatunnel.connectors.cdc.base.config;
 
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
-
 import java.io.Serializable;
 
 /** The source configuration which offers basic source configuration. */
 public interface SourceConfig extends Serializable {
 
-    Offset getStartupOffset();
+    StartupConfig getStartupConfig();
 
-    Offset getStopOffset();
+    StopConfig getStopConfig();
 
     int getSplitSize();
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StartupConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StartupConfig.java
new file mode 100644
index 000000000..049c963d9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StartupConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+
+import java.io.Serializable;
+
+@AllArgsConstructor
+@EqualsAndHashCode
+public final class StartupConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+    @Getter
+    private final StartupMode startupMode;
+    private final String specificOffsetFile;
+    private final Long specificOffsetPos;
+    private final Long timestamp;
+
+    public Offset getStartupOffset(OffsetFactory offsetFactory) {
+        switch (startupMode) {
+            case EARLIEST:
+                return offsetFactory.earliest();
+            case LATEST:
+                return offsetFactory.latest();
+            case INITIAL:
+                return null;
+            case TIMESTAMP:
+                return offsetFactory.timstamp(timestamp);
+            default:
+                throw new IllegalArgumentException(String.format("The %s mode 
is not supported.", startupMode));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StopConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StopConfig.java
new file mode 100644
index 000000000..f6cd95fc1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StopConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.seatunnel.connectors.cdc.base.option.StopMode;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+
+import java.io.Serializable;
+
+@AllArgsConstructor
+@EqualsAndHashCode
+public final class StopConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Getter
+    private final StopMode stopMode;
+    private final String specificOffsetFile;
+    private final Long specificOffsetPos;
+    private final Long timestamp;
+
+    public Offset getStopOffset(OffsetFactory offsetFactory) {
+        switch (stopMode) {
+            case LATEST:
+                return offsetFactory.latest();
+            case NEVER:
+                return offsetFactory.neverStop();
+            case SPECIFIC:
+                return offsetFactory.specific(specificOffsetFile, 
specificOffsetPos);
+            case TIMESTAMP:
+                return offsetFactory.timstamp(timestamp);
+            default:
+                throw new IllegalArgumentException(String.format("The %s mode 
is not supported.", stopMode));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
index 0f42a4039..9088fbe3b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
@@ -18,22 +18,19 @@
 package org.seatunnel.connectors.cdc.base.dialect;
 
 import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges;
 import org.seatunnel.connectors.cdc.base.config.SourceConfig;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 
 import java.io.Serializable;
 import java.util.List;
-import java.util.Map;
 
 /**
  * The dialect of data source.
  *
  * @param <C> The source config of data source.
  */
-
 public interface DataSourceDialect<C extends SourceConfig> extends 
Serializable {
 
     /** Get the name of dialect. */
@@ -42,23 +39,13 @@ public interface DataSourceDialect<C extends SourceConfig> 
extends Serializable
     /** Discovers the list of data collection to capture. */
     List<TableId> discoverDataCollections(C sourceConfig);
 
-    /**
-     * Discovers the captured data collections' schema by {@link SourceConfig}.
-     *
-     * @param sourceConfig a basic source configuration.
-     */
-    Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(C 
sourceConfig);
-
-    /**
-     * Displays current offset from the database e.g. query Mysql binary logs 
by query <code>
-     * SHOW MASTER STATUS</code>.
-     */
-    Offset displayCurrentOffset(C sourceConfig);
-
     /** Check if the CollectionId is case-sensitive or not. */
     boolean isDataCollectionIdCaseSensitive(C sourceConfig);
 
-    /** The fetch task used to fetch data of a snapshot split or stream split. 
*/
+    /** Returns the {@link ChunkSplitter} which is used to split a collection 
into splits. */
+    ChunkSplitter createChunkSplitter(C sourceConfig);
+
+    /** The fetch task used to fetch data of a snapshot split or incremental 
split. */
     FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase 
sourceSplitBase);
 
     /** The task context used for fetch task to fetch data from external 
systems. */
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
index afd296947..6cf299e28 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -19,39 +19,76 @@ package org.seatunnel.connectors.cdc.base.option;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
 
 @SuppressWarnings("MagicNumber")
 public class SourceOptions {
 
     public static final Option<Integer> SNAPSHOT_SPLIT_SIZE = 
Options.key("snapshot.split.size")
-            .intType()
-            .defaultValue(8096)
-            .withDescription("The split size (number of rows) of table 
snapshot, captured tables are split into multiple splits when read the snapshot 
of table.");
+        .intType()
+        .defaultValue(8096)
+        .withDescription("The split size (number of rows) of table snapshot, 
captured tables are split into multiple splits when read the snapshot of 
table.");
 
     public static final Option<Integer> SNAPSHOT_FETCH_SIZE = 
Options.key("snapshot.fetch.size")
-            .intType()
-            .defaultValue(1024)
-            .withDescription("The maximum fetch size for per poll when read 
table snapshot.");
+        .intType()
+        .defaultValue(1024)
+        .withDescription("The maximum fetch size for per poll when read table 
snapshot.");
 
     public static final Option<StartupMode> STARTUP_MODE = 
Options.key("startup.mode")
-            .enumType(StartupMode.class)
-            .defaultValue(StartupMode.INITIAL)
-            .withDescription("Optional startup mode for CDC source, valid 
enumerations are "
-                    + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n 
or \"specific\"");
-
-    public static final Option<StopMode> STOP_MODE = Options.key("stop.mode")
-            .enumType(StopMode.class)
-            .defaultValue(StopMode.NEVER)
-            .withDescription("Optional stop mode for CDC source, valid 
enumerations are "
-                    + "\"never\", \"latest\", \"timestamp\"\n or 
\"specific\"");
+        .enumType(StartupMode.class)
+        .defaultValue(StartupMode.INITIAL)
+        .withDescription("Optional startup mode for CDC source, valid 
enumerations are "
+            + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or 
\"specific\"");
 
     public static final Option<Long> STARTUP_TIMESTAMP = 
Options.key("startup.timestamp")
-            .longType()
-            .noDefaultValue()
-            .withDescription("Optional timestamp(mills) used in case of 
\"timestamp\" startup mode");
+        .longType()
+        .noDefaultValue()
+        .withDescription("Optional timestamp(mills) used in case of 
\"timestamp\" startup mode");
+
+    public static final Option<String> STARTUP_SPECIFIC_OFFSET_FILE = 
Options.key("startup.specific-offset.file")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(
+            "Optional offsets used in case of \"specific\" startup mode");
+
+    public static final Option<Long> STARTUP_SPECIFIC_OFFSET_POS = 
Options.key("startup.specific-offset.pos")
+        .longType()
+        .noDefaultValue()
+        .withDescription(
+            "Optional offsets used in case of \"specific\" startup mode");
+
+    public static final Option<Integer> INCREMENTAL_PARALLELISM = 
Options.key("incremental.parallelism")
+        .intType()
+        .defaultValue(1)
+        .withDescription("The number of parallel readers in the incremental 
phase.");
+
+    public static final Option<StopMode> STOP_MODE = Options.key("stop.mode")
+        .enumType(StopMode.class)
+        .defaultValue(StopMode.NEVER)
+        .withDescription("Optional stop mode for CDC source, valid 
enumerations are "
+            + "\"never\", \"latest\", \"timestamp\"\n or \"specific\"");
 
     public static final Option<Long> STOP_TIMESTAMP = 
Options.key("stop.timestamp")
-            .longType()
-            .noDefaultValue()
-            .withDescription("Optional timestamp(mills) used in case of 
\"timestamp\" stop mode");
+        .longType()
+        .noDefaultValue()
+        .withDescription("Optional timestamp(mills) used in case of 
\"timestamp\" stop mode");
+
+    public static final Option<String> STOP_SPECIFIC_OFFSET_FILE = 
Options.key("stop.specific-offset.file")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Optional offsets used in case of \"specific\" stop 
mode");
+
+    public static final Option<Long> STOP_SPECIFIC_OFFSET_POS = 
Options.key("stop.specific-offset.pos")
+        .longType()
+        .noDefaultValue()
+        .withDescription("Optional offsets used in case of \"specific\" stop 
mode");
+
+    public static final OptionRule.Builder BASE_RULE = OptionRule.builder()
+        .optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
+        .optional(INCREMENTAL_PARALLELISM)
+        .optional(STARTUP_MODE, STOP_MODE)
+        .conditional(STARTUP_MODE, StartupMode.TIMESTAMP, STARTUP_TIMESTAMP)
+        .conditional(STARTUP_MODE, StartupMode.SPECIFIC, 
STARTUP_SPECIFIC_OFFSET_FILE, STARTUP_SPECIFIC_OFFSET_POS)
+        .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
+        .conditional(STOP_MODE, StopMode.SPECIFIC, STOP_SPECIFIC_OFFSET_FILE, 
STOP_SPECIFIC_OFFSET_POS);
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 4804fdd50..163288151 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -22,28 +22,44 @@ import 
org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import io.debezium.relational.TableId;
 import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.config.StartupConfig;
+import org.seatunnel.connectors.cdc.base.config.StopConfig;
+import org.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import org.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.seatunnel.connectors.cdc.base.option.StopMode;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner;
+import org.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
 import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
 
-import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 
-public abstract class IncrementalSource<T, C extends SourceConfig> implements 
SeaTunnelSource<T, SourceSplit, Serializable> {
+public abstract class IncrementalSource<T, C extends SourceConfig> implements 
SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
 
+    protected SourceConfig.Factory<C> configFactory;
     protected OffsetFactory offsetFactory;
-    protected Offset startupOffset;
 
-    protected Offset stopOffset;
+    protected DataSourceDialect<C> dataSourceDialect;
+    protected StartupConfig startupConfig;
+
+    protected int incrementalParallelism;
+    protected StopConfig stopConfig;
 
     protected StopMode stopMode;
     protected DebeziumDeserializationSchema<T> deserializationSchema;
@@ -52,43 +68,39 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig> implements Se
     public final void prepare(Config pluginConfig) throws PrepareFailException 
{
         ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
 
-        this.offsetFactory = createOffsetFactory();
-        this.startupOffset = getStartupOffset(config);
-        this.stopOffset = getStopOffset(config);
-        this.stopMode = config.get(SourceOptions.STOP_MODE);
+        this.startupConfig = getStartupConfig(config);
+        this.stopConfig = getStopConfig(config);
+        this.stopMode = stopConfig.getStopMode();
+        this.incrementalParallelism = 
config.get(SourceOptions.INCREMENTAL_PARALLELISM);
+        this.configFactory = createSourceConfigFactory(config);
+        this.offsetFactory = createOffsetFactory(config);
+        this.deserializationSchema = 
createDebeziumDeserializationSchema(config);
+        this.dataSourceDialect = createDataSourceDialect(config);
     }
 
-    private Offset getStartupOffset(ReadonlyConfig config) {
-        StartupMode startupMode = config.get(SourceOptions.STARTUP_MODE);
-        switch (startupMode) {
-            case EARLIEST:
-                return offsetFactory.earliest();
-            case LATEST:
-                return offsetFactory.latest();
-            case INITIAL:
-                return null;
-            case TIMESTAMP:
-                return 
offsetFactory.timstamp(config.get(SourceOptions.STOP_TIMESTAMP));
-            default:
-                throw new IllegalArgumentException(String.format("The %s mode 
is not supported.", startupMode));
-        }
+    protected StartupConfig getStartupConfig(ReadonlyConfig config) {
+        return new StartupConfig(
+            config.get(SourceOptions.STARTUP_MODE),
+            config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
+            config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
+            config.get(SourceOptions.STARTUP_TIMESTAMP));
     }
 
-    private Offset getStopOffset(ReadonlyConfig config) {
-        StopMode stopMode = config.get(SourceOptions.STOP_MODE);
-        switch (stopMode) {
-            case LATEST:
-                return offsetFactory.latest();
-            case NEVER:
-                return offsetFactory.neverStop();
-            case TIMESTAMP:
-                return 
offsetFactory.timstamp(config.get(SourceOptions.STOP_TIMESTAMP));
-            default:
-                throw new IllegalArgumentException(String.format("The %s mode 
is not supported.", stopMode));
-        }
+    private StopConfig getStopConfig(ReadonlyConfig config) {
+        return new StopConfig(
+            config.get(SourceOptions.STOP_MODE),
+            config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
+            config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
+            config.get(SourceOptions.STOP_TIMESTAMP));
     }
 
-    public abstract OffsetFactory createOffsetFactory();
+    public abstract SourceConfig.Factory<C> 
createSourceConfigFactory(ReadonlyConfig config);
+
+    public abstract OffsetFactory createOffsetFactory(ReadonlyConfig config);
+
+    public abstract DebeziumDeserializationSchema<T> 
createDebeziumDeserializationSchema(ReadonlyConfig config);
+
+    public abstract DataSourceDialect<C> 
createDataSourceDialect(ReadonlyConfig config);
 
     @Override
     public Boundedness getBoundedness() {
@@ -101,23 +113,66 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig> implements Se
     }
 
     @Override
-    public SourceReader<T, SourceSplit> createReader(SourceReader.Context 
readerContext) throws Exception {
+    public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context 
readerContext) throws Exception {
         // TODO: https://github.com/apache/incubator-seatunnel/issues/3255
         // https://github.com/apache/incubator-seatunnel/issues/3256
         return null;
     }
 
     @Override
-    public SourceSplitEnumerator<SourceSplit, Serializable> 
createEnumerator(SourceSplitEnumerator.Context<SourceSplit> enumeratorContext) 
throws Exception {
-        // TODO: https://github.com/apache/incubator-seatunnel/issues/3253
-        // https://github.com/apache/incubator-seatunnel/issues/3254
-        return null;
+    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> 
createEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> 
enumeratorContext) throws Exception {
+        C sourceConfig = configFactory.create(0);
+        final List<TableId> remainingTables = 
dataSourceDialect.discoverDataCollections(sourceConfig);
+        final SplitAssigner splitAssigner;
+        SplitAssigner.Context<C> assignerContext = new 
SplitAssigner.Context<>(sourceConfig, new HashSet<>(remainingTables), new 
HashMap<>(), new HashMap<>());
+        if (sourceConfig.getStartupConfig().getStartupMode() == 
StartupMode.INITIAL) {
+            try {
+
+                boolean isTableIdCaseSensitive =
+                    
dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
+                splitAssigner =
+                    new HybridSplitAssigner<>(
+                        assignerContext,
+                        enumeratorContext.currentParallelism(),
+                        incrementalParallelism,
+                        remainingTables,
+                        isTableIdCaseSensitive,
+                        dataSourceDialect,
+                        offsetFactory);
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to discover captured tables 
for enumerator", e);
+            }
+        } else {
+            splitAssigner = new IncrementalSplitAssigner<>(assignerContext, 
incrementalParallelism, offsetFactory);
+        }
+
+        return new IncrementalSourceEnumerator(enumeratorContext, 
splitAssigner);
     }
 
     @Override
-    public SourceSplitEnumerator<SourceSplit, Serializable> 
restoreEnumerator(SourceSplitEnumerator.Context<SourceSplit> enumeratorContext, 
Serializable checkpointState) throws Exception {
-        // TODO: https://github.com/apache/incubator-seatunnel/issues/3253
-        // https://github.com/apache/incubator-seatunnel/issues/3254
-        return null;
+    public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> 
restoreEnumerator(SourceSplitEnumerator.Context<SourceSplitBase> 
enumeratorContext,
+                                                                               
         PendingSplitsState checkpointState) throws Exception {
+        C sourceConfig = configFactory.create(0);
+        final List<TableId> remainingTables = 
dataSourceDialect.discoverDataCollections(sourceConfig);
+        SplitAssigner.Context<C> assignerContext = new 
SplitAssigner.Context<>(sourceConfig, new HashSet<>(remainingTables), new 
HashMap<>(), new HashMap<>());
+        final SplitAssigner splitAssigner;
+        if (checkpointState instanceof HybridPendingSplitsState) {
+            splitAssigner = new HybridSplitAssigner<>(
+                assignerContext,
+                enumeratorContext.currentParallelism(),
+                incrementalParallelism,
+                (HybridPendingSplitsState) checkpointState,
+                dataSourceDialect,
+                offsetFactory);
+        } else if (checkpointState instanceof IncrementalPhaseState) {
+            splitAssigner = new IncrementalSplitAssigner<>(
+                assignerContext,
+                incrementalParallelism,
+                offsetFactory);
+        } else {
+            throw new UnsupportedOperationException(
+                "Unsupported restored PendingSplitsState: " + checkpointState);
+        }
+        return new IncrementalSourceEnumerator(enumeratorContext, 
splitAssigner);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
new file mode 100644
index 000000000..af23b47e8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Assigner for Hybrid split which contains snapshot splits and incremental 
splits.
+ */
+public class HybridSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HybridSplitAssigner.class);
+
+    private final SnapshotSplitAssigner<C> snapshotSplitAssigner;
+
+    private final IncrementalSplitAssigner<C> incrementalSplitAssigner;
+
+    public HybridSplitAssigner(
+        SplitAssigner.Context<C> context,
+        int currentParallelism,
+        int incrementalParallelism,
+        List<TableId> remainingTables,
+        boolean isTableIdCaseSensitive,
+        DataSourceDialect<C> dialect,
+        OffsetFactory offsetFactory) {
+        this(new SnapshotSplitAssigner<>(
+                context,
+                currentParallelism,
+                remainingTables,
+                isTableIdCaseSensitive,
+                dialect),
+            new IncrementalSplitAssigner<>(
+                context,
+                incrementalParallelism,
+                offsetFactory));
+    }
+
+    public HybridSplitAssigner(
+        SplitAssigner.Context<C> context,
+        int currentParallelism,
+        int incrementalParallelism,
+        HybridPendingSplitsState checkpoint,
+        DataSourceDialect<C> dialect,
+        OffsetFactory offsetFactory) {
+        this(
+            new SnapshotSplitAssigner<>(
+                context,
+                currentParallelism,
+                checkpoint.getSnapshotPhaseState(),
+                dialect),
+            new IncrementalSplitAssigner<>(
+                context,
+                incrementalParallelism,
+                offsetFactory));
+    }
+
+    private HybridSplitAssigner(
+        SnapshotSplitAssigner<C> snapshotSplitAssigner,
+        IncrementalSplitAssigner<C> incrementalSplitAssigner) {
+        this.snapshotSplitAssigner = snapshotSplitAssigner;
+        this.incrementalSplitAssigner = incrementalSplitAssigner;
+    }
+
+    @Override
+    public void open() {
+        snapshotSplitAssigner.open();
+    }
+
+    @Override
+    public Optional<SourceSplitBase> getNext() {
+        if (!snapshotSplitAssigner.noMoreSplits()) {
+            // snapshot assigner still have remaining splits, assign split 
from it
+            return snapshotSplitAssigner.getNext();
+        }
+        if (!snapshotSplitAssigner.isCompleted()) {
+            // incremental split is not ready by now
+            return Optional.empty();
+        }
+        // incremental split assigning
+        if (!incrementalSplitAssigner.noMoreSplits()) {
+            // we need to wait snapshot-assigner to be completed before
+            // assigning the incremental split. Otherwise, records emitted 
from incremental split
+            // might be out-of-order in terms of same primary key with 
snapshot splits.
+            return snapshotSplitAssigner.getNext();
+        }
+        // no more splits for the assigner
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean waitingForCompletedSplits() {
+        return snapshotSplitAssigner.waitingForCompletedSplits();
+    }
+
+    @Override
+    public void onCompletedSplits(List<SnapshotSplitWatermark> 
completedSplitWatermarks) {
+        snapshotSplitAssigner.onCompletedSplits(completedSplitWatermarks);
+        incrementalSplitAssigner.onCompletedSplits(completedSplitWatermarks);
+    }
+
+    @Override
+    public void addSplits(Collection<SourceSplitBase> splits) {
+        List<SourceSplitBase> snapshotSplits = new ArrayList<>();
+        List<SourceSplitBase> incrementalSplits = new ArrayList<>();
+        for (SourceSplitBase split : splits) {
+            if (split.isSnapshotSplit()) {
+                snapshotSplits.add(split);
+            } else {
+                incrementalSplits.add(split);
+            }
+        }
+        snapshotSplitAssigner.addSplits(snapshotSplits);
+        incrementalSplitAssigner.addSplits(incrementalSplits);
+    }
+
+    @Override
+    public PendingSplitsState snapshotState(long checkpointId) {
+        return new 
HybridPendingSplitsState(snapshotSplitAssigner.snapshotState(checkpointId), 
incrementalSplitAssigner.snapshotState(checkpointId));
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
+        incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
new file mode 100644
index 000000000..3b47bb3f4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import 
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsAckEvent;
+import 
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
/**
 * Incremental source enumerator: receives split requests from registered source readers and
 * assigns the splits produced by the underlying {@link SplitAssigner} to them.
 */

public class IncrementalSourceEnumerator
        implements SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceEnumerator.class);

    private final SourceSplitEnumerator.Context<SourceSplitBase> context;

    // decides which split (snapshot/incremental) is handed out next
    private final SplitAssigner splitAssigner;

    /**
     * using TreeSet to prefer assigning incremental split to task-0 for easier debug
     */
    private final TreeSet<Integer> readersAwaitingSplit;

    public IncrementalSourceEnumerator(
            SourceSplitEnumerator.Context<SourceSplitBase> context,
            SplitAssigner splitAssigner) {
        this.context = context;
        this.splitAssigner = splitAssigner;
        this.readersAwaitingSplit = new TreeSet<>();
    }

    @Override
    public void open() {
        splitAssigner.open();
    }

    @Override
    public void run() throws Exception {
        // No background work: splits are assigned reactively from handleSplitRequest()
        // and notifyCheckpointComplete().
    }

    /**
     * Records the requesting reader as waiting and tries to assign a split immediately.
     */
    @Override
    public void handleSplitRequest(int subtaskId) {
        if (!context.registeredReaders().contains(subtaskId)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }

        readersAwaitingSplit.add(subtaskId);
        assignSplits();
    }

    /**
     * Returns splits of a failed reader to the assigner so they can be re-assigned.
     */
    @Override
    public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
        LOG.debug("Incremental Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    @Override
    public int currentUnassignedSplitSize() {
        // NOTE(review): always 0 — unassigned splits are tracked inside the SplitAssigner,
        // not here; confirm no caller relies on a real pending-split count.
        return 0;
    }

    @Override
    public void registerReader(int subtaskId) {
        // do nothing; readers pull work via handleSplitRequest
    }

    /**
     * Handles reports of completed snapshot splits: forwards the high watermarks to the
     * assigner and acknowledges the completed split ids back to the reporting reader.
     */
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof CompletedSnapshotSplitsReportEvent) {
            LOG.info(
                    "The enumerator receives completed split watermarks(log offset) {} from subtask {}.",
                    sourceEvent,
                    subtaskId);
            CompletedSnapshotSplitsReportEvent reportEvent =
                    (CompletedSnapshotSplitsReportEvent) sourceEvent;
            List<SnapshotSplitWatermark> completedSplitWatermarks =
                    reportEvent.getCompletedSnapshotSplitWatermarks();
            splitAssigner.onCompletedSplits(completedSplitWatermarks);

            // send acknowledge event
            CompletedSnapshotSplitsAckEvent ackEvent =
                    new CompletedSnapshotSplitsAckEvent(completedSplitWatermarks.stream()
                        .map(SnapshotSplitWatermark::getSplitId)
                        .collect(Collectors.toList()));
            context.sendEventToSourceReader(subtaskId, ackEvent);
        }
    }

    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        splitAssigner.notifyCheckpointComplete(checkpointId);
        // incremental split may be available after checkpoint complete
        assignSplits();
    }

    @Override
    public void close() {
        LOG.info("Closing enumerator...");
        splitAssigner.close();
    }

    // ------------------------------------------------------------------------------------------

    /**
     * Drains the waiting-reader set in ascending subtask order, handing each reader one split
     * from the assigner. Stops as soon as the assigner has no split available; the remaining
     * readers stay queued until more splits become available.
     */
    private void assignSplits() {
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();

        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            // if the reader that requested another split has failed in the meantime, remove
            // it from the list of waiting readers
            if (!context.registeredReaders().contains(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }

            Optional<SourceSplitBase> split = splitAssigner.getNext();
            if (split.isPresent()) {
                final SourceSplitBase sourceSplit = split.get();
                context.assignSplit(nextAwaiting, sourceSplit);
                awaitingReader.remove();
                LOG.info("Assign split {} to subtask {}", sourceSplit, nextAwaiting);
            } else {
                // there is no available splits by now, skip assigning
                break;
            }
        }
    }
}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
new file mode 100644
index 000000000..0863d41ab
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import 
org.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Assigner for incremental split.
+ */
+public class IncrementalSplitAssigner<C extends SourceConfig> implements 
SplitAssigner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalSplitAssigner.class);
+    protected static final String INCREMENTAL_SPLIT_ID = 
"incremental-split-%d";
+
+    private final SplitAssigner.Context<C> context;
+
+    private final int incrementalParallelism;
+
+    private final OffsetFactory offsetFactory;
+
+    /**
+     * Maximum watermark in SnapshotSplits per table.
+     * <br> Used to delete information in completedSnapshotSplitInfos, 
reducing state size.
+     * <br> Used to support Exactly-Once.
+     */
+    private final Map<TableId, Offset> tableWatermarks = new HashMap<>();
+
+    private boolean splitAssigned = false;
+
+    private final List<IncrementalSplit> remainingSplits = new ArrayList<>();
+
+    private final Map<String, IncrementalSplit> assignedSplits = new 
HashMap<>();
+
+    public IncrementalSplitAssigner(
+        SplitAssigner.Context<C> context,
+        int incrementalParallelism,
+        OffsetFactory offsetFactory) {
+        this.context = context;
+        this.incrementalParallelism = incrementalParallelism;
+        this.offsetFactory = offsetFactory;
+    }
+
+    @Override
+    public void open() {
+    }
+
+    @Override
+    public Optional<SourceSplitBase> getNext() {
+        if (!remainingSplits.isEmpty()) {
+            // return remaining splits firstly
+            Iterator<IncrementalSplit> iterator = remainingSplits.iterator();
+            IncrementalSplit split = iterator.next();
+            iterator.remove();
+            assignedSplits.put(split.splitId(), split);
+            return Optional.of(split);
+        }
+        if (splitAssigned) {
+            return Optional.empty();
+        }
+        List<IncrementalSplit> incrementalSplits = createIncrementalSplits();
+        remainingSplits.addAll(incrementalSplits);
+        splitAssigned = true;
+        return getNext();
+    }
+
+    /**
+     * Indicates there is no more splits available in this assigner.
+     */
+    public boolean noMoreSplits() {
+        return getRemainingTables().isEmpty() && remainingSplits.isEmpty();
+    }
+
+    private Set<TableId> getRemainingTables() {
+        Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
+        assignedSplits.values()
+            .forEach(split -> split.getTableIds().forEach(allTables::remove));
+        return allTables;
+    }
+
+    @Override
+    public boolean waitingForCompletedSplits() {
+        return false;
+    }
+
+    @Override
+    public void onCompletedSplits(List<SnapshotSplitWatermark> 
completedSplitWatermarks) {
+        // do nothing
+    }
+
+    @Override
+    public void addSplits(Collection<SourceSplitBase> splits) {
+        // we don't store the split, but will re-create incremental split later
+        splits.stream()
+            .map(SourceSplitBase::asIncrementalSplit)
+            .forEach(incrementalSplit -> {
+                Offset startupOffset = incrementalSplit.getStartupOffset();
+                List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = 
incrementalSplit.getCompletedSnapshotSplitInfos();
+                for (CompletedSnapshotSplitInfo info : 
completedSnapshotSplitInfos) {
+                    context.getSplitCompletedOffsets().put(info.getSplitId(), 
info.getWatermark());
+                    context.getAssignedSnapshotSplit().put(info.getSplitId(), 
info.asSnapshotSplit());
+                }
+                for (TableId tableId : incrementalSplit.getTableIds()) {
+                    tableWatermarks.put(tableId, startupOffset);
+                }
+            });
+    }
+
+    @Override
+    public IncrementalPhaseState snapshotState(long checkpointId) {
+        return new IncrementalPhaseState();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        // nothing to do
+    }
+
+    // 
------------------------------------------------------------------------------------------
+
+    public List<IncrementalSplit> createIncrementalSplits() {
+        Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
+        assignedSplits.values()
+            .forEach(split -> split.getTableIds().forEach(allTables::remove));
+        List<TableId>[] capturedTables = new List[incrementalParallelism];
+        int i = 0;
+        for (TableId tableId : allTables) {
+            int index = i % incrementalParallelism;
+            if (capturedTables[index] == null) {
+                capturedTables[index] = new ArrayList<>();
+            }
+            capturedTables[index].add(tableId);
+            i++;
+        }
+        i = 0;
+        List<IncrementalSplit> incrementalSplits = new ArrayList<>();
+        for (List<TableId> capturedTable : capturedTables) {
+            incrementalSplits.add(createIncrementalSplit(capturedTable, i++));
+        }
+        return incrementalSplits;
+    }
+
+    private IncrementalSplit createIncrementalSplit(List<TableId> 
capturedTables, int index) {
+        final List<SnapshotSplit> assignedSnapshotSplit =
+            context.getAssignedSnapshotSplit().values().stream()
+                .filter(split -> capturedTables.contains(split.getTableId()))
+                .sorted(Comparator.comparing(SourceSplitBase::splitId))
+                .collect(Collectors.toList());
+
+        Map<String, Offset> splitCompletedOffsets = 
context.getSplitCompletedOffsets();
+        final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = 
new ArrayList<>();
+        Offset minOffset = null;
+        for (SnapshotSplit split : assignedSnapshotSplit) {
+            // find the min offset of change log
+            Offset changeLogOffset = 
splitCompletedOffsets.get(split.splitId());
+            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
+                minOffset = changeLogOffset;
+            }
+            completedSnapshotSplitInfos.add(
+                new CompletedSnapshotSplitInfo(
+                    split.splitId(),
+                    split.getTableId(),
+                    split.getSplitKeyType(),
+                    split.getSplitStart(),
+                    split.getSplitEnd(),
+                    changeLogOffset));
+        }
+        for (TableId tableId : capturedTables) {
+            Offset watermark = tableWatermarks.get(tableId);
+            if (minOffset == null || watermark.isBefore(minOffset)) {
+                minOffset = watermark;
+            }
+        }
+        C sourceConfig = context.getSourceConfig();
+        return new IncrementalSplit(
+            String.format(INCREMENTAL_SPLIT_ID, index),
+            capturedTables,
+            minOffset != null ? minOffset : 
sourceConfig.getStartupConfig().getStartupOffset(offsetFactory),
+            sourceConfig.getStopConfig().getStopOffset(offsetFactory),
+            completedSnapshotSplitInfos);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
new file mode 100644
index 000000000..5e8a62a5c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
/**
 * Assigner for snapshot splits: splits each captured table into chunks, tracks which splits
 * have been assigned and completed, and marks the snapshot phase completed only after a
 * checkpoint has covered all completed splits.
 */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    private final SplitAssigner.Context<C> context;

    private final C sourceConfig;
    // tables whose chunks have already been generated
    private final List<TableId> alreadyProcessedTables;
    // generated but not yet assigned snapshot splits
    private final List<SnapshotSplit> remainingSplits;
    // splitId -> split, for splits handed out to readers
    private final Map<String, SnapshotSplit> assignedSplits;
    // splitId -> high watermark reported on split completion
    private final Map<String, Offset> splitCompletedOffsets;
    // true once all splits are completed AND a covering checkpoint has completed
    private boolean assignerCompleted;
    private final int currentParallelism;
    // tables not yet chunked, consumed from the head in getNext()
    private final LinkedList<TableId> remainingTables;
    // legacy state did not checkpoint remainingTables; false triggers re-discovery in open()
    private final boolean isRemainingTablesCheckpointed;

    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;

    // id of the first checkpoint taken after all splits completed; completion is
    // acknowledged only once that checkpoint completes
    private Long checkpointIdToFinish;
    private final DataSourceDialect<C> dialect;

    /** Creates an assigner for a fresh start (no checkpoint state). */
    SnapshotSplitAssigner(
        SplitAssigner.Context<C> context,
        int currentParallelism,
        List<TableId> remainingTables,
        boolean isTableIdCaseSensitive,
        DataSourceDialect<C> dialect) {
        this(
            context,
            currentParallelism,
            new ArrayList<>(),
            new ArrayList<>(),
            new HashMap<>(),
            new HashMap<>(),
            false,
            remainingTables,
            isTableIdCaseSensitive,
            true,
            dialect);
    }

    /** Creates an assigner restored from a {@link SnapshotPhaseState} checkpoint. */
    SnapshotSplitAssigner(
        SplitAssigner.Context<C> context,
        int currentParallelism,
        SnapshotPhaseState checkpoint,
        DataSourceDialect<C> dialect) {
        this(
            context,
            currentParallelism,
            checkpoint.getAlreadyProcessedTables(),
            checkpoint.getRemainingSplits(),
            checkpoint.getAssignedSplits(),
            checkpoint.getSplitCompletedOffsets(),
            checkpoint.isAssignerCompleted(),
            checkpoint.getRemainingTables(),
            checkpoint.isTableIdCaseSensitive(),
            checkpoint.isRemainingTablesCheckpointed(),
            dialect);
    }

    private SnapshotSplitAssigner(
        SplitAssigner.Context<C> context,
        int currentParallelism,
        List<TableId> alreadyProcessedTables,
        List<SnapshotSplit> remainingSplits,
        Map<String, SnapshotSplit> assignedSplits,
        Map<String, Offset> splitCompletedOffsets,
        boolean assignerCompleted,
        List<TableId> remainingTables,
        boolean isTableIdCaseSensitive,
        boolean isRemainingTablesCheckpointed,
        DataSourceDialect<C> dialect) {
        this.context = context;
        this.sourceConfig = context.getSourceConfig();
        this.currentParallelism = currentParallelism;
        this.alreadyProcessedTables = alreadyProcessedTables;
        this.remainingSplits = remainingSplits;
        this.assignedSplits = assignedSplits;
        this.splitCompletedOffsets = splitCompletedOffsets;
        this.assignerCompleted = assignerCompleted;
        this.remainingTables = new LinkedList<>(remainingTables);
        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
        this.dialect = dialect;
    }

    @Override
    public void open() {
        chunkSplitter = dialect.createChunkSplitter(sourceConfig);

        // the legacy state didn't snapshot remaining tables, discovery remaining table here
        if (!isRemainingTablesCheckpointed && !assignerCompleted) {
            try {
                final List<TableId> discoverTables = dialect.discoverDataCollections(sourceConfig);
                context.getCapturedTables().addAll(discoverTables);
                // only tables not already chunked remain to be processed
                discoverTables.removeAll(alreadyProcessedTables);
                this.remainingTables.addAll(discoverTables);
                this.isTableIdCaseSensitive = dialect.isDataCollectionIdCaseSensitive(sourceConfig);
            } catch (Exception e) {
                throw new RuntimeException("Failed to discover remaining tables to capture", e);
            }
        }
    }

    /**
     * Returns the next snapshot split, chunking the next remaining table on demand when the
     * split queue is empty. Returns empty once all tables are chunked and assigned.
     */
    @Override
    public Optional<SourceSplitBase> getNext() {
        if (!remainingSplits.isEmpty()) {
            // return remaining splits firstly
            Iterator<SnapshotSplit> iterator = remainingSplits.iterator();
            SnapshotSplit split = iterator.next();
            iterator.remove();
            assignedSplits.put(split.splitId(), split);
            return Optional.of(split);
        } else {
            // it's turn for new table
            TableId nextTable = remainingTables.pollFirst();
            if (nextTable != null) {
                // split the given table into chunks (snapshot splits)
                Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
                remainingSplits.addAll(splits);
                alreadyProcessedTables.add(nextTable);
                return getNext();
            } else {
                return Optional.empty();
            }
        }
    }

    @Override
    public boolean waitingForCompletedSplits() {
        return !allSplitsCompleted();
    }

    /**
     * Records the high watermarks of completed splits. With parallelism 1 the assigner can be
     * marked completed immediately; otherwise completion waits for a covering checkpoint (see
     * {@link #snapshotState} / {@link #notifyCheckpointComplete}).
     */
    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        completedSplitWatermarks.forEach(m -> this.splitCompletedOffsets.put(m.getSplitId(), m.getHighWatermark()));
        if (allSplitsCompleted()) {
            // Skip the waiting checkpoint when current parallelism is 1 which means we do not need
            // to care about the global output data order of snapshot splits and incremental split.
            if (currentParallelism == 1) {
                assignerCompleted = true;
                LOG.info("Snapshot split assigner received all splits completed and the job parallelism is 1, snapshot split assigner is turn into completed status.");
            } else {
                LOG.info("Snapshot split assigner received all splits completed, waiting for a complete checkpoint to mark the assigner completed.");
            }
        }
    }

    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        for (SourceSplitBase split : splits) {
            remainingSplits.add(split.asSnapshotSplit());
            // we should remove the add-backed splits from the assigned list, because they are failed
            assignedSplits.remove(split.splitId());
            splitCompletedOffsets.remove(split.splitId());
        }
    }

    @Override
    public SnapshotPhaseState snapshotState(long checkpointId) {
        SnapshotPhaseState state =
            new SnapshotPhaseState(
                alreadyProcessedTables,
                remainingSplits,
                assignedSplits,
                splitCompletedOffsets,
                assignerCompleted,
                remainingTables,
                isTableIdCaseSensitive,
                true);
        // we need a complete checkpoint before mark this assigner to be completed, to wait for all
        // records of snapshot splits are completely processed
        if (checkpointIdToFinish == null && !assignerCompleted && allSplitsCompleted()) {
            checkpointIdToFinish = checkpointId;
        }
        return state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // we have waited for at-least one complete checkpoint after all snapshot-splits are
        // completed, then we can mark snapshot assigner as completed.
        // NOTE(review): the log below prints even when checkpointId < checkpointIdToFinish
        // leaves assignerCompleted false — consider guarding the log on the assignment result.
        if (checkpointIdToFinish != null && !assignerCompleted && allSplitsCompleted()) {
            assignerCompleted = checkpointId >= checkpointIdToFinish;
            LOG.info("Snapshot split assigner is turn into completed status.");
        }
    }

    /**
     * Indicates there is no more splits available in this assigner.
     */
    public boolean noMoreSplits() {
        return remainingTables.isEmpty() && remainingSplits.isEmpty();
    }

    /**
     * Returns whether the snapshot split assigner is completed, which indicates there is no more
     * splits and all records of splits have been completely processed in the pipeline.
     */
    public boolean isCompleted() {
        return assignerCompleted;
    }

    public Map<String, SnapshotSplit> getAssignedSplits() {
        return assignedSplits;
    }

    public Map<String, Offset> getSplitCompletedOffsets() {
        return splitCompletedOffsets;
    }

    // -------------------------------------------------------------------------------------------

    /**
     * Returns whether all splits are completed which means no more splits and all assigned splits
     * are completed.
     */
    private boolean allSplitsCompleted() {
        return noMoreSplits() && assignedSplits.size() == splitCompletedOffsets.size();
    }
}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java
new file mode 100644
index 000000000..86a375486
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import org.apache.seatunnel.api.state.CheckpointListener;
+
+import io.debezium.relational.TableId;
+import lombok.Data;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The {@code SplitAssigner} is responsible for deciding what split should be 
processed. It
+ * determines split processing order.
+ */
+public interface SplitAssigner {
+
+    /**
+     * Called to open the assigner to acquire any resources, like threads or 
network connections.
+     */
+    void open();
+
+    /**
+     * Gets the next split.
+     *
+     * <p>When this method returns an empty {@code Optional}, then the set of 
splits is assumed to
+     * be done and the source will finish once the readers completed their 
current splits.
+     */
+    Optional<SourceSplitBase> getNext();
+
+    /**
+     * Whether the split assigner is still waiting for callback of completed 
splits, i.e. {@link #onCompletedSplits}.
+     */
+    boolean waitingForCompletedSplits();
+
+    /**
+     * Callback to handle the completed splits with the completed change log offset. This is useful
+     * for determining when to generate an incremental split and which incremental split to generate.
+     */
+    void onCompletedSplits(List<SnapshotSplitWatermark> 
completedSplitWatermarks);
+
+    /**
+     * Adds a set of splits to this assigner. This happens for example when 
some split processing
+     * failed and the splits need to be re-added.
+     */
+    void addSplits(Collection<SourceSplitBase> splits);
+
+    /**
+     * Creates a snapshot of the state of this split assigner, to be stored in 
a checkpoint.
+     *
+     * <p>The snapshot should contain the latest state of the assigner: It 
should assume that all
+     * operations that happened before the snapshot have successfully 
completed. For example all
+     * splits assigned to readers via {@link #getNext()} don't need to be 
included in the snapshot
+     * anymore.
+     *
+     * <p>This method takes the ID of the checkpoint for which the state is 
snapshotted. Most
+     * implementations should be able to ignore this parameter, because for 
the contents of the
+     * snapshot, it doesn't matter for which checkpoint it gets created. This 
parameter can be
+     * interesting for source connectors with external systems where those 
systems are themselves
+     * aware of checkpoints; for example in cases where the enumerator 
notifies that system about a
+     * specific checkpoint being triggered.
+     *
+     * @param checkpointId The ID of the checkpoint for which the snapshot is 
created.
+     * @return an object containing the state of the split enumerator.
+     */
+    PendingSplitsState snapshotState(long checkpointId);
+
+    /**
+     * Notifies the listener that the checkpoint with the given {@code 
checkpointId} completed and
+     * was committed.
+     *
+     * @see CheckpointListener#notifyCheckpointComplete(long)
+     */
+    void notifyCheckpointComplete(long checkpointId);
+
+    /**
+     * Called to close the assigner, in case it holds on to any resources, 
like threads or network
+     * connections.
+     */
+    default void close() {
+    }
+
+    @Data
+    final class Context<C extends SourceConfig> {
+        private final C sourceConfig;
+
+        private final Set<TableId> capturedTables;
+
+        private final Map<String, SnapshotSplit> assignedSnapshotSplit;
+
+        /**
+         * key: SnapshotSplit id
+         */
+        private final Map<String, Offset> splitCompletedOffsets;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkRange.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkRange.java
new file mode 100644
index 000000000..673921532
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkRange.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.util.Objects;
+
+/**
+ * An internal structure describes a chunk range with a chunk start 
(inclusive) and chunk end
+ * (exclusive). Note that {@code null} represents unbounded chunk start/end.
+ */
+@Getter
+@EqualsAndHashCode
+public class ChunkRange {
+    private final Object chunkStart;
+    private final Object chunkEnd;
+
+    /**
+     * Returns a {@link ChunkRange} which represents a full table scan with 
unbounded chunk start
+     * and chunk end.
+     */
+    public static ChunkRange all() {
+        return new ChunkRange(null, null);
+    }
+
+    /** Returns a {@link ChunkRange} with the given chunk start and chunk end. 
*/
+    public static ChunkRange of(Object chunkStart, Object chunkEnd) {
+        return new ChunkRange(chunkStart, chunkEnd);
+    }
+
+    private ChunkRange(Object chunkStart, Object chunkEnd) {
+        if (chunkStart != null || chunkEnd != null) {
+            checkArgument(
+                    !Objects.equals(chunkStart, chunkEnd),
+                    "Chunk start %s shouldn't be equal to chunk end %s",
+                    chunkStart,
+                    chunkEnd);
+        }
+        this.chunkStart = chunkStart;
+        this.chunkEnd = chunkEnd;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkSplitter.java
similarity index 63%
copy from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkSplitter.java
index b073ccaf9..b02a891cf 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkSplitter.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.seatunnel.connectors.cdc.base.source.event;
+package org.seatunnel.connectors.cdc.base.source.enumerator.splitter;
 
-import org.apache.seatunnel.api.source.SourceEvent;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
-import lombok.Data;
+import java.util.Collection;
 
-import java.util.List;
+/** The splitter used to split collection into a set of chunks. */
+public interface ChunkSplitter {
 
-@Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
-    private static final long serialVersionUID = 1L;
-    List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+    /**
+     * Generates all snapshot splits (chunks) for the given data collection.
+     */
+    Collection<SnapshotSplit> generateSplits(TableId tableId);
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
new file mode 100644
index 000000000..ad34a91b8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+
+import java.sql.SQLException;
+import java.util.Collection;
+
+/**
+ * The {@code ChunkSplitter} used to split table into a set of chunks for JDBC 
data source.
+ */
+public interface JdbcSourceChunkSplitter extends ChunkSplitter {
+
+    /**
+     * Generates all snapshot splits (chunks) for the given table path.
+     */
+    @Override
+    Collection<SnapshotSplit> generateSplits(TableId tableId);
+
+    /**
+     * Query the maximum and minimum value of the column in the table. e.g. query string <code>
+     * SELECT MIN(%s), MAX(%s) FROM %s</code>
+     *
+     * @param jdbc       JDBC connection.
+     * @param tableId    table identity.
+     * @param columnName column name.
+     * @return maximum and minimum value.
+     */
+    Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String 
columnName) throws SQLException;
+
+    /**
+     * Query the minimum value of the column in the table, and the minimum value must be greater than
+     * the excludedLowerBound value. e.g. prepare query string <code>
+     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
+     *
+     * @param jdbc               JDBC connection.
+     * @param tableId            table identity.
+     * @param columnName         column name.
+     * @param excludedLowerBound the minimum value should be greater than this 
value.
+     * @return minimum value.
+     */
+    Object queryMin(JdbcConnection jdbc, TableId tableId, String columnName, 
Object excludedLowerBound)
+        throws SQLException;
+
+    /**
+     * Query the maximum value of the next chunk, and the next chunk must be 
greater than or equal
+     * to <code>includedLowerBound</code> value [min_1, max_1), [min_2, 
max_2),... [min_n, null).
+     * Each time this method is called it will return max1, max2...
+     *
+     * @param jdbc               JDBC connection.
+     * @param tableId            table identity.
+     * @param columnName         column name.
+     * @param chunkSize          chunk size.
+     * @param includedLowerBound the previous chunk end value.
+     * @return next chunk end value.
+     */
+    Object queryNextChunkMax(
+        JdbcConnection jdbc,
+        TableId tableId,
+        String columnName,
+        int chunkSize,
+        Object includedLowerBound)
+        throws SQLException;
+
+    /**
+     * Approximate total number of entries in the lookup table.
+     *
+     * @param jdbc    JDBC connection.
+     * @param tableId table identity.
+     * @return approximate row count.
+     */
+    Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws 
SQLException;
+
+    /**
+     * Build the scan query sql of the {@link SnapshotSplit}.
+     *
+     * @param tableId      table identity.
+     * @param splitKeyType primary key type.
+     * @param isFirstSplit whether the first split.
+     * @param isLastSplit  whether the last split.
+     * @return query sql.
+     */
+    String buildSplitScanQuery(TableId tableId, SeaTunnelRowType splitKeyType, 
boolean isFirstSplit, boolean isLastSplit);
+
+    /**
+     * Checks whether split column is evenly distributed across its range.
+     *
+     * @param splitColumn split column.
+     * @return true that means split column with type BIGINT, INT, DECIMAL.
+     */
+    default boolean isEvenlySplitColumn(Column splitColumn) {
+        // currently, we only support these types.
+        switch (fromDbzColumn(splitColumn).getSqlType()) {
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case DECIMAL:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Get a corresponding SeaTunnel data type from a debezium {@link Column}.
+     *
+     * @param splitColumn dbz split column.
+     * @return SeaTunnel data type
+     */
+    SeaTunnelDataType<?> fromDbzColumn(Column splitColumn);
+
+    /**
+     * convert dbz column to SeaTunnel row type.
+     *
+     * @param splitColumn split column.
+     * @return SeaTunnel row type.
+     */
+    default SeaTunnelRowType getSplitType(Column splitColumn) {
+        return new SeaTunnelRowType(new String[]{splitColumn.name()}, new 
SeaTunnelDataType[]{fromDbzColumn(splitColumn)});
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/HybridPendingSplitsState.java
similarity index 70%
copy from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/HybridPendingSplitsState.java
index b073ccaf9..6c72665a5 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/HybridPendingSplitsState.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.seatunnel.connectors.cdc.base.source.event;
-
-import org.apache.seatunnel.api.source.SourceEvent;
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
 
 import lombok.Data;
 
-import java.util.List;
-
+/** A {@link PendingSplitsState} for pending hybrid (snapshot & incremental) 
splits. */
 @Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
-    private static final long serialVersionUID = 1L;
-    List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+public class HybridPendingSplitsState implements PendingSplitsState {
+    private final SnapshotPhaseState snapshotPhaseState;
+    private final IncrementalPhaseState incrementalPhaseState;
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/IncrementalPhaseState.java
similarity index 71%
copy from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/IncrementalPhaseState.java
index b073ccaf9..4b46ef41d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/IncrementalPhaseState.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.seatunnel.connectors.cdc.base.source.event;
-
-import org.apache.seatunnel.api.source.SourceEvent;
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
 
 import lombok.Data;
 
-import java.util.List;
-
+/**
+ * A {@link PendingSplitsState} for pending incremental splits.
+ */
 @Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
-    private static final long serialVersionUID = 1L;
-    List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+public class IncrementalPhaseState implements PendingSplitsState {
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/PendingSplitsState.java
similarity index 69%
copy from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/PendingSplitsState.java
index b073ccaf9..487055745 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/PendingSplitsState.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.seatunnel.connectors.cdc.base.source.event;
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
 
-import org.apache.seatunnel.api.source.SourceEvent;
+import java.io.Serializable;
 
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
-    private static final long serialVersionUID = 1L;
-    List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+/**
+ * A checkpoint of the current state, containing the currently pending splits that are not yet
+ * assigned.
+ */
+public interface PendingSplitsState extends Serializable {
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java
new file mode 100644
index 000000000..ab8b65738
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
+
+import io.debezium.relational.TableId;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+
+import java.util.List;
+import java.util.Map;
+
+/** A {@link PendingSplitsState} for pending snapshot splits. */
+@Getter
+@ToString
+@EqualsAndHashCode
+public class SnapshotPhaseState implements PendingSplitsState {
+
+    /** The tables in the checkpoint. */
+    private final List<TableId> remainingTables;
+
+    /**
+     * The paths that are no longer in the enumerator checkpoint, but have 
been processed before and
+     * should this be ignored. Relevant only for sources in continuous 
monitoring mode.
+     */
+    private final List<TableId> alreadyProcessedTables;
+
+    /** The splits in the checkpoint. */
+    private final List<SnapshotSplit> remainingSplits;
+
+    /**
+     * The snapshot splits that the {@link IncrementalSourceEnumerator} has 
assigned to {@link
+     * IncrementalSourceSplitReader}s.
+     */
+    private final Map<String, SnapshotSplit> assignedSplits;
+
+    /**
+     * The offsets of completed (snapshot) splits that the {@link 
IncrementalSourceEnumerator} has
+     * received from {@link IncrementalSourceSplitReader}s.
+     */
+    private final Map<String, Offset> splitCompletedOffsets;
+
+    /**
+     * Whether the snapshot split assigner is completed, which indicates there are no more splits
+     * and all records of splits have been completely processed in the pipeline.
+     */
+    private final boolean isAssignerCompleted;
+
+    /** Whether the table identifier is case sensitive. */
+    private final boolean isTableIdCaseSensitive;
+
+    /** Whether the remaining tables are kept when snapshotting state. */
+    private final boolean isRemainingTablesCheckpointed;
+
+    public SnapshotPhaseState(
+            List<TableId> alreadyProcessedTables,
+            List<SnapshotSplit> remainingSplits,
+            Map<String, SnapshotSplit> assignedSplits,
+            Map<String, Offset> splitCompletedOffsets,
+            boolean isAssignerCompleted,
+            List<TableId> remainingTables,
+            boolean isTableIdCaseSensitive,
+            boolean isRemainingTablesCheckpointed) {
+        this.alreadyProcessedTables = alreadyProcessedTables;
+        this.remainingSplits = remainingSplits;
+        this.assignedSplits = assignedSplits;
+        this.splitCompletedOffsets = splitCompletedOffsets;
+        this.isAssignerCompleted = isAssignerCompleted;
+        this.remainingTables = remainingTables;
+        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
+        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsAckEvent.java
similarity index 69%
copy from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsAckEvent.java
index b073ccaf9..768ca5fc9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsAckEvent.java
@@ -20,11 +20,20 @@ package org.seatunnel.connectors.cdc.base.source.event;
 import org.apache.seatunnel.api.source.SourceEvent;
 
 import lombok.Data;
+import 
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
 
 import java.util.List;
 
+/**
+ * The {@link SourceEvent} that {@link IncrementalSourceEnumerator} sends to 
{@link
+ * IncrementalSourceReader} to notify that the completed snapshot splits have been received, i.e.
+ * acknowledge for {@link CompletedSnapshotSplitsReportEvent}.
+ */
 @Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
+public class CompletedSnapshotSplitsAckEvent implements SourceEvent {
+
     private static final long serialVersionUID = 1L;
-    List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+
+    private final List<String> completedSplits;
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsReportEvent.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
rename to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsReportEvent.java
index b073ccaf9..455d597c8 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsReportEvent.java
@@ -24,7 +24,7 @@ import lombok.Data;
 import java.util.List;
 
 @Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
+public class CompletedSnapshotSplitsReportEvent implements SourceEvent {
     private static final long serialVersionUID = 1L;
     List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
index 60d82d260..88b40c528 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
@@ -30,5 +30,7 @@ public abstract class OffsetFactory {
 
     public abstract Offset specific(Map<String, String> offset);
 
+    public abstract Offset specific(String filename, Long position);
+
     public abstract Offset timstamp(long timestmap);
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 3e3d31a5a..d91f653e8 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -28,13 +28,13 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.Si
 
 import lombok.extern.slf4j.Slf4j;
 import org.seatunnel.connectors.cdc.base.config.SourceConfig;
-import 
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitReportEvent;
+import 
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
 import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.seatunnel.connectors.cdc.base.source.split.state.LogSplitState;
+import 
org.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
 import org.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState;
 import 
org.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
 
@@ -47,7 +47,7 @@ import java.util.function.Supplier;
 
 /**
  * The multi-parallel source reader for table snapshot phase from {@link 
SnapshotSplit} and then
- * single-parallel source reader for table stream phase from {@link LogSplit}.
+ * single-parallel source reader for table stream phase from {@link 
IncrementalSplit}.
  */
 @Slf4j
 public class IncrementalSourceReader<T, C extends SourceConfig>
@@ -56,7 +56,7 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
 
     private final Map<String, SnapshotSplit> finishedUnackedSplits;
 
-    private final Map<String, LogSplit> uncompletedStreamSplits;
+    private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
 
     private final int subtaskId;
 
@@ -77,7 +77,7 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
             context);
         this.sourceConfig = sourceConfig;
         this.finishedUnackedSplits = new HashMap<>();
-        this.uncompletedStreamSplits = new HashMap<>();
+        this.uncompletedIncrementalSplits = new HashMap<>();
         this.subtaskId = context.getIndexOfSubtask();
     }
 
@@ -99,14 +99,14 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
                     unfinishedSplits.add(split);
                 }
             } else {
-                // the stream split is uncompleted
-                uncompletedStreamSplits.put(split.splitId(), 
split.asLogSplit());
-                unfinishedSplits.add(split.asLogSplit());
+                // the incremental split is uncompleted
+                uncompletedIncrementalSplits.put(split.splitId(), 
split.asIncrementalSplit());
+                unfinishedSplits.add(split.asIncrementalSplit());
             }
         }
         // notify split enumerator again about the finished unacked snapshot 
splits
         reportFinishedSnapshotSplitsIfNeed();
-        // add all un-finished splits (including stream split) to 
SourceReaderBase
+        // add all un-finished splits (including incremental split) to 
SourceReaderBase
         super.addSplits(unfinishedSplits);
     }
 
@@ -117,7 +117,7 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
             checkState(
                 sourceSplit.isSnapshotSplit(),
                 String.format(
-                    "Only snapshot split could finish, but the actual split is 
stream split %s",
+                    "Only snapshot split could finish, but the actual split is 
incremental split %s",
                     sourceSplit));
             finishedUnackedSplits.put(sourceSplit.splitId(), 
sourceSplit.asSnapshotSplit());
         }
@@ -132,7 +132,7 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
             for (SnapshotSplit split : finishedUnackedSplits.values()) {
                 completedSnapshotSplitWatermarks.add(new 
SnapshotSplitWatermark(split.splitId(), split.getHighWatermark()));
             }
-            CompletedSnapshotSplitReportEvent reportEvent = new 
CompletedSnapshotSplitReportEvent();
+            CompletedSnapshotSplitsReportEvent reportEvent = new 
CompletedSnapshotSplitsReportEvent();
             
reportEvent.setCompletedSnapshotSplitWatermarks(completedSnapshotSplitWatermarks);
             context.sendSourceEventToEnumerator(reportEvent);
             //TODO need enumerator return ack
@@ -149,7 +149,7 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
         if (split.isSnapshotSplit()) {
             return new SnapshotSplitState(split.asSnapshotSplit());
         } else {
-            return new LogSplitState(split.asLogSplit());
+            return new IncrementalSplitState(split.asIncrementalSplit());
         }
     }
 
@@ -161,8 +161,8 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
         // add finished snapshot splits that didn't receive ack yet
         stateSplits.addAll(finishedUnackedSplits.values());
 
-        // add stream splits who are uncompleted
-        stateSplits.addAll(uncompletedStreamSplits.values());
+        // add incremental splits that are uncompleted
+        stateSplits.addAll(uncompletedIncrementalSplits.values());
 
         return stateSplits;
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 663c0d727..462650d21 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -78,12 +78,12 @@ public class IncrementalSourceRecordEmitter<T>
             if (isLowWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
                 splitState.asSnapshotSplitState().setHighWatermark(watermark);
             }
-        } else if (isSchemaChangeEvent(element) && 
splitState.isLogSplitState()) {
+        } else if (isSchemaChangeEvent(element) && 
splitState.isIncrementalSplitState()) {
             //TODO Currently not supported Schema Change
         } else if (isDataChangeRecord(element)) {
-            if (splitState.isLogSplitState()) {
+            if (splitState.isIncrementalSplitState()) {
                 Offset position = getOffsetPosition(element);
-                splitState.asLogSplitState().setStartupOffset(position);
+                
splitState.asIncrementalSplitState().setStartupOffset(position);
             }
             emitElement(element, output);
         } else {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index 0eb8e6699..9bfe4c49b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -123,9 +123,9 @@ public class IncrementalSourceSplitReader<C extends 
SourceConfig>
                     currentFetcher = new 
IncrementalSourceScanFetcher(taskContext, subtaskId);
                 }
             } else {
-                // point from snapshot split to stream split
+                // point from snapshot split to incremental split
                 if (currentFetcher != null) {
-                    log.info("It's turn to read stream split, close current 
snapshot fetcher.");
+                    log.info("It's turn to read incremental split, close 
current snapshot fetcher.");
                     currentFetcher.close();
                 }
                 final FetchTask.Context taskContext =
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
index a77f05a14..e5a5d94c2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
@@ -17,14 +17,14 @@
 
 package org.seatunnel.connectors.cdc.base.source.reader.external;
 
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
 import java.util.Iterator;
 
 /**
  * Fetcher to fetch data of a table split, the split is either snapshot split 
{@link SnapshotSplit}
- * or stream split {@link LogSplit}.
+ * or incremental split {@link IncrementalSplit}.
  */
 public interface Fetcher<T, Split> {
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 7952f5bd2..6b6e22104 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -26,7 +26,7 @@ import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.seatunnel.connectors.cdc.base.source.offset.Offset;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 
@@ -43,7 +43,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Fetcher to fetch data from table split, the split is the stream split 
{@link LogSplit}.
+ * Fetcher to fetch data from table split, the split is the incremental split 
{@link IncrementalSplit}.
  */
 @Slf4j
 public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, 
SourceSplitBase> {
@@ -56,7 +56,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
 
     private FetchTask<SourceSplitBase> streamFetchTask;
 
-    private LogSplit currentLogSplit;
+    private IncrementalSplit currentIncrementalSplit;
 
     private Map<TableId, Offset> maxSplitHighWatermarkMap;
 
@@ -73,9 +73,9 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
     @Override
     public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
         this.streamFetchTask = fetchTask;
-        this.currentLogSplit = fetchTask.getSplit().asLogSplit();
+        this.currentIncrementalSplit = 
fetchTask.getSplit().asIncrementalSplit();
         configureFilter();
-        taskContext.configure(currentLogSplit);
+        taskContext.configure(currentIncrementalSplit);
         this.queue = taskContext.getQueue();
         executorService.submit(
             () -> {
@@ -84,8 +84,8 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
                 } catch (Exception e) {
                     log.error(
                         String.format(
-                            "Execute stream read task for stream split %s 
fail",
-                            currentLogSplit),
+                            "Execute stream read task for incremental split %s 
fail",
+                            currentIncrementalSplit),
                         e);
                     readException = e;
                 }
@@ -94,7 +94,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
 
     @Override
     public boolean isFinished() {
-        return currentLogSplit == null || !streamFetchTask.isRunning();
+        return currentIncrementalSplit == null || !streamFetchTask.isRunning();
     }
 
     @Override
@@ -119,7 +119,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
             throw new SeaTunnelException(
                 String.format(
                     "Read split %s error due to %s.",
-                    currentLogSplit, readException.getMessage()),
+                    currentIncrementalSplit, readException.getMessage()),
                 readException);
         }
     }
@@ -144,7 +144,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
     /**
      * Returns the record should emit or not.
      *
-     * <p>The watermark signal algorithm is the stream split reader only sends 
the change event that
+     * <p>The watermark signal algorithm is the incremental split reader only 
sends the change event that
      * belongs to its finished snapshot splits. For each snapshot split, the 
change event is valid
      * since the offset is after its high watermark.
      *
@@ -188,8 +188,8 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
         Map<TableId, Offset> tableIdOffsetPositionMap = new HashMap<>();
         // latest-offset mode
 
-        for (TableId tableId : currentLogSplit.getTableIds()) {
-            tableIdOffsetPositionMap.put(tableId, 
currentLogSplit.getStartupOffset());
+        for (TableId tableId : currentIncrementalSplit.getTableIds()) {
+            tableIdOffsetPositionMap.put(tableId, 
currentIncrementalSplit.getStartupOffset());
         }
         this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
         this.pureStreamPhaseTables.clear();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
index 5b84daaa4..6f37776be 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
@@ -47,4 +47,13 @@ public class CompletedSnapshotSplitInfo implements 
Serializable {
         this.splitEnd = splitEnd;
         this.watermark = watermark;
     }
+
+    public SnapshotSplit asSnapshotSplit() {
+        return new SnapshotSplit(splitId,
+            tableId,
+            splitKeyType,
+            splitStart,
+            splitEnd,
+            watermark);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/LogSplit.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
similarity index 86%
rename from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/LogSplit.java
rename to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
index 267f08eca..de9f72ef8 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/LogSplit.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
@@ -24,15 +24,15 @@ import 
org.seatunnel.connectors.cdc.base.source.offset.Offset;
 import java.util.List;
 
 @Getter
-public class LogSplit extends SourceSplitBase {
+public class IncrementalSplit extends SourceSplitBase {
 
     /**
-     * All the tables that this log split needs to capture.
+     * All the tables that this incremental split needs to capture.
      */
     private final List<TableId> tableIds;
 
     /**
-     * Minimum watermark for SnapshotSplits for all tables in this LogSplit
+     * Minimum watermark for SnapshotSplits for all tables in this 
IncrementalSplit
      */
     private final Offset startupOffset;
 
@@ -42,12 +42,12 @@ public class LogSplit extends SourceSplitBase {
     private final Offset stopOffset;
 
     /**
-     * SnapshotSplit information for all tables in this LogSplit.
+     * SnapshotSplit information for all tables in this IncrementalSplit.
      * <br> Used to support Exactly-Once.
      */
     private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos;
 
-    public LogSplit(
+    public IncrementalSplit(
             String splitId,
             List<TableId> capturedTables,
             Offset startupOffset,
@@ -59,9 +59,4 @@ public class LogSplit extends SourceSplitBase {
         this.stopOffset = stopOffset;
         this.completedSnapshotSplitInfos = completedSnapshotSplitInfos;
     }
-
-    @Override
-    public String splitId() {
-        return this.splitId;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
index a53120a40..acf0c0402 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
@@ -36,9 +36,9 @@ public abstract class SourceSplitBase implements SourceSplit {
         return getClass() == SnapshotSplit.class;
     }
 
-    /** Checks whether this split is a stream split. */
-    public final boolean isLogSplit() {
-        return getClass() == LogSplit.class;
+    /** Checks whether this split is an incremental split. */
+    public final boolean isIncrementalSplit() {
+        return getClass() == IncrementalSplit.class;
     }
 
     /** Casts this split into a {@link SnapshotSplit}. */
@@ -46,9 +46,9 @@ public abstract class SourceSplitBase implements SourceSplit {
         return (SnapshotSplit) this;
     }
 
-    /** Casts this split into a {@link LogSplit}. */
-    public final LogSplit asLogSplit() {
-        return (LogSplit) this;
+    /** Casts this split into an {@link IncrementalSplit}. */
+    public final IncrementalSplit asIncrementalSplit() {
+        return (IncrementalSplit) this;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/LogSplitState.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
similarity index 77%
rename from 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/LogSplitState.java
rename to 
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
index 6b5258b8b..6cf1e241d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/LogSplitState.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
@@ -21,7 +21,7 @@ import io.debezium.relational.TableId;
 import lombok.Getter;
 import lombok.Setter;
 import org.seatunnel.connectors.cdc.base.source.offset.Offset;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 
 import java.util.List;
 
@@ -30,12 +30,12 @@ import java.util.List;
  */
 @Getter
 @Setter
-public class LogSplitState extends SourceSplitStateBase {
+public class IncrementalSplitState extends SourceSplitStateBase {
 
     private  List<TableId> tableIds;
 
     /**
-     * Minimum watermark for SnapshotSplits for all tables in this LogSplit
+     * Minimum watermark for SnapshotSplits for all tables in this 
IncrementalSplit
      */
     private Offset startupOffset;
 
@@ -44,7 +44,7 @@ public class LogSplitState extends SourceSplitStateBase {
      */
     private  Offset stopOffset;
 
-    public LogSplitState(LogSplit split) {
+    public IncrementalSplitState(IncrementalSplit split) {
         super(split);
         this.tableIds = split.getTableIds();
         this.startupOffset = split.getStartupOffset();
@@ -52,14 +52,14 @@ public class LogSplitState extends SourceSplitStateBase {
     }
 
     @Override
-    public LogSplit toSourceSplit() {
-        final LogSplit logSplit = split.asLogSplit();
-        return new LogSplit(
-            logSplit.splitId(),
+    public IncrementalSplit toSourceSplit() {
+        final IncrementalSplit incrementalSplit = split.asIncrementalSplit();
+        return new IncrementalSplit(
+            incrementalSplit.splitId(),
             getTableIds(),
             getStartupOffset(),
             getStopOffset(),
-            logSplit.getCompletedSnapshotSplitInfos()
+            incrementalSplit.getCompletedSnapshotSplitInfos()
         );
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
index 8c2ded141..b2af82c98 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
@@ -35,9 +35,9 @@ public abstract class SourceSplitStateBase {
         return getClass() == SnapshotSplitState.class;
     }
 
-    /** Checks whether this split state is a stream split state. */
-    public final boolean isLogSplitState() {
-        return getClass() == LogSplitState.class;
+    /** Checks whether this split state is an incremental split state. */
+    public final boolean isIncrementalSplitState() {
+        return getClass() == IncrementalSplitState.class;
     }
 
     /** Casts this split state into a {@link SnapshotSplitState}. */
@@ -45,9 +45,9 @@ public abstract class SourceSplitStateBase {
         return (SnapshotSplitState) this;
     }
 
-    /** Casts this split state into a {@link LogSplitState}. */
-    public final LogSplitState asLogSplitState() {
-        return (LogSplitState) this;
+    /** Casts this split state into an {@link IncrementalSplitState}. */
+    public final IncrementalSplitState asIncrementalSplitState() {
+        return (IncrementalSplitState) this;
     }
 
     /** Use the current split state to create a new SourceSplit. */

Reply via email to