This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9b1821f47 [feature][connector][cdc] add CDC enumerator base classes
(#3419)
9b1821f47 is described below
commit 9b1821f476e69b77ef958204afcc528c633d5738
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Nov 14 14:03:10 2022 +0800
[feature][connector][cdc] add CDC enumerator base classes (#3419)
* [feature][connector][cdc] add CDC enumerator base classes
* [chore] stream split replace to incremental split
---
.../cdc/base/config/BaseSourceConfig.java | 31 +--
.../connectors/cdc/base/config/SourceConfig.java | 6 +-
.../connectors/cdc/base/config/StartupConfig.java | 53 +++++
.../connectors/cdc/base/config/StopConfig.java | 54 +++++
.../cdc/base/dialect/DataSourceDialect.java | 23 +-
.../connectors/cdc/base/option/SourceOptions.java | 81 +++++--
.../cdc/base/source/IncrementalSource.java | 147 ++++++++----
.../source/enumerator/HybridSplitAssigner.java | 155 ++++++++++++
.../enumerator/IncrementalSourceEnumerator.java | 164 +++++++++++++
.../enumerator/IncrementalSplitAssigner.java | 219 +++++++++++++++++
.../source/enumerator/SnapshotSplitAssigner.java | 264 +++++++++++++++++++++
.../cdc/base/source/enumerator/SplitAssigner.java | 121 ++++++++++
.../source/enumerator/splitter/ChunkRange.java | 61 +++++
.../splitter/ChunkSplitter.java} | 18 +-
.../splitter/JdbcSourceChunkSplitter.java | 144 +++++++++++
.../state/HybridPendingSplitsState.java} | 13 +-
.../state/IncrementalPhaseState.java} | 13 +-
.../state/PendingSplitsState.java} | 17 +-
.../enumerator/state/SnapshotPhaseState.java | 91 +++++++
...t.java => CompletedSnapshotSplitsAckEvent.java} | 13 +-
...ava => CompletedSnapshotSplitsReportEvent.java} | 2 +-
.../cdc/base/source/offset/OffsetFactory.java | 2 +
.../source/reader/IncrementalSourceReader.java | 30 +--
.../reader/IncrementalSourceRecordEmitter.java | 6 +-
.../reader/IncrementalSourceSplitReader.java | 4 +-
.../cdc/base/source/reader/external/Fetcher.java | 4 +-
.../external/IncrementalSourceStreamFetcher.java | 24 +-
.../source/split/CompletedSnapshotSplitInfo.java | 9 +
.../split/{LogSplit.java => IncrementalSplit.java} | 15 +-
.../cdc/base/source/split/SourceSplitBase.java | 12 +-
...gSplitState.java => IncrementalSplitState.java} | 18 +-
.../source/split/state/SourceSplitStateBase.java | 12 +-
32 files changed, 1612 insertions(+), 214 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
index 6fc5f33a9..dfadfdf3e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java
@@ -20,7 +20,6 @@ package org.seatunnel.connectors.cdc.base.config;
import io.debezium.config.Configuration;
import lombok.Getter;
import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
import java.util.Properties;
@@ -31,10 +30,13 @@ public abstract class BaseSourceConfig implements
SourceConfig {
private static final long serialVersionUID = 1L;
- protected final Offset startupOffset;
+ @Getter
+ protected final StartupConfig startupConfig;
- protected final Offset stopOffset;
+ @Getter
+ protected final StopConfig stopConfig;
+ @Getter
protected final int splitSize;
@Getter
@@ -48,35 +50,20 @@ public abstract class BaseSourceConfig implements
SourceConfig {
protected final Properties dbzProperties;
public BaseSourceConfig(
- Offset startupOffset,
- Offset stopOffset,
+ StartupConfig startupConfig,
+ StopConfig stopConfig,
int splitSize,
double distributionFactorUpper,
double distributionFactorLower,
Properties dbzProperties) {
- this.startupOffset = startupOffset;
- this.stopOffset = stopOffset;
+ this.startupConfig = startupConfig;
+ this.stopConfig = stopConfig;
this.splitSize = splitSize;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.dbzProperties = dbzProperties;
}
- @Override
- public Offset getStartupOffset() {
- return this.startupOffset;
- }
-
- @Override
- public Offset getStopOffset() {
- return this.stopOffset;
- }
-
- @Override
- public int getSplitSize() {
- return splitSize;
- }
-
public Configuration getDbzConfiguration() {
return Configuration.from(dbzProperties);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
index 6d943dac0..96ce851ed 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/SourceConfig.java
@@ -17,16 +17,14 @@
package org.seatunnel.connectors.cdc.base.config;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
-
import java.io.Serializable;
/** The source configuration which offers basic source configuration. */
public interface SourceConfig extends Serializable {
- Offset getStartupOffset();
+ StartupConfig getStartupConfig();
- Offset getStopOffset();
+ StopConfig getStopConfig();
int getSplitSize();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StartupConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StartupConfig.java
new file mode 100644
index 000000000..049c963d9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StartupConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+
+import java.io.Serializable;
+
+@AllArgsConstructor
+@EqualsAndHashCode
+public final class StartupConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+ @Getter
+ private final StartupMode startupMode;
+ private final String specificOffsetFile;
+ private final Long specificOffsetPos;
+ private final Long timestamp;
+
+ public Offset getStartupOffset(OffsetFactory offsetFactory) {
+ switch (startupMode) {
+ case EARLIEST:
+ return offsetFactory.earliest();
+ case LATEST:
+ return offsetFactory.latest();
+ case INITIAL:
+ return null;
+ case TIMESTAMP:
+ return offsetFactory.timstamp(timestamp);
+ default:
+ throw new IllegalArgumentException(String.format("The %s mode
is not supported.", startupMode));
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StopConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StopConfig.java
new file mode 100644
index 000000000..f6cd95fc1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/StopConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.seatunnel.connectors.cdc.base.option.StopMode;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+
+import java.io.Serializable;
+
+@AllArgsConstructor
+@EqualsAndHashCode
+public final class StopConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Getter
+ private final StopMode stopMode;
+ private final String specificOffsetFile;
+ private final Long specificOffsetPos;
+ private final Long timestamp;
+
+ public Offset getStopOffset(OffsetFactory offsetFactory) {
+ switch (stopMode) {
+ case LATEST:
+ return offsetFactory.latest();
+ case NEVER:
+ return offsetFactory.neverStop();
+ case SPECIFIC:
+ return offsetFactory.specific(specificOffsetFile,
specificOffsetPos);
+ case TIMESTAMP:
+ return offsetFactory.timstamp(timestamp);
+ default:
+ throw new IllegalArgumentException(String.format("The %s mode
is not supported.", stopMode));
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
index 0f42a4039..9088fbe3b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/DataSourceDialect.java
@@ -18,22 +18,19 @@
package org.seatunnel.connectors.cdc.base.dialect;
import io.debezium.relational.TableId;
-import io.debezium.relational.history.TableChanges;
import org.seatunnel.connectors.cdc.base.config.SourceConfig;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import java.io.Serializable;
import java.util.List;
-import java.util.Map;
/**
* The dialect of data source.
*
* @param <C> The source config of data source.
*/
-
public interface DataSourceDialect<C extends SourceConfig> extends
Serializable {
/** Get the name of dialect. */
@@ -42,23 +39,13 @@ public interface DataSourceDialect<C extends SourceConfig>
extends Serializable
/** Discovers the list of data collection to capture. */
List<TableId> discoverDataCollections(C sourceConfig);
- /**
- * Discovers the captured data collections' schema by {@link SourceConfig}.
- *
- * @param sourceConfig a basic source configuration.
- */
- Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(C
sourceConfig);
-
- /**
- * Displays current offset from the database e.g. query Mysql binary logs
by query <code>
- * SHOW MASTER STATUS</code>.
- */
- Offset displayCurrentOffset(C sourceConfig);
-
/** Check if the CollectionId is case-sensitive or not. */
boolean isDataCollectionIdCaseSensitive(C sourceConfig);
- /** The fetch task used to fetch data of a snapshot split or stream split.
*/
+ /** Returns the {@link ChunkSplitter} which used to split collection to
splits. */
+ ChunkSplitter createChunkSplitter(C sourceConfig);
+
+ /** The fetch task used to fetch data of a snapshot split or incremental
split. */
FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase
sourceSplitBase);
/** The task context used for fetch task to fetch data from external
systems. */
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
index afd296947..6cf299e28 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -19,39 +19,76 @@ package org.seatunnel.connectors.cdc.base.option;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
@SuppressWarnings("MagicNumber")
public class SourceOptions {
public static final Option<Integer> SNAPSHOT_SPLIT_SIZE =
Options.key("snapshot.split.size")
- .intType()
- .defaultValue(8096)
- .withDescription("The split size (number of rows) of table
snapshot, captured tables are split into multiple splits when read the snapshot
of table.");
+ .intType()
+ .defaultValue(8096)
+ .withDescription("The split size (number of rows) of table snapshot,
captured tables are split into multiple splits when read the snapshot of
table.");
public static final Option<Integer> SNAPSHOT_FETCH_SIZE =
Options.key("snapshot.fetch.size")
- .intType()
- .defaultValue(1024)
- .withDescription("The maximum fetch size for per poll when read
table snapshot.");
+ .intType()
+ .defaultValue(1024)
+ .withDescription("The maximum fetch size for per poll when read table
snapshot.");
public static final Option<StartupMode> STARTUP_MODE =
Options.key("startup.mode")
- .enumType(StartupMode.class)
- .defaultValue(StartupMode.INITIAL)
- .withDescription("Optional startup mode for CDC source, valid
enumerations are "
- + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n
or \"specific\"");
-
- public static final Option<StopMode> STOP_MODE = Options.key("stop.mode")
- .enumType(StopMode.class)
- .defaultValue(StopMode.NEVER)
- .withDescription("Optional stop mode for CDC source, valid
enumerations are "
- + "\"never\", \"latest\", \"timestamp\"\n or
\"specific\"");
+ .enumType(StartupMode.class)
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription("Optional startup mode for CDC source, valid
enumerations are "
+ + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or
\"specific\"");
public static final Option<Long> STARTUP_TIMESTAMP =
Options.key("startup.timestamp")
- .longType()
- .noDefaultValue()
- .withDescription("Optional timestamp(mills) used in case of
\"timestamp\" startup mode");
+ .longType()
+ .noDefaultValue()
+ .withDescription("Optional timestamp(mills) used in case of
\"timestamp\" startup mode");
+
+ public static final Option<String> STARTUP_SPECIFIC_OFFSET_FILE =
Options.key("startup.specific-offset.file")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional offsets used in case of \"specific\" startup mode");
+
+ public static final Option<Long> STARTUP_SPECIFIC_OFFSET_POS =
Options.key("startup.specific-offset.pos")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional offsets used in case of \"specific\" startup mode");
+
+ public static final Option<Integer> INCREMENTAL_PARALLELISM =
Options.key("incremental.parallelism")
+ .intType()
+ .defaultValue(1)
+ .withDescription("The number of parallel readers in the incremental
phase.");
+
+ public static final Option<StopMode> STOP_MODE = Options.key("stop.mode")
+ .enumType(StopMode.class)
+ .defaultValue(StopMode.NEVER)
+ .withDescription("Optional stop mode for CDC source, valid
enumerations are "
+ + "\"never\", \"latest\", \"timestamp\"\n or \"specific\"");
public static final Option<Long> STOP_TIMESTAMP =
Options.key("stop.timestamp")
- .longType()
- .noDefaultValue()
- .withDescription("Optional timestamp(mills) used in case of
\"timestamp\" stop mode");
+ .longType()
+ .noDefaultValue()
+ .withDescription("Optional timestamp(mills) used in case of
\"timestamp\" stop mode");
+
+ public static final Option<String> STOP_SPECIFIC_OFFSET_FILE =
Options.key("stop.specific-offset.file")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Optional offsets used in case of \"specific\" stop
mode");
+
+ public static final Option<Long> STOP_SPECIFIC_OFFSET_POS =
Options.key("stop.specific-offset.pos")
+ .longType()
+ .noDefaultValue()
+ .withDescription("Optional offsets used in case of \"specific\" stop
mode");
+
+ public static final OptionRule.Builder BASE_RULE = OptionRule.builder()
+ .optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
+ .optional(INCREMENTAL_PARALLELISM)
+ .optional(STARTUP_MODE, STOP_MODE)
+ .conditional(STARTUP_MODE, StartupMode.TIMESTAMP, STARTUP_TIMESTAMP)
+ .conditional(STARTUP_MODE, StartupMode.SPECIFIC,
STARTUP_SPECIFIC_OFFSET_FILE, STARTUP_SPECIFIC_OFFSET_POS)
+ .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
+ .conditional(STOP_MODE, StopMode.SPECIFIC, STOP_SPECIFIC_OFFSET_FILE,
STOP_SPECIFIC_OFFSET_POS);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 4804fdd50..163288151 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -22,28 +22,44 @@ import
org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import io.debezium.relational.TableId;
import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.config.StartupConfig;
+import org.seatunnel.connectors.cdc.base.config.StopConfig;
+import org.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.seatunnel.connectors.cdc.base.option.StartupMode;
import org.seatunnel.connectors.cdc.base.option.StopMode;
-import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner;
+import org.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
-import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
-public abstract class IncrementalSource<T, C extends SourceConfig> implements
SeaTunnelSource<T, SourceSplit, Serializable> {
+public abstract class IncrementalSource<T, C extends SourceConfig> implements
SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
+ protected SourceConfig.Factory<C> configFactory;
protected OffsetFactory offsetFactory;
- protected Offset startupOffset;
- protected Offset stopOffset;
+ protected DataSourceDialect<C> dataSourceDialect;
+ protected StartupConfig startupConfig;
+
+ protected int incrementalParallelism;
+ protected StopConfig stopConfig;
protected StopMode stopMode;
protected DebeziumDeserializationSchema<T> deserializationSchema;
@@ -52,43 +68,39 @@ public abstract class IncrementalSource<T, C extends
SourceConfig> implements Se
public final void prepare(Config pluginConfig) throws PrepareFailException
{
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
- this.offsetFactory = createOffsetFactory();
- this.startupOffset = getStartupOffset(config);
- this.stopOffset = getStopOffset(config);
- this.stopMode = config.get(SourceOptions.STOP_MODE);
+ this.startupConfig = getStartupConfig(config);
+ this.stopConfig = getStopConfig(config);
+ this.stopMode = stopConfig.getStopMode();
+ this.incrementalParallelism =
config.get(SourceOptions.INCREMENTAL_PARALLELISM);
+ this.configFactory = createSourceConfigFactory(config);
+ this.offsetFactory = createOffsetFactory(config);
+ this.deserializationSchema =
createDebeziumDeserializationSchema(config);
+ this.dataSourceDialect = createDataSourceDialect(config);
}
- private Offset getStartupOffset(ReadonlyConfig config) {
- StartupMode startupMode = config.get(SourceOptions.STARTUP_MODE);
- switch (startupMode) {
- case EARLIEST:
- return offsetFactory.earliest();
- case LATEST:
- return offsetFactory.latest();
- case INITIAL:
- return null;
- case TIMESTAMP:
- return
offsetFactory.timstamp(config.get(SourceOptions.STOP_TIMESTAMP));
- default:
- throw new IllegalArgumentException(String.format("The %s mode
is not supported.", startupMode));
- }
+ protected StartupConfig getStartupConfig(ReadonlyConfig config) {
+ return new StartupConfig(
+ config.get(SourceOptions.STARTUP_MODE),
+ config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
+ config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
+ config.get(SourceOptions.STARTUP_TIMESTAMP));
}
- private Offset getStopOffset(ReadonlyConfig config) {
- StopMode stopMode = config.get(SourceOptions.STOP_MODE);
- switch (stopMode) {
- case LATEST:
- return offsetFactory.latest();
- case NEVER:
- return offsetFactory.neverStop();
- case TIMESTAMP:
- return
offsetFactory.timstamp(config.get(SourceOptions.STOP_TIMESTAMP));
- default:
- throw new IllegalArgumentException(String.format("The %s mode
is not supported.", stopMode));
- }
+ private StopConfig getStopConfig(ReadonlyConfig config) {
+ return new StopConfig(
+ config.get(SourceOptions.STOP_MODE),
+ config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
+ config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
+ config.get(SourceOptions.STOP_TIMESTAMP));
}
- public abstract OffsetFactory createOffsetFactory();
+ public abstract SourceConfig.Factory<C>
createSourceConfigFactory(ReadonlyConfig config);
+
+ public abstract OffsetFactory createOffsetFactory(ReadonlyConfig config);
+
+ public abstract DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(ReadonlyConfig config);
+
+ public abstract DataSourceDialect<C>
createDataSourceDialect(ReadonlyConfig config);
@Override
public Boundedness getBoundedness() {
@@ -101,23 +113,66 @@ public abstract class IncrementalSource<T, C extends
SourceConfig> implements Se
}
@Override
- public SourceReader<T, SourceSplit> createReader(SourceReader.Context
readerContext) throws Exception {
+ public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context
readerContext) throws Exception {
// TODO: https://github.com/apache/incubator-seatunnel/issues/3255
// https://github.com/apache/incubator-seatunnel/issues/3256
return null;
}
@Override
- public SourceSplitEnumerator<SourceSplit, Serializable>
createEnumerator(SourceSplitEnumerator.Context<SourceSplit> enumeratorContext)
throws Exception {
- // TODO: https://github.com/apache/incubator-seatunnel/issues/3253
- // https://github.com/apache/incubator-seatunnel/issues/3254
- return null;
+ public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState>
createEnumerator(SourceSplitEnumerator.Context<SourceSplitBase>
enumeratorContext) throws Exception {
+ C sourceConfig = configFactory.create(0);
+ final List<TableId> remainingTables =
dataSourceDialect.discoverDataCollections(sourceConfig);
+ final SplitAssigner splitAssigner;
+ SplitAssigner.Context<C> assignerContext = new
SplitAssigner.Context<>(sourceConfig, new HashSet<>(remainingTables), new
HashMap<>(), new HashMap<>());
+ if (sourceConfig.getStartupConfig().getStartupMode() ==
StartupMode.INITIAL) {
+ try {
+
+ boolean isTableIdCaseSensitive =
+
dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
+ splitAssigner =
+ new HybridSplitAssigner<>(
+ assignerContext,
+ enumeratorContext.currentParallelism(),
+ incrementalParallelism,
+ remainingTables,
+ isTableIdCaseSensitive,
+ dataSourceDialect,
+ offsetFactory);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to discover captured tables
for enumerator", e);
+ }
+ } else {
+ splitAssigner = new IncrementalSplitAssigner<>(assignerContext,
incrementalParallelism, offsetFactory);
+ }
+
+ return new IncrementalSourceEnumerator(enumeratorContext,
splitAssigner);
}
@Override
- public SourceSplitEnumerator<SourceSplit, Serializable>
restoreEnumerator(SourceSplitEnumerator.Context<SourceSplit> enumeratorContext,
Serializable checkpointState) throws Exception {
- // TODO: https://github.com/apache/incubator-seatunnel/issues/3253
- // https://github.com/apache/incubator-seatunnel/issues/3254
- return null;
+ public SourceSplitEnumerator<SourceSplitBase, PendingSplitsState>
restoreEnumerator(SourceSplitEnumerator.Context<SourceSplitBase>
enumeratorContext,
+
PendingSplitsState checkpointState) throws Exception {
+ C sourceConfig = configFactory.create(0);
+ final List<TableId> remainingTables =
dataSourceDialect.discoverDataCollections(sourceConfig);
+ SplitAssigner.Context<C> assignerContext = new
SplitAssigner.Context<>(sourceConfig, new HashSet<>(remainingTables), new
HashMap<>(), new HashMap<>());
+ final SplitAssigner splitAssigner;
+ if (checkpointState instanceof HybridPendingSplitsState) {
+ splitAssigner = new HybridSplitAssigner<>(
+ assignerContext,
+ enumeratorContext.currentParallelism(),
+ incrementalParallelism,
+ (HybridPendingSplitsState) checkpointState,
+ dataSourceDialect,
+ offsetFactory);
+ } else if (checkpointState instanceof IncrementalPhaseState) {
+ splitAssigner = new IncrementalSplitAssigner<>(
+ assignerContext,
+ incrementalParallelism,
+ offsetFactory);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported restored PendingSplitsState: " + checkpointState);
+ }
+ return new IncrementalSourceEnumerator(enumeratorContext,
splitAssigner);
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
new file mode 100644
index 000000000..af23b47e8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Assigner for Hybrid split which contains snapshot splits and incremental
splits.
+ */
+public class HybridSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HybridSplitAssigner.class);
+
+ private final SnapshotSplitAssigner<C> snapshotSplitAssigner;
+
+ private final IncrementalSplitAssigner<C> incrementalSplitAssigner;
+
+ public HybridSplitAssigner(
+ SplitAssigner.Context<C> context,
+ int currentParallelism,
+ int incrementalParallelism,
+ List<TableId> remainingTables,
+ boolean isTableIdCaseSensitive,
+ DataSourceDialect<C> dialect,
+ OffsetFactory offsetFactory) {
+ this(new SnapshotSplitAssigner<>(
+ context,
+ currentParallelism,
+ remainingTables,
+ isTableIdCaseSensitive,
+ dialect),
+ new IncrementalSplitAssigner<>(
+ context,
+ incrementalParallelism,
+ offsetFactory));
+ }
+
+ public HybridSplitAssigner(
+ SplitAssigner.Context<C> context,
+ int currentParallelism,
+ int incrementalParallelism,
+ HybridPendingSplitsState checkpoint,
+ DataSourceDialect<C> dialect,
+ OffsetFactory offsetFactory) {
+ this(
+ new SnapshotSplitAssigner<>(
+ context,
+ currentParallelism,
+ checkpoint.getSnapshotPhaseState(),
+ dialect),
+ new IncrementalSplitAssigner<>(
+ context,
+ incrementalParallelism,
+ offsetFactory));
+ }
+
+ private HybridSplitAssigner(
+ SnapshotSplitAssigner<C> snapshotSplitAssigner,
+ IncrementalSplitAssigner<C> incrementalSplitAssigner) {
+ this.snapshotSplitAssigner = snapshotSplitAssigner;
+ this.incrementalSplitAssigner = incrementalSplitAssigner;
+ }
+
+ @Override
+ public void open() {
+ snapshotSplitAssigner.open();
+ }
+
+ @Override
+ public Optional<SourceSplitBase> getNext() {
+ if (!snapshotSplitAssigner.noMoreSplits()) {
+ // snapshot assigner still have remaining splits, assign split
from it
+ return snapshotSplitAssigner.getNext();
+ }
+ if (!snapshotSplitAssigner.isCompleted()) {
+ // incremental split is not ready by now
+ return Optional.empty();
+ }
+ // incremental split assigning
+ if (!incrementalSplitAssigner.noMoreSplits()) {
+ // we need to wait snapshot-assigner to be completed before
+ // assigning the incremental split. Otherwise, records emitted
from incremental split
+ // might be out-of-order in terms of same primary key with
snapshot splits.
+ return snapshotSplitAssigner.getNext();
+ }
+ // no more splits for the assigner
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean waitingForCompletedSplits() {
+ return snapshotSplitAssigner.waitingForCompletedSplits();
+ }
+
+ @Override
+ public void onCompletedSplits(List<SnapshotSplitWatermark>
completedSplitWatermarks) {
+ snapshotSplitAssigner.onCompletedSplits(completedSplitWatermarks);
+ incrementalSplitAssigner.onCompletedSplits(completedSplitWatermarks);
+ }
+
+ @Override
+ public void addSplits(Collection<SourceSplitBase> splits) {
+ List<SourceSplitBase> snapshotSplits = new ArrayList<>();
+ List<SourceSplitBase> incrementalSplits = new ArrayList<>();
+ for (SourceSplitBase split : splits) {
+ if (split.isSnapshotSplit()) {
+ snapshotSplits.add(split);
+ } else {
+ incrementalSplits.add(split);
+ }
+ }
+ snapshotSplitAssigner.addSplits(snapshotSplits);
+ incrementalSplitAssigner.addSplits(incrementalSplits);
+ }
+
+ @Override
+ public PendingSplitsState snapshotState(long checkpointId) {
+ return new
HybridPendingSplitsState(snapshotSplitAssigner.snapshotState(checkpointId),
incrementalSplitAssigner.snapshotState(checkpointId));
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
+ incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
new file mode 100644
index 000000000..3b47bb3f4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsAckEvent;
+import
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * Incremental source enumerator that enumerates receive the split request and
assign the split to
+ * source readers.
+ */
+
+public class IncrementalSourceEnumerator
+ implements SourceSplitEnumerator<SourceSplitBase, PendingSplitsState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrementalSourceEnumerator.class);
+
+ private final SourceSplitEnumerator.Context<SourceSplitBase> context;
+ private final SplitAssigner splitAssigner;
+
+ /**
+ * using TreeSet to prefer assigning incremental split to task-0 for
easier debug
+ */
+ private final TreeSet<Integer> readersAwaitingSplit;
+
+ public IncrementalSourceEnumerator(
+ SourceSplitEnumerator.Context<SourceSplitBase> context,
+ SplitAssigner splitAssigner) {
+ this.context = context;
+ this.splitAssigner = splitAssigner;
+ this.readersAwaitingSplit = new TreeSet<>();
+ }
+
+ @Override
+ public void open() {
+ splitAssigner.open();
+ }
+
+ @Override
+ public void run() throws Exception {
+
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+ if (!context.registeredReaders().contains(subtaskId)) {
+ // reader failed between sending the request and now. skip this
request.
+ return;
+ }
+
+ readersAwaitingSplit.add(subtaskId);
+ assignSplits();
+ }
+
+ @Override
+ public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
+ LOG.debug("Incremental Source Enumerator adds splits back: {}",
splits);
+ splitAssigner.addSplits(splits);
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return 0;
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ // do nothing
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof CompletedSnapshotSplitsReportEvent) {
+ LOG.info(
+ "The enumerator receives completed split watermarks(log
offset) {} from subtask {}.",
+ sourceEvent,
+ subtaskId);
+ CompletedSnapshotSplitsReportEvent reportEvent =
+ (CompletedSnapshotSplitsReportEvent) sourceEvent;
+ List<SnapshotSplitWatermark> completedSplitWatermarks =
reportEvent.getCompletedSnapshotSplitWatermarks();
+ splitAssigner.onCompletedSplits(completedSplitWatermarks);
+
+ // send acknowledge event
+ CompletedSnapshotSplitsAckEvent ackEvent =
+ new
CompletedSnapshotSplitsAckEvent(completedSplitWatermarks.stream()
+ .map(SnapshotSplitWatermark::getSplitId)
+ .collect(Collectors.toList()));
+ context.sendEventToSourceReader(subtaskId, ackEvent);
+ }
+ }
+
+ @Override
+ public PendingSplitsState snapshotState(long checkpointId) {
+ return splitAssigner.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ splitAssigner.notifyCheckpointComplete(checkpointId);
+ // incremental split may be available after checkpoint complete
+ assignSplits();
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closing enumerator...");
+ splitAssigner.close();
+ }
+
+ //
------------------------------------------------------------------------------------------
+
+ private void assignSplits() {
+ final Iterator<Integer> awaitingReader =
readersAwaitingSplit.iterator();
+
+ while (awaitingReader.hasNext()) {
+ int nextAwaiting = awaitingReader.next();
+ // if the reader that requested another split has failed in the
meantime, remove
+ // it from the list of waiting readers
+ if (!context.registeredReaders().contains(nextAwaiting)) {
+ awaitingReader.remove();
+ continue;
+ }
+
+ Optional<SourceSplitBase> split = splitAssigner.getNext();
+ if (split.isPresent()) {
+ final SourceSplitBase sourceSplit = split.get();
+ context.assignSplit(nextAwaiting, sourceSplit);
+ awaitingReader.remove();
+ LOG.info("Assign split {} to subtask {}", sourceSplit,
nextAwaiting);
+ } else {
+ // there is no available splits by now, skip assigning
+ break;
+ }
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
new file mode 100644
index 000000000..0863d41ab
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import
org.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Assigner for incremental split.
+ */
+public class IncrementalSplitAssigner<C extends SourceConfig> implements
SplitAssigner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrementalSplitAssigner.class);
+ protected static final String INCREMENTAL_SPLIT_ID =
"incremental-split-%d";
+
+ private final SplitAssigner.Context<C> context;
+
+ private final int incrementalParallelism;
+
+ private final OffsetFactory offsetFactory;
+
+ /**
+ * Maximum watermark in SnapshotSplits per table.
+ * <br> Used to delete information in completedSnapshotSplitInfos,
reducing state size.
+ * <br> Used to support Exactly-Once.
+ */
+ private final Map<TableId, Offset> tableWatermarks = new HashMap<>();
+
+ private boolean splitAssigned = false;
+
+ private final List<IncrementalSplit> remainingSplits = new ArrayList<>();
+
+ private final Map<String, IncrementalSplit> assignedSplits = new
HashMap<>();
+
+ public IncrementalSplitAssigner(
+ SplitAssigner.Context<C> context,
+ int incrementalParallelism,
+ OffsetFactory offsetFactory) {
+ this.context = context;
+ this.incrementalParallelism = incrementalParallelism;
+ this.offsetFactory = offsetFactory;
+ }
+
+ @Override
+ public void open() {
+ }
+
+ @Override
+ public Optional<SourceSplitBase> getNext() {
+ if (!remainingSplits.isEmpty()) {
+ // return remaining splits firstly
+ Iterator<IncrementalSplit> iterator = remainingSplits.iterator();
+ IncrementalSplit split = iterator.next();
+ iterator.remove();
+ assignedSplits.put(split.splitId(), split);
+ return Optional.of(split);
+ }
+ if (splitAssigned) {
+ return Optional.empty();
+ }
+ List<IncrementalSplit> incrementalSplits = createIncrementalSplits();
+ remainingSplits.addAll(incrementalSplits);
+ splitAssigned = true;
+ return getNext();
+ }
+
+ /**
+ * Indicates there is no more splits available in this assigner.
+ */
+ public boolean noMoreSplits() {
+ return getRemainingTables().isEmpty() && remainingSplits.isEmpty();
+ }
+
+ private Set<TableId> getRemainingTables() {
+ Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
+ assignedSplits.values()
+ .forEach(split -> split.getTableIds().forEach(allTables::remove));
+ return allTables;
+ }
+
+ @Override
+ public boolean waitingForCompletedSplits() {
+ return false;
+ }
+
+ @Override
+ public void onCompletedSplits(List<SnapshotSplitWatermark>
completedSplitWatermarks) {
+ // do nothing
+ }
+
+ @Override
+ public void addSplits(Collection<SourceSplitBase> splits) {
+ // we don't store the split, but will re-create incremental split later
+ splits.stream()
+ .map(SourceSplitBase::asIncrementalSplit)
+ .forEach(incrementalSplit -> {
+ Offset startupOffset = incrementalSplit.getStartupOffset();
+ List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
incrementalSplit.getCompletedSnapshotSplitInfos();
+ for (CompletedSnapshotSplitInfo info :
completedSnapshotSplitInfos) {
+ context.getSplitCompletedOffsets().put(info.getSplitId(),
info.getWatermark());
+ context.getAssignedSnapshotSplit().put(info.getSplitId(),
info.asSnapshotSplit());
+ }
+ for (TableId tableId : incrementalSplit.getTableIds()) {
+ tableWatermarks.put(tableId, startupOffset);
+ }
+ });
+ }
+
+ @Override
+ public IncrementalPhaseState snapshotState(long checkpointId) {
+ return new IncrementalPhaseState();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // nothing to do
+ }
+
+ //
------------------------------------------------------------------------------------------
+
+ public List<IncrementalSplit> createIncrementalSplits() {
+ Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
+ assignedSplits.values()
+ .forEach(split -> split.getTableIds().forEach(allTables::remove));
+ List<TableId>[] capturedTables = new List[incrementalParallelism];
+ int i = 0;
+ for (TableId tableId : allTables) {
+ int index = i % incrementalParallelism;
+ if (capturedTables[index] == null) {
+ capturedTables[index] = new ArrayList<>();
+ }
+ capturedTables[index].add(tableId);
+ i++;
+ }
+ i = 0;
+ List<IncrementalSplit> incrementalSplits = new ArrayList<>();
+ for (List<TableId> capturedTable : capturedTables) {
+ incrementalSplits.add(createIncrementalSplit(capturedTable, i++));
+ }
+ return incrementalSplits;
+ }
+
+ private IncrementalSplit createIncrementalSplit(List<TableId>
capturedTables, int index) {
+ final List<SnapshotSplit> assignedSnapshotSplit =
+ context.getAssignedSnapshotSplit().values().stream()
+ .filter(split -> capturedTables.contains(split.getTableId()))
+ .sorted(Comparator.comparing(SourceSplitBase::splitId))
+ .collect(Collectors.toList());
+
+ Map<String, Offset> splitCompletedOffsets =
context.getSplitCompletedOffsets();
+ final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
new ArrayList<>();
+ Offset minOffset = null;
+ for (SnapshotSplit split : assignedSnapshotSplit) {
+ // find the min offset of change log
+ Offset changeLogOffset =
splitCompletedOffsets.get(split.splitId());
+ if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
+ minOffset = changeLogOffset;
+ }
+ completedSnapshotSplitInfos.add(
+ new CompletedSnapshotSplitInfo(
+ split.splitId(),
+ split.getTableId(),
+ split.getSplitKeyType(),
+ split.getSplitStart(),
+ split.getSplitEnd(),
+ changeLogOffset));
+ }
+ for (TableId tableId : capturedTables) {
+ Offset watermark = tableWatermarks.get(tableId);
+ if (minOffset == null || watermark.isBefore(minOffset)) {
+ minOffset = watermark;
+ }
+ }
+ C sourceConfig = context.getSourceConfig();
+ return new IncrementalSplit(
+ String.format(INCREMENTAL_SPLIT_ID, index),
+ capturedTables,
+ minOffset != null ? minOffset :
sourceConfig.getStartupConfig().getStartupOffset(offsetFactory),
+ sourceConfig.getStopConfig().getStopOffset(offsetFactory),
+ completedSnapshotSplitInfos);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
new file mode 100644
index 000000000..5e8a62a5c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
/**
 * Assigner for snapshot splits. Discovers captured tables, splits each table into chunks via
 * the dialect's {@code ChunkSplitter}, and tracks completion: the assigner is only marked
 * completed after all splits finished AND at least one checkpoint completed afterwards, so that
 * downstream records of snapshot splits are durably processed before the incremental phase.
 */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    private final SplitAssigner.Context<C> context;

    private final C sourceConfig;
    // tables whose chunks have already been generated
    private final List<TableId> alreadyProcessedTables;
    // generated but not yet assigned snapshot splits
    private final List<SnapshotSplit> remainingSplits;
    // splits handed out to readers, keyed by split id
    private final Map<String, SnapshotSplit> assignedSplits;
    // high watermark (change-log offset) reported for each completed split id
    private final Map<String, Offset> splitCompletedOffsets;
    // true once the snapshot phase is fully done (see isCompleted())
    private boolean assignerCompleted;
    private final int currentParallelism;
    // tables still to be chunked, consumed FIFO by getNext()
    private final LinkedList<TableId> remainingTables;
    // false when restoring legacy state that did not checkpoint remainingTables
    private final boolean isRemainingTablesCheckpointed;

    private ChunkSplitter chunkSplitter;
    private boolean isTableIdCaseSensitive;

    // checkpoint id observed when all splits became completed; completion is confirmed
    // once that checkpoint itself completes
    private Long checkpointIdToFinish;
    private final DataSourceDialect<C> dialect;

    /** Constructor for a fresh start (no checkpoint). */
    SnapshotSplitAssigner(
            SplitAssigner.Context<C> context,
            int currentParallelism,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive,
            DataSourceDialect<C> dialect) {
        this(
            context,
            currentParallelism,
            new ArrayList<>(),
            new ArrayList<>(),
            new HashMap<>(),
            new HashMap<>(),
            false,
            remainingTables,
            isTableIdCaseSensitive,
            true,
            dialect);
    }

    /** Constructor restoring from a {@code SnapshotPhaseState} checkpoint. */
    SnapshotSplitAssigner(
            SplitAssigner.Context<C> context,
            int currentParallelism,
            SnapshotPhaseState checkpoint,
            DataSourceDialect<C> dialect) {
        this(
            context,
            currentParallelism,
            checkpoint.getAlreadyProcessedTables(),
            checkpoint.getRemainingSplits(),
            checkpoint.getAssignedSplits(),
            checkpoint.getSplitCompletedOffsets(),
            checkpoint.isAssignerCompleted(),
            checkpoint.getRemainingTables(),
            checkpoint.isTableIdCaseSensitive(),
            checkpoint.isRemainingTablesCheckpointed(),
            dialect);
    }

    private SnapshotSplitAssigner(
            SplitAssigner.Context<C> context,
            int currentParallelism,
            List<TableId> alreadyProcessedTables,
            List<SnapshotSplit> remainingSplits,
            Map<String, SnapshotSplit> assignedSplits,
            Map<String, Offset> splitCompletedOffsets,
            boolean assignerCompleted,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive,
            boolean isRemainingTablesCheckpointed,
            DataSourceDialect<C> dialect) {
        this.context = context;
        this.sourceConfig = context.getSourceConfig();
        this.currentParallelism = currentParallelism;
        this.alreadyProcessedTables = alreadyProcessedTables;
        this.remainingSplits = remainingSplits;
        this.assignedSplits = assignedSplits;
        this.splitCompletedOffsets = splitCompletedOffsets;
        this.assignerCompleted = assignerCompleted;
        this.remainingTables = new LinkedList<>(remainingTables);
        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
        this.dialect = dialect;
    }

    @Override
    public void open() {
        chunkSplitter = dialect.createChunkSplitter(sourceConfig);

        // the legacy state didn't snapshot remaining tables, discovery remaining table here
        if (!isRemainingTablesCheckpointed && !assignerCompleted) {
            try {
                final List<TableId> discoverTables = dialect.discoverDataCollections(sourceConfig);
                context.getCapturedTables().addAll(discoverTables);
                // tables whose chunks were already generated must not be re-split
                discoverTables.removeAll(alreadyProcessedTables);
                this.remainingTables.addAll(discoverTables);
                this.isTableIdCaseSensitive = dialect.isDataCollectionIdCaseSensitive(sourceConfig);
            } catch (Exception e) {
                throw new RuntimeException("Failed to discover remaining tables to capture", e);
            }
        }
    }

    @Override
    public Optional<SourceSplitBase> getNext() {
        if (!remainingSplits.isEmpty()) {
            // return remaining splits firstly
            Iterator<SnapshotSplit> iterator = remainingSplits.iterator();
            SnapshotSplit split = iterator.next();
            iterator.remove();
            assignedSplits.put(split.splitId(), split);
            return Optional.of(split);
        } else {
            // it's turn for new table
            TableId nextTable = remainingTables.pollFirst();
            if (nextTable != null) {
                // split the given table into chunks (snapshot splits)
                Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
                remainingSplits.addAll(splits);
                alreadyProcessedTables.add(nextTable);
                // recurse to hand out the first of the freshly generated chunks
                return getNext();
            } else {
                return Optional.empty();
            }
        }
    }

    @Override
    public boolean waitingForCompletedSplits() {
        return !allSplitsCompleted();
    }

    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        // record the high watermark of each completed split
        completedSplitWatermarks.forEach(m -> this.splitCompletedOffsets.put(m.getSplitId(), m.getHighWatermark()));
        if (allSplitsCompleted()) {
            // Skip the waiting checkpoint when current parallelism is 1 which means we do not need
            // to care about the global output data order of snapshot splits and incremental split.
            if (currentParallelism == 1) {
                assignerCompleted = true;
                LOG.info("Snapshot split assigner received all splits completed and the job parallelism is 1, snapshot split assigner is turn into completed status.");
            } else {
                LOG.info("Snapshot split assigner received all splits completed, waiting for a complete checkpoint to mark the assigner completed.");
            }
        }
    }

    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        for (SourceSplitBase split : splits) {
            remainingSplits.add(split.asSnapshotSplit());
            // we should remove the add-backed splits from the assigned list, because they are failed
            assignedSplits.remove(split.splitId());
            splitCompletedOffsets.remove(split.splitId());
        }
    }

    @Override
    public SnapshotPhaseState snapshotState(long checkpointId) {
        SnapshotPhaseState state =
            new SnapshotPhaseState(
                alreadyProcessedTables,
                remainingSplits,
                assignedSplits,
                splitCompletedOffsets,
                assignerCompleted,
                remainingTables,
                isTableIdCaseSensitive,
                true);
        // we need a complete checkpoint before mark this assigner to be completed, to wait for all
        // records of snapshot splits are completely processed
        if (checkpointIdToFinish == null && !assignerCompleted && allSplitsCompleted()) {
            checkpointIdToFinish = checkpointId;
        }
        return state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // we have waited for at-least one complete checkpoint after all snapshot-splits are
        // completed, then we can mark snapshot assigner as completed.
        // NOTE(review): the log line below fires even when checkpointId < checkpointIdToFinish
        // leaves assignerCompleted false — consider guarding the log on the assignment result.
        if (checkpointIdToFinish != null && !assignerCompleted && allSplitsCompleted()) {
            assignerCompleted = checkpointId >= checkpointIdToFinish;
            LOG.info("Snapshot split assigner is turn into completed status.");
        }
    }

    /**
     * Indicates there is no more splits available in this assigner.
     */
    public boolean noMoreSplits() {
        return remainingTables.isEmpty() && remainingSplits.isEmpty();
    }

    /**
     * Returns whether the snapshot split assigner is completed, which indicates there is no more
     * splits and all records of splits have been completely processed in the pipeline.
     */
    public boolean isCompleted() {
        return assignerCompleted;
    }

    public Map<String, SnapshotSplit> getAssignedSplits() {
        return assignedSplits;
    }

    public Map<String, Offset> getSplitCompletedOffsets() {
        return splitCompletedOffsets;
    }

    // -------------------------------------------------------------------------------------------

    /**
     * Returns whether all splits are completed which means no more splits and all assigned splits
     * are completed.
     */
    private boolean allSplitsCompleted() {
        return noMoreSplits() && assignedSplits.size() == splitCompletedOffsets.size();
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java
new file mode 100644
index 000000000..86a375486
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/SplitAssigner.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator;
+
+import org.apache.seatunnel.api.state.CheckpointListener;
+
+import io.debezium.relational.TableId;
+import lombok.Data;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
/**
 * The {@code SplitAssigner} is responsible for deciding what split should be processed. It
 * determines split processing order.
 */
public interface SplitAssigner {

    /**
     * Called to open the assigner to acquire any resources, like threads or network connections.
     */
    void open();

    /**
     * Gets the next split.
     *
     * <p>When this method returns an empty {@code Optional}, then the set of splits is assumed to
     * be done and the source will finish once the readers completed their current splits.
     */
    Optional<SourceSplitBase> getNext();

    /**
     * Whether the split assigner is still waiting for callback of completed splits, i.e. {@link #onCompletedSplits}.
     */
    boolean waitingForCompletedSplits();

    /**
     * Callback to handle the completed splits with completed change log offset. This is useful for
     * determining when to generate an incremental split and what incremental split to generate.
     */
    void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks);

    /**
     * Adds a set of splits to this assigner. This happens for example when some split processing
     * failed and the splits need to be re-added.
     */
    void addSplits(Collection<SourceSplitBase> splits);

    /**
     * Creates a snapshot of the state of this split assigner, to be stored in a checkpoint.
     *
     * <p>The snapshot should contain the latest state of the assigner: It should assume that all
     * operations that happened before the snapshot have successfully completed. For example all
     * splits assigned to readers via {@link #getNext()} don't need to be included in the snapshot
     * anymore.
     *
     * <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most
     * implementations should be able to ignore this parameter, because for the contents of the
     * snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be
     * interesting for source connectors with external systems where those systems are themselves
     * aware of checkpoints; for example in cases where the enumerator notifies that system about a
     * specific checkpoint being triggered.
     *
     * @param checkpointId The ID of the checkpoint for which the snapshot is created.
     * @return an object containing the state of the split enumerator.
     */
    PendingSplitsState snapshotState(long checkpointId);

    /**
     * Notifies the listener that the checkpoint with the given {@code checkpointId} completed and
     * was committed.
     *
     * @see CheckpointListener#notifyCheckpointComplete(long)
     */
    void notifyCheckpointComplete(long checkpointId);

    /**
     * Called to close the assigner, in case it holds on to any resources, like threads or network
     * connections. The default implementation is a no-op for assigners without resources.
     */
    default void close() {
    }

    /**
     * Shared state between cooperating assigners (e.g. the snapshot and incremental halves of a
     * hybrid assigner): the source configuration plus bookkeeping of snapshot-phase progress.
     */
    @Data
    final class Context<C extends SourceConfig> {
        // immutable source configuration shared by all assigners
        private final C sourceConfig;

        // all tables captured by this source; populated during table discovery
        private final Set<TableId> capturedTables;

        // snapshot splits handed out to readers, keyed by split id
        private final Map<String, SnapshotSplit> assignedSnapshotSplit;

        /**
         * key: SnapshotSplit id; value: completed change-log offset (high watermark) of that split
         */
        private final Map<String, Offset> splitCompletedOffsets;
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkRange.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkRange.java
new file mode 100644
index 000000000..673921532
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkRange.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.util.Objects;
+
+/**
+ * An internal structure describes a chunk range with a chunk start
(inclusive) and chunk end
+ * (exclusive). Note that {@code null} represents unbounded chunk start/end.
+ */
+@Getter
+@EqualsAndHashCode
+public class ChunkRange {
+ private final Object chunkStart;
+ private final Object chunkEnd;
+
+ /**
+ * Returns a {@link ChunkRange} which represents a full table scan with
unbounded chunk start
+ * and chunk end.
+ */
+ public static ChunkRange all() {
+ return new ChunkRange(null, null);
+ }
+
+ /** Returns a {@link ChunkRange} with the given chunk start and chunk end.
*/
+ public static ChunkRange of(Object chunkStart, Object chunkEnd) {
+ return new ChunkRange(chunkStart, chunkEnd);
+ }
+
+ private ChunkRange(Object chunkStart, Object chunkEnd) {
+ if (chunkStart != null || chunkEnd != null) {
+ checkArgument(
+ !Objects.equals(chunkStart, chunkEnd),
+ "Chunk start %s shouldn't be equal to chunk end %s",
+ chunkStart,
+ chunkEnd);
+ }
+ this.chunkStart = chunkStart;
+ this.chunkEnd = chunkEnd;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkSplitter.java
similarity index 63%
copy from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkSplitter.java
index b073ccaf9..b02a891cf 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/ChunkSplitter.java
@@ -15,16 +15,18 @@
* limitations under the License.
*/
-package org.seatunnel.connectors.cdc.base.source.event;
+package org.seatunnel.connectors.cdc.base.source.enumerator.splitter;
-import org.apache.seatunnel.api.source.SourceEvent;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
-import lombok.Data;
+import java.util.Collection;
-import java.util.List;
/** The splitter used to split a data collection (e.g. a table) into a set of chunks. */
public interface ChunkSplitter {

    /**
     * Generates all snapshot splits (chunks) for the given data collection.
     *
     * @param tableId identity of the table to split.
     * @return the snapshot splits covering the whole table.
     */
    Collection<SnapshotSplit> generateSplits(TableId tableId);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
new file mode 100644
index 000000000..ad34a91b8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+
+import java.sql.SQLException;
+import java.util.Collection;
+
/**
 * The {@code ChunkSplitter} used to split a table into a set of chunks for a JDBC data source.
 */
public interface JdbcSourceChunkSplitter extends ChunkSplitter {

    /**
     * Generates all snapshot splits (chunks) for the given table.
     */
    @Override
    Collection<SnapshotSplit> generateSplits(TableId tableId);

    /**
     * Query the maximum and minimum value of the column in the table. e.g. query string <code>
     * SELECT MIN(%s), MAX(%s) FROM %s</code>
     *
     * @param jdbc JDBC connection.
     * @param tableId table identity.
     * @param columnName column name.
     * @return maximum and minimum value.
     * @throws SQLException if a database access error occurs.
     */
    Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException;

    /**
     * Query the minimum value of the column in the table, where the minimum value must be greater
     * than the excludedLowerBound value. e.g. prepare query string <code>
     * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
     *
     * @param jdbc JDBC connection.
     * @param tableId table identity.
     * @param columnName column name.
     * @param excludedLowerBound the minimum value should be greater than this value.
     * @return minimum value.
     * @throws SQLException if a database access error occurs.
     */
    Object queryMin(JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
            throws SQLException;

    /**
     * Query the maximum value of the next chunk, where the next chunk must be greater than or
     * equal to the <code>includedLowerBound</code> value [min_1, max_1), [min_2, max_2), ...
     * [min_n, null). Each time this method is called it will return max1, max2...
     *
     * @param jdbc JDBC connection.
     * @param tableId table identity.
     * @param columnName column name.
     * @param chunkSize chunk size.
     * @param includedLowerBound the previous chunk end value.
     * @return next chunk end value.
     * @throws SQLException if a database access error occurs.
     */
    Object queryNextChunkMax(
            JdbcConnection jdbc,
            TableId tableId,
            String columnName,
            int chunkSize,
            Object includedLowerBound)
            throws SQLException;

    /**
     * Approximate total number of entries in the lookup table.
     *
     * @param jdbc JDBC connection.
     * @param tableId table identity.
     * @return approximate row count.
     * @throws SQLException if a database access error occurs.
     */
    Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException;

    /**
     * Build the scan query sql of the {@link SnapshotSplit}.
     *
     * @param tableId table identity.
     * @param splitKeyType primary key type.
     * @param isFirstSplit whether this is the first split (no lower bound condition).
     * @param isLastSplit whether this is the last split (no upper bound condition).
     * @return query sql.
     */
    String buildSplitScanQuery(TableId tableId, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit);

    /**
     * Checks whether the split column is evenly distributed across its range.
     *
     * @param splitColumn split column.
     * @return {@code true} if the split column type is one of TINYINT, SMALLINT, INT, BIGINT,
     *         DECIMAL.
     */
    default boolean isEvenlySplitColumn(Column splitColumn) {
        // currently, we only support these types.
        switch (fromDbzColumn(splitColumn).getSqlType()) {
            case TINYINT:
            case SMALLINT:
            case INT:
            case BIGINT:
            case DECIMAL:
                return true;
            default:
                return false;
        }
    }

    /**
     * Get a corresponding SeaTunnel data type from a debezium {@link Column}.
     *
     * @param splitColumn dbz split column.
     * @return SeaTunnel data type
     */
    SeaTunnelDataType<?> fromDbzColumn(Column splitColumn);

    /**
     * Convert a dbz column to a single-field SeaTunnel row type describing the split key.
     *
     * @param splitColumn split column.
     * @return SeaTunnel row type.
     */
    default SeaTunnelRowType getSplitType(Column splitColumn) {
        return new SeaTunnelRowType(new String[]{splitColumn.name()}, new SeaTunnelDataType[]{fromDbzColumn(splitColumn)});
    }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/HybridPendingSplitsState.java
similarity index 70%
copy from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/HybridPendingSplitsState.java
index b073ccaf9..6c72665a5 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/HybridPendingSplitsState.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.seatunnel.connectors.cdc.base.source.event;
-
-import org.apache.seatunnel.api.source.SourceEvent;
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
import lombok.Data;
-import java.util.List;
-
+/** A {@link PendingSplitsState} for pending hybrid (snapshot & incremental)
splits. */
@Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
- private static final long serialVersionUID = 1L;
- List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+public class HybridPendingSplitsState implements PendingSplitsState {
+ private final SnapshotPhaseState snapshotPhaseState;
+ private final IncrementalPhaseState incrementalPhaseState;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/IncrementalPhaseState.java
similarity index 71%
copy from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/IncrementalPhaseState.java
index b073ccaf9..4b46ef41d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/IncrementalPhaseState.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.seatunnel.connectors.cdc.base.source.event;
-
-import org.apache.seatunnel.api.source.SourceEvent;
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
import lombok.Data;
-import java.util.List;
-
+/**
+ * A {@link PendingSplitsState} for pending incremental splits.
+ */
@Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
- private static final long serialVersionUID = 1L;
- List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
+public class IncrementalPhaseState implements PendingSplitsState {
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/PendingSplitsState.java
similarity index 69%
copy from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/PendingSplitsState.java
index b073ccaf9..487055745 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/PendingSplitsState.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.seatunnel.connectors.cdc.base.source.event;
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
-import org.apache.seatunnel.api.source.SourceEvent;
+import java.io.Serializable;
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class CompletedSnapshotSplitReportEvent implements SourceEvent {
- private static final long serialVersionUID = 1L;
- List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
/**
 * A checkpoint of the split assigner's current state, containing the currently pending splits
 * that are not yet assigned.
 */
public interface PendingSplitsState extends Serializable {
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java
new file mode 100644
index 000000000..ab8b65738
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/enumerator/state/SnapshotPhaseState.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.enumerator.state;
+
+import io.debezium.relational.TableId;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+
+import java.util.List;
+import java.util.Map;
+
+/** A {@link PendingSplitsState} for pending snapshot splits. */
+@Getter
+@ToString
+@EqualsAndHashCode
+public class SnapshotPhaseState implements PendingSplitsState {
+
+ /** The tables in the checkpoint. */
+ private final List<TableId> remainingTables;
+
+ /**
+ * The paths that are no longer in the enumerator checkpoint, but have
been processed before and
+ * should this be ignored. Relevant only for sources in continuous
monitoring mode.
+ */
+ private final List<TableId> alreadyProcessedTables;
+
+ /** The splits in the checkpoint. */
+ private final List<SnapshotSplit> remainingSplits;
+
+ /**
+ * The snapshot splits that the {@link IncrementalSourceEnumerator} has
assigned to {@link
+ * IncrementalSourceSplitReader}s.
+ */
+ private final Map<String, SnapshotSplit> assignedSplits;
+
+ /**
+ * The offsets of completed (snapshot) splits that the {@link
IncrementalSourceEnumerator} has
+ * received from {@link IncrementalSourceSplitReader}s.
+ */
+ private final Map<String, Offset> splitCompletedOffsets;
+
+ /**
+ * Whether the snapshot split assigner is completed, which indicates there
is no more splits and
+ * all records of splits have been completely processed in the pipeline.
+ */
+ private final boolean isAssignerCompleted;
+
+ /** Whether the table identifier is case sensitive. */
+ private final boolean isTableIdCaseSensitive;
+
+ /** Whether the remaining tables are keep when snapshot state. */
+ private final boolean isRemainingTablesCheckpointed;
+
+ public SnapshotPhaseState(
+ List<TableId> alreadyProcessedTables,
+ List<SnapshotSplit> remainingSplits,
+ Map<String, SnapshotSplit> assignedSplits,
+ Map<String, Offset> splitCompletedOffsets,
+ boolean isAssignerCompleted,
+ List<TableId> remainingTables,
+ boolean isTableIdCaseSensitive,
+ boolean isRemainingTablesCheckpointed) {
+ this.alreadyProcessedTables = alreadyProcessedTables;
+ this.remainingSplits = remainingSplits;
+ this.assignedSplits = assignedSplits;
+ this.splitCompletedOffsets = splitCompletedOffsets;
+ this.isAssignerCompleted = isAssignerCompleted;
+ this.remainingTables = remainingTables;
+ this.isTableIdCaseSensitive = isTableIdCaseSensitive;
+ this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsAckEvent.java
similarity index 69%
copy from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
copy to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsAckEvent.java
index b073ccaf9..768ca5fc9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsAckEvent.java
@@ -20,11 +20,20 @@ package org.seatunnel.connectors.cdc.base.source.event;
import org.apache.seatunnel.api.source.SourceEvent;
import lombok.Data;
+import
org.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator;
import java.util.List;
/**
 * The {@link SourceEvent} that {@link IncrementalSourceEnumerator} sends to {@link
 * IncrementalSourceReader} to notify that the completed snapshot splits have been received, i.e.
 * an acknowledgement for {@link CompletedSnapshotSplitsReportEvent}.
 */
@Data
public class CompletedSnapshotSplitsAckEvent implements SourceEvent {

    private static final long serialVersionUID = 1L;

    /** The ids of the snapshot splits being acknowledged. */
    private final List<String> completedSplits;

}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsReportEvent.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
rename to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsReportEvent.java
index b073ccaf9..455d597c8 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitReportEvent.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/event/CompletedSnapshotSplitsReportEvent.java
@@ -24,7 +24,7 @@ import lombok.Data;
import java.util.List;
/**
 * The {@link SourceEvent} that the source reader sends to the enumerator to report the high
 * watermarks of snapshot splits it has finished reading.
 */
@Data
public class CompletedSnapshotSplitsReportEvent implements SourceEvent {
    private static final long serialVersionUID = 1L;
    // One watermark per completed snapshot split (split id + high watermark).
    List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
index 60d82d260..88b40c528 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
@@ -30,5 +30,7 @@ public abstract class OffsetFactory {
    /** Creates an offset from the given connector-specific offset properties. */
    public abstract Offset specific(Map<String, String> offset);

    /** Creates an offset from a log file name and a position within that file. */
    public abstract Offset specific(String filename, Long position);

    // NOTE(review): "timstamp"/"timestmap" are typos of "timestamp"/"timestamp"; renaming this
    // abstract method would break all implementors, so flagging here instead of fixing in place.
    public abstract Offset timstamp(long timestmap);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 3e3d31a5a..d91f653e8 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -28,13 +28,13 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.Si
import lombok.extern.slf4j.Slf4j;
import org.seatunnel.connectors.cdc.base.config.SourceConfig;
-import
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitReportEvent;
+import
org.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.seatunnel.connectors.cdc.base.source.split.state.LogSplitState;
+import
org.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import org.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState;
import
org.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
@@ -47,7 +47,7 @@ import java.util.function.Supplier;
/**
* The multi-parallel source reader for table snapshot phase from {@link
SnapshotSplit} and then
- * single-parallel source reader for table stream phase from {@link LogSplit}.
+ * single-parallel source reader for table stream phase from {@link
IncrementalSplit}.
*/
@Slf4j
public class IncrementalSourceReader<T, C extends SourceConfig>
@@ -56,7 +56,7 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
private final Map<String, SnapshotSplit> finishedUnackedSplits;
- private final Map<String, LogSplit> uncompletedStreamSplits;
+ private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
private final int subtaskId;
@@ -77,7 +77,7 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
context);
this.sourceConfig = sourceConfig;
this.finishedUnackedSplits = new HashMap<>();
- this.uncompletedStreamSplits = new HashMap<>();
+ this.uncompletedIncrementalSplits = new HashMap<>();
this.subtaskId = context.getIndexOfSubtask();
}
@@ -99,14 +99,14 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
unfinishedSplits.add(split);
}
} else {
- // the stream split is uncompleted
- uncompletedStreamSplits.put(split.splitId(),
split.asLogSplit());
- unfinishedSplits.add(split.asLogSplit());
+ // the incremental split is uncompleted
+ uncompletedIncrementalSplits.put(split.splitId(),
split.asIncrementalSplit());
+ unfinishedSplits.add(split.asIncrementalSplit());
}
}
// notify split enumerator again about the finished unacked snapshot
splits
reportFinishedSnapshotSplitsIfNeed();
- // add all un-finished splits (including stream split) to
SourceReaderBase
+ // add all un-finished splits (including incremental split) to
SourceReaderBase
super.addSplits(unfinishedSplits);
}
@@ -117,7 +117,7 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
checkState(
sourceSplit.isSnapshotSplit(),
String.format(
- "Only snapshot split could finish, but the actual split is
stream split %s",
+ "Only snapshot split could finish, but the actual split is
incremental split %s",
sourceSplit));
finishedUnackedSplits.put(sourceSplit.splitId(),
sourceSplit.asSnapshotSplit());
}
@@ -132,7 +132,7 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
for (SnapshotSplit split : finishedUnackedSplits.values()) {
completedSnapshotSplitWatermarks.add(new
SnapshotSplitWatermark(split.splitId(), split.getHighWatermark()));
}
- CompletedSnapshotSplitReportEvent reportEvent = new
CompletedSnapshotSplitReportEvent();
+ CompletedSnapshotSplitsReportEvent reportEvent = new
CompletedSnapshotSplitsReportEvent();
reportEvent.setCompletedSnapshotSplitWatermarks(completedSnapshotSplitWatermarks);
context.sendSourceEventToEnumerator(reportEvent);
//TODO need enumerator return ack
@@ -149,7 +149,7 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
if (split.isSnapshotSplit()) {
return new SnapshotSplitState(split.asSnapshotSplit());
} else {
- return new LogSplitState(split.asLogSplit());
+ return new IncrementalSplitState(split.asIncrementalSplit());
}
}
@@ -161,8 +161,8 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
// add finished snapshot splits that didn't receive ack yet
stateSplits.addAll(finishedUnackedSplits.values());
- // add stream splits who are uncompleted
- stateSplits.addAll(uncompletedStreamSplits.values());
+ // add incremental splits who are uncompleted
+ stateSplits.addAll(uncompletedIncrementalSplits.values());
return stateSplits;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 663c0d727..462650d21 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -78,12 +78,12 @@ public class IncrementalSourceRecordEmitter<T>
if (isLowWatermarkEvent(element) &&
splitState.isSnapshotSplitState()) {
splitState.asSnapshotSplitState().setHighWatermark(watermark);
}
- } else if (isSchemaChangeEvent(element) &&
splitState.isLogSplitState()) {
+ } else if (isSchemaChangeEvent(element) &&
splitState.isIncrementalSplitState()) {
//TODO Currently not supported Schema Change
} else if (isDataChangeRecord(element)) {
- if (splitState.isLogSplitState()) {
+ if (splitState.isIncrementalSplitState()) {
Offset position = getOffsetPosition(element);
- splitState.asLogSplitState().setStartupOffset(position);
+
splitState.asIncrementalSplitState().setStartupOffset(position);
}
emitElement(element, output);
} else {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index 0eb8e6699..9bfe4c49b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -123,9 +123,9 @@ public class IncrementalSourceSplitReader<C extends
SourceConfig>
currentFetcher = new
IncrementalSourceScanFetcher(taskContext, subtaskId);
}
} else {
- // point from snapshot split to stream split
+ // point from snapshot split to incremental split
if (currentFetcher != null) {
- log.info("It's turn to read stream split, close current
snapshot fetcher.");
+ log.info("It's turn to read incremental split, close
current snapshot fetcher.");
currentFetcher.close();
}
final FetchTask.Context taskContext =
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
index a77f05a14..e5a5d94c2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
@@ -17,14 +17,14 @@
package org.seatunnel.connectors.cdc.base.source.reader.external;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import java.util.Iterator;
/**
* Fetcher to fetch data of a table split, the split is either snapshot split
{@link SnapshotSplit}
- * or stream split {@link LogSplit}.
+ * or incremental split {@link IncrementalSplit}.
*/
public interface Fetcher<T, Split> {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 7952f5bd2..6b6e22104 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -26,7 +26,7 @@ import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;
import org.seatunnel.connectors.cdc.base.source.offset.Offset;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
@@ -43,7 +43,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
- * Fetcher to fetch data from table split, the split is the stream split
{@link LogSplit}.
+ * Fetcher to fetch data from table split, the split is the incremental split
{@link IncrementalSplit}.
*/
@Slf4j
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords,
SourceSplitBase> {
@@ -56,7 +56,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
private FetchTask<SourceSplitBase> streamFetchTask;
- private LogSplit currentLogSplit;
+ private IncrementalSplit currentIncrementalSplit;
private Map<TableId, Offset> maxSplitHighWatermarkMap;
@@ -73,9 +73,9 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
@Override
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
this.streamFetchTask = fetchTask;
- this.currentLogSplit = fetchTask.getSplit().asLogSplit();
+ this.currentIncrementalSplit =
fetchTask.getSplit().asIncrementalSplit();
configureFilter();
- taskContext.configure(currentLogSplit);
+ taskContext.configure(currentIncrementalSplit);
this.queue = taskContext.getQueue();
executorService.submit(
() -> {
@@ -84,8 +84,8 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
} catch (Exception e) {
log.error(
String.format(
- "Execute stream read task for stream split %s
fail",
- currentLogSplit),
+ "Execute stream read task for incremental split %s
fail",
+ currentIncrementalSplit),
e);
readException = e;
}
@@ -94,7 +94,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
@Override
public boolean isFinished() {
- return currentLogSplit == null || !streamFetchTask.isRunning();
+ return currentIncrementalSplit == null || !streamFetchTask.isRunning();
}
@Override
@@ -119,7 +119,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
throw new SeaTunnelException(
String.format(
"Read split %s error due to %s.",
- currentLogSplit, readException.getMessage()),
+ currentIncrementalSplit, readException.getMessage()),
readException);
}
}
@@ -144,7 +144,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
/**
* Returns the record should emit or not.
*
- * <p>The watermark signal algorithm is the stream split reader only sends
the change event that
+ * <p>The watermark signal algorithm is the incremental split reader only
sends the change event that
* belongs to its finished snapshot splits. For each snapshot split, the
change event is valid
* since the offset is after its high watermark.
*
@@ -188,8 +188,8 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
Map<TableId, Offset> tableIdOffsetPositionMap = new HashMap<>();
// latest-offset mode
- for (TableId tableId : currentLogSplit.getTableIds()) {
- tableIdOffsetPositionMap.put(tableId,
currentLogSplit.getStartupOffset());
+ for (TableId tableId : currentIncrementalSplit.getTableIds()) {
+ tableIdOffsetPositionMap.put(tableId,
currentIncrementalSplit.getStartupOffset());
}
this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
this.pureStreamPhaseTables.clear();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
index 5b84daaa4..6f37776be 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
@@ -47,4 +47,13 @@ public class CompletedSnapshotSplitInfo implements
Serializable {
this.splitEnd = splitEnd;
this.watermark = watermark;
}
+
+ public SnapshotSplit asSnapshotSplit() {
+ return new SnapshotSplit(splitId,
+ tableId,
+ splitKeyType,
+ splitStart,
+ splitEnd,
+ watermark);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/LogSplit.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
similarity index 86%
rename from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/LogSplit.java
rename to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
index 267f08eca..de9f72ef8 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/LogSplit.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
@@ -24,15 +24,15 @@ import
org.seatunnel.connectors.cdc.base.source.offset.Offset;
import java.util.List;
@Getter
-public class LogSplit extends SourceSplitBase {
+public class IncrementalSplit extends SourceSplitBase {
/**
- * All the tables that this log split needs to capture.
+ * All the tables that this incremental split needs to capture.
*/
private final List<TableId> tableIds;
/**
- * Minimum watermark for SnapshotSplits for all tables in this LogSplit
+ * Minimum watermark for SnapshotSplits for all tables in this
IncrementalSplit
*/
private final Offset startupOffset;
@@ -42,12 +42,12 @@ public class LogSplit extends SourceSplitBase {
private final Offset stopOffset;
/**
- * SnapshotSplit information for all tables in this LogSplit.
+ * SnapshotSplit information for all tables in this IncrementalSplit.
* <br> Used to support Exactly-Once.
*/
private final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos;
- public LogSplit(
+ public IncrementalSplit(
String splitId,
List<TableId> capturedTables,
Offset startupOffset,
@@ -59,9 +59,4 @@ public class LogSplit extends SourceSplitBase {
this.stopOffset = stopOffset;
this.completedSnapshotSplitInfos = completedSnapshotSplitInfos;
}
-
- @Override
- public String splitId() {
- return this.splitId;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
index a53120a40..acf0c0402 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/SourceSplitBase.java
@@ -36,9 +36,9 @@ public abstract class SourceSplitBase implements SourceSplit {
return getClass() == SnapshotSplit.class;
}
- /** Checks whether this split is a stream split. */
- public final boolean isLogSplit() {
- return getClass() == LogSplit.class;
+ /** Checks whether this split is an incremental split. */
+ public final boolean isIncrementalSplit() {
+ return getClass() == IncrementalSplit.class;
}
/** Casts this split into a {@link SnapshotSplit}. */
@@ -46,9 +46,9 @@ public abstract class SourceSplitBase implements SourceSplit {
return (SnapshotSplit) this;
}
- /** Casts this split into a {@link LogSplit}. */
- public final LogSplit asLogSplit() {
- return (LogSplit) this;
+ /** Casts this split into a {@link IncrementalSplit}. */
+ public final IncrementalSplit asIncrementalSplit() {
+ return (IncrementalSplit) this;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/LogSplitState.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
similarity index 77%
rename from
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/LogSplitState.java
rename to
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
index 6b5258b8b..6cf1e241d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/LogSplitState.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/IncrementalSplitState.java
@@ -21,7 +21,7 @@ import io.debezium.relational.TableId;
import lombok.Getter;
import lombok.Setter;
import org.seatunnel.connectors.cdc.base.source.offset.Offset;
-import org.seatunnel.connectors.cdc.base.source.split.LogSplit;
+import org.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import java.util.List;
@@ -30,12 +30,12 @@ import java.util.List;
*/
@Getter
@Setter
-public class LogSplitState extends SourceSplitStateBase {
+public class IncrementalSplitState extends SourceSplitStateBase {
private List<TableId> tableIds;
/**
- * Minimum watermark for SnapshotSplits for all tables in this LogSplit
+ * Minimum watermark for SnapshotSplits for all tables in this
IncrementalSplit
*/
private Offset startupOffset;
@@ -44,7 +44,7 @@ public class LogSplitState extends SourceSplitStateBase {
*/
private Offset stopOffset;
- public LogSplitState(LogSplit split) {
+ public IncrementalSplitState(IncrementalSplit split) {
super(split);
this.tableIds = split.getTableIds();
this.startupOffset = split.getStartupOffset();
@@ -52,14 +52,14 @@ public class LogSplitState extends SourceSplitStateBase {
}
@Override
- public LogSplit toSourceSplit() {
- final LogSplit logSplit = split.asLogSplit();
- return new LogSplit(
- logSplit.splitId(),
+ public IncrementalSplit toSourceSplit() {
+ final IncrementalSplit incrementalSplit = split.asIncrementalSplit();
+ return new IncrementalSplit(
+ incrementalSplit.splitId(),
getTableIds(),
getStartupOffset(),
getStopOffset(),
- logSplit.getCompletedSnapshotSplitInfos()
+ incrementalSplit.getCompletedSnapshotSplitInfos()
);
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
index 8c2ded141..b2af82c98 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/split/state/SourceSplitStateBase.java
@@ -35,9 +35,9 @@ public abstract class SourceSplitStateBase {
return getClass() == SnapshotSplitState.class;
}
- /** Checks whether this split state is a stream split state. */
- public final boolean isLogSplitState() {
- return getClass() == LogSplitState.class;
+ /** Checks whether this split state is a incremental split state. */
+ public final boolean isIncrementalSplitState() {
+ return getClass() == IncrementalSplitState.class;
}
/** Casts this split state into a {@link SnapshotSplitState}. */
@@ -45,9 +45,9 @@ public abstract class SourceSplitStateBase {
return (SnapshotSplitState) this;
}
- /** Casts this split state into a {@link LogSplitState}. */
- public final LogSplitState asLogSplitState() {
- return (LogSplitState) this;
+ /** Casts this split state into a {@link IncrementalSplitState}. */
+ public final IncrementalSplitState asIncrementalSplitState() {
+ return (IncrementalSplitState) this;
}
/** Use the current split state to create a new SourceSplit. */