This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2238fda30 [feature][cdc] Fixed error in mysql cdc under real-time job 
(#3666)
2238fda30 is described below

commit 2238fda30029120cc7c9478a6029cfcca1080a43
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Dec 7 21:10:13 2022 +0800

    [feature][cdc] Fixed error in mysql cdc under real-time job (#3666)
    
    * [feature][cdc] Fixed error in mysql cdc under real-time job
    
    * [chore] license header
---
 plugin-mapping.properties                          |   3 +-
 .../main/java/io/debezium/relational/TableId.java  | 280 +++++++++++++++++++++
 .../cdc/base/config/JdbcSourceConfigFactory.java   |  10 +-
 .../source/enumerator/HybridSplitAssigner.java     |   2 +-
 .../enumerator/IncrementalSourceEnumerator.java    |   9 +-
 .../enumerator/IncrementalSplitAssigner.java       |   5 +-
 .../source/enumerator/SnapshotSplitAssigner.java   |   4 +
 .../cdc/base/source/offset/OffsetFactory.java      |   3 +-
 .../source/reader/IncrementalSourceReader.java     |  13 +
 .../cdc/base/source/split/IncrementalSplit.java    |   1 +
 .../cdc/base/source/split/SnapshotSplit.java       |   1 +
 .../cdc/debezium/EmbeddedDatabaseHistory.java      | 153 +++++++++++
 .../cdc/mysql/config/MySqlSourceConfigFactory.java |   8 +
 .../seatunnel/cdc/mysql/source/MySqlDialect.java   |  16 +-
 .../cdc/mysql/source/MySqlIncrementalSource.java   |   4 +-
 .../reader/fetch/MySqlSourceFetchTaskContext.java  |  14 +-
 .../common/source/reader/SourceReaderBase.java     |   6 +-
 seatunnel-dist/pom.xml                             |   5 +
 18 files changed, 525 insertions(+), 12 deletions(-)

diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index e73f8c283..0a8d16d70 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -156,4 +156,5 @@ seatunnel.source.Jira = connector-http-jira
 seatunnel.source.Gitlab = connector-http-gitlab
 seatunnel.sink.RabbitMQ = connector-rabbitmq
 seatunnel.source.RabbitMQ = connector-rabbitmq
-seatunnel.source.OpenMldb = connector-openmldb
\ No newline at end of file
+seatunnel.source.OpenMldb = connector-openmldb
+seatunnel.source.MySQL-CDC = connector-cdc-mysql
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java
new file mode 100644
index 000000000..91448d51e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.relational;
+
+import io.debezium.annotation.Immutable;
+import io.debezium.relational.Selectors.TableIdToStringMapper;
+import io.debezium.schema.DataCollectionId;
+
+import java.io.Serializable;
+
+/**
+ * Unique identifier for a database table.
+ *
+ */
+@Immutable
+public final class TableId implements DataCollectionId, Comparable<TableId>, 
Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Parse the supplied string, extracting up to the first 3 parts into a TableId.
+     *
+     * @param str the string representation of the table identifier; may not 
be null
+     * @return the table ID, or null if it could not be parsed
+     */
+    public static TableId parse(String str) {
+        return parse(str, true);
+    }
+
+    /**
+     * Parse the supplied string, extracting up to the first 3 parts into a TableId.
+     *
+     * @param str the string representation of the table identifier; may not 
be null
+     * @param useCatalogBeforeSchema {@code true} if the parsed string 
contains only 2 items and the first should be used as
+     *            the catalog and the second as the table name, or {@code 
false} if the first should be used as the schema and the
+     *            second as the table name
+     * @return the table ID, or null if it could not be parsed
+     */
+    public static TableId parse(String str, boolean useCatalogBeforeSchema) {
+        String[] parts = TableIdParser.parse(str).toArray(new String[0]);
+
+        return TableId.parse(parts, parts.length, useCatalogBeforeSchema);
+    }
+
+    /**
+     * Build a TableId from the supplied identifier parts, using up to the first 3 of them.
+     *
+     * @param parts the parts of the identifier; may not be null
+     * @param numParts the number of parts to use for the table identifier
+     * @param useCatalogBeforeSchema {@code true} if the parsed string 
contains only 2 items and the first should be used as
+     *            the catalog and the second as the table name, or {@code 
false} if the first should be used as the schema and the
+     *            second as the table name
+     * @return the table ID, or null if it could not be parsed
+     */
+    protected static TableId parse(String[] parts, int numParts, boolean 
useCatalogBeforeSchema) {
+        if (numParts == 0) {
+            return null;
+        }
+        if (numParts == 1) {
+            return new TableId(null, null, parts[0]); // table only
+        }
+        if (numParts == 2) {
+            if (useCatalogBeforeSchema) {
+                return new TableId(parts[0], null, parts[1]); // catalog & 
table only
+            }
+            return new TableId(null, parts[0], parts[1]); // schema & table 
only
+        }
+        return new TableId(parts[0], parts[1], parts[2]); // catalog, schema & 
table
+    }
+
+    private final String catalogName;
+    private final String schemaName;
+    private final String tableName;
+    private final String id;
+
+    /**
+     * Create a new table identifier.
+     *
+     * @param catalogName the name of the database catalog that contains the 
table; may be null if the JDBC driver does not
+     *            show a catalog for this table
+     * @param schemaName the name of the database schema that contains the 
table; may be null if the JDBC driver does not
+     *            show a schema for this table
+     * @param tableName the name of the table; may not be null
+     * @param tableIdMapper the customization of the fully qualified table name
+     */
+    public TableId(String catalogName, String schemaName, String tableName, 
TableIdToStringMapper tableIdMapper) {
+        this.catalogName = catalogName;
+        this.schemaName = schemaName;
+        this.tableName = tableName;
+        assert this.tableName != null;
+        this.id = tableIdMapper == null ? tableId(this.catalogName, 
this.schemaName, this.tableName) : tableIdMapper.toString(this);
+    }
+
+    /**
+     * Create a new table identifier.
+     *
+     * @param catalogName the name of the database catalog that contains the 
table; may be null if the JDBC driver does not
+     *            show a catalog for this table
+     * @param schemaName the name of the database schema that contains the 
table; may be null if the JDBC driver does not
+     *            show a schema for this table
+     * @param tableName the name of the table; may not be null
+     */
+    public TableId(String catalogName, String schemaName, String tableName) {
+        this(catalogName, schemaName, tableName, null);
+    }
+
+    /**
+     * Get the name of the JDBC catalog.
+     *
+     * @return the catalog name, or null if the table does not belong to a 
catalog
+     */
+    public String catalog() {
+        return catalogName;
+    }
+
+    /**
+     * Get the name of the JDBC schema.
+     *
+     * @return the JDBC schema name, or null if the table does not belong to a 
JDBC schema
+     */
+    public String schema() {
+        return schemaName;
+    }
+
+    /**
+     * Get the name of the table.
+     *
+     * @return the table name; never null
+     */
+    public String table() {
+        return tableName;
+    }
+
+    @Override
+    public String identifier() {
+        return id;
+    }
+
+    @Override
+    public int compareTo(TableId that) {
+        if (this == that) {
+            return 0;
+        }
+        return this.id.compareTo(that.id);
+    }
+
+    public int compareToIgnoreCase(TableId that) {
+        if (this == that) {
+            return 0;
+        }
+        return this.id.compareToIgnoreCase(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof TableId) {
+            return this.compareTo((TableId) obj) == 0;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return identifier();
+    }
+
+    /**
+     * Returns a dot-separated String representation of this identifier, 
quoting all
+     * name parts with the {@code "} char.
+     */
+    public String toDoubleQuotedString() {
+        return toQuotedString('"');
+    }
+
+    /**
+     * Returns a new {@link TableId} with all parts of the identifier quoted using the {@code "} character.
+     */
+    public TableId toDoubleQuoted() {
+        return toQuoted('"');
+    }
+
+    /**
+     * Returns a new {@link TableId} that has all parts of the identifier 
quoted.
+     *
+     * @param quotingChar the character to be used to quote the identifier 
parts.
+     */
+    public TableId toQuoted(char quotingChar) {
+        String catalogName = null;
+        if (this.catalogName != null && !this.catalogName.isEmpty()) {
+            catalogName = quote(this.catalogName, quotingChar);
+        }
+
+        String schemaName = null;
+        if (this.schemaName != null && !this.schemaName.isEmpty()) {
+            schemaName = quote(this.schemaName, quotingChar);
+        }
+
+        return new TableId(catalogName, schemaName, quote(this.tableName, 
quotingChar));
+    }
+
+    /**
+     * Returns a dot-separated String representation of this identifier, 
quoting all
+     * name parts with the given quoting char.
+     */
+    public String toQuotedString(char quotingChar) {
+        StringBuilder quoted = new StringBuilder();
+
+        if (catalogName != null && !catalogName.isEmpty()) {
+            quoted.append(quote(catalogName, quotingChar)).append(".");
+        }
+
+        if (schemaName != null && !schemaName.isEmpty()) {
+            quoted.append(quote(schemaName, quotingChar)).append(".");
+        }
+
+        quoted.append(quote(tableName, quotingChar));
+
+        return quoted.toString();
+    }
+
+    private static String tableId(String catalog, String schema, String table) 
{
+        if (catalog == null || catalog.length() == 0) {
+            if (schema == null || schema.length() == 0) {
+                return table;
+            }
+            return schema + "." + table;
+        }
+        if (schema == null || schema.length() == 0) {
+            return catalog + "." + table;
+        }
+        return catalog + "." + schema + "." + table;
+    }
+
+    /**
+     * Quotes the given identifier part, e.g. schema or table name.
+     */
+    private static String quote(String identifierPart, char quotingChar) {
+        if (identifierPart == null) {
+            return null;
+        }
+
+        if (identifierPart.isEmpty()) {
+            return new 
StringBuilder().append(quotingChar).append(quotingChar).toString();
+        }
+
+        if (identifierPart.charAt(0) != quotingChar && 
identifierPart.charAt(identifierPart.length() - 1) != quotingChar) {
+            identifierPart = identifierPart.replace(quotingChar + "", 
repeat(quotingChar));
+            identifierPart = quotingChar + identifierPart + quotingChar;
+        }
+
+        return identifierPart;
+    }
+
+    private static String repeat(char quotingChar) {
+        return new 
StringBuilder().append(quotingChar).append(quotingChar).toString();
+    }
+
+    public TableId toLowercase() {
+        return new TableId(catalogName, schemaName, tableName.toLowerCase());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 0f98969ce..8263863f2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -179,13 +179,21 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         return this;
     }
 
+    /** Specifies the stop options. */
+    public JdbcSourceConfigFactory stopOptions(StopConfig stopConfig) {
+        this.stopConfig = stopConfig;
+        return this;
+    }
+
     public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
         this.port = config.get(JdbcSourceOptions.PORT);
         this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
+        this.username = config.get(JdbcSourceOptions.USERNAME);
         this.password = config.get(JdbcSourceOptions.PASSWORD);
         // TODO: support multi-table
         this.databaseList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
-        this.tableList = 
Collections.singletonList(config.get(JdbcSourceOptions.TABLE_NAME));
+        this.tableList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
+            + "." + config.get(JdbcSourceOptions.TABLE_NAME));
         this.distributionFactorUpper = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         this.distributionFactorLower = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
index fcb71cc90..e0dde4a64 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -111,7 +111,7 @@ public class HybridSplitAssigner<C extends SourceConfig> 
implements SplitAssigne
             // we need to wait snapshot-assigner to be completed before
             // assigning the incremental split. Otherwise, records emitted 
from incremental split
             // might be out-of-order in terms of same primary key with 
snapshot splits.
-            return snapshotSplitAssigner.getNext();
+            return incrementalSplitAssigner.getNext();
         }
         // no more splits for the assigner
         return Optional.empty();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index 3cbebae4c..cee57bdd0 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -51,12 +51,14 @@ public class IncrementalSourceEnumerator
      */
     private final TreeSet<Integer> readersAwaitingSplit;
 
+    private volatile boolean running;
     public IncrementalSourceEnumerator(
             SourceSplitEnumerator.Context<SourceSplitBase> context,
             SplitAssigner splitAssigner) {
         this.context = context;
         this.splitAssigner = splitAssigner;
         this.readersAwaitingSplit = new TreeSet<>();
+        this.running = false;
     }
 
     @Override
@@ -66,7 +68,8 @@ public class IncrementalSourceEnumerator
 
     @Override
     public void run() throws Exception {
-
+        this.running = true;
+        assignSplits();
     }
 
     @Override
@@ -77,7 +80,9 @@ public class IncrementalSourceEnumerator
         }
 
         readersAwaitingSplit.add(subtaskId);
-        assignSplits();
+        if (running) {
+            assignSplits();
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index fdbadf0c7..ac47bcee4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -124,6 +124,8 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
     @Override
     public void onCompletedSplits(List<SnapshotSplitWatermark> 
completedSplitWatermarks) {
         // do nothing
+        completedSplitWatermarks.forEach(watermark ->
+            context.getSplitCompletedOffsets().put(watermark.getSplitId(), 
watermark.getHighWatermark()));
     }
 
     @Override
@@ -205,7 +207,8 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
         }
         for (TableId tableId : capturedTables) {
             Offset watermark = tableWatermarks.get(tableId);
-            if (minOffset == null || watermark.isBefore(minOffset)) {
+            if (minOffset == null ||
+                (watermark != null && watermark.isBefore(minOffset))) {
                 minOffset = watermark;
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index a2612df7e..1c76fff6f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -148,12 +148,16 @@ public class SnapshotSplitAssigner<C extends 
SourceConfig> implements SplitAssig
 
     @Override
     public Optional<SourceSplitBase> getNext() {
+        if (chunkSplitter == null) {
+            return Optional.empty();
+        }
         if (!remainingSplits.isEmpty()) {
             // return remaining splits firstly
             Iterator<SnapshotSplit> iterator = remainingSplits.iterator();
             SnapshotSplit split = iterator.next();
             iterator.remove();
             assignedSplits.put(split.splitId(), split);
+            context.getAssignedSnapshotSplit().put(split.splitId(), split);
             return Optional.of(split);
         } else {
             // it's turn for new table
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
index 542bdb549..fc73afbf3 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
@@ -17,9 +17,10 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.offset;
 
+import java.io.Serializable;
 import java.util.Map;
 
-public abstract class OffsetFactory {
+public abstract class OffsetFactory implements Serializable {
     public OffsetFactory() {}
 
     public abstract Offset earliest();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 002cdcd93..2a0dbbb78 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.cdc.base.source.reader;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
@@ -58,6 +59,7 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
 
     private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
 
+    private volatile boolean running = false;
     private final int subtaskId;
 
     private final C sourceConfig;
@@ -81,6 +83,17 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
         this.subtaskId = context.getIndexOfSubtask();
     }
 
+    @Override
+    public void pollNext(Collector<T> output) throws Exception {
+        if (!running) {
+            if (getNumberOfCurrentlyAssignedSplits() == 0) {
+                context.sendSplitRequest();
+            }
+            running = true;
+        }
+        super.pollNext(output);
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
index fd3c56f24..838dab250 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 @Getter
 public class IncrementalSplit extends SourceSplitBase {
+    private static final long serialVersionUID = 1L;
 
     /**
      * All the tables that this incremental split needs to capture.
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
index f2033da01..733f8832f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
@@ -25,6 +25,7 @@ import lombok.Getter;
 
 @Getter
 public class SnapshotSplit extends SourceSplitBase {
+    private static final long serialVersionUID = 1L;
     private final TableId tableId;
     private final SeaTunnelRowType splitKeyType;
     private final Object splitStart;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/EmbeddedDatabaseHistory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/EmbeddedDatabaseHistory.java
new file mode 100644
index 000000000..263de0b53
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/EmbeddedDatabaseHistory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import 
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A {@link DatabaseHistory} implementation which stores the latest table schema in Flink state.
+ *
+ * <p>It stores/recovers history using data offered by {@link 
SourceSplitStateBase}.
+ */
+public class EmbeddedDatabaseHistory implements DatabaseHistory {
+
+    public static final String DATABASE_HISTORY_INSTANCE_NAME = 
"database.history.instance.name";
+
+    public static final ConcurrentMap<String, Collection<TableChange>> 
TABLE_SCHEMAS =
+            new ConcurrentHashMap<>();
+
+    private Map<TableId, TableChange> tableSchemas;
+    private DatabaseHistoryListener listener;
+    private boolean storeOnlyMonitoredTablesDdl;
+    private boolean skipUnparseableDDL;
+
+    @Override
+    public void configure(
+            Configuration config,
+            HistoryRecordComparator comparator,
+            DatabaseHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        this.listener = listener;
+        this.storeOnlyMonitoredTablesDdl = 
config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
+        this.skipUnparseableDDL = 
config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
+
+        // recover
+        String instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+        this.tableSchemas = new HashMap<>();
+        for (TableChange tableChange : removeHistory(instanceName)) {
+            tableSchemas.put(tableChange.getId(), tableChange);
+        }
+    }
+
+    @Override
+    public void start() {
+        listener.started();
+    }
+
+    @Override
+    public void record(
+            Map<String, ?> source, Map<String, ?> position, String 
databaseName, String ddl)
+            throws DatabaseHistoryException {
+        throw new UnsupportedOperationException("should not call here, error");
+    }
+
+    @Override
+    public void record(
+            Map<String, ?> source,
+            Map<String, ?> position,
+            String databaseName,
+            String schemaName,
+            String ddl,
+            TableChanges changes)
+            throws DatabaseHistoryException {
+        final HistoryRecord record =
+                new HistoryRecord(source, position, databaseName, schemaName, 
ddl, changes);
+        listener.onChangeApplied(record);
+    }
+
+    @Override
+    public void recover(
+            Map<String, ?> source, Map<String, ?> position, Tables schema, 
DdlParser ddlParser) {
+        listener.recoveryStarted();
+        for (TableChange tableChange : tableSchemas.values()) {
+            schema.overwriteTable(tableChange.getTable());
+        }
+        listener.recoveryStopped();
+    }
+
+    @Override
+    public void stop() {
+        listener.stopped();
+    }
+
+    @Override
+    public boolean exists() {
+        return true;
+    }
+
+    @Override
+    public boolean storageExists() {
+        return true;
+    }
+
+    @Override
+    public void initializeStorage() {
+        // do nothing
+    }
+
+    @Override
+    public boolean storeOnlyCapturedTables() {
+        return storeOnlyMonitoredTablesDdl;
+    }
+
+    @Override
+    public boolean skipUnparseableDdlStatements() {
+        return skipUnparseableDDL;
+    }
+
+    public static void registerHistory(String engineName, 
Collection<TableChange> engineHistory) {
+        TABLE_SCHEMAS.put(engineName, engineHistory);
+    }
+
+    public static Collection<TableChange> removeHistory(String engineName) {
+        if (engineName == null) {
+            return Collections.emptyList();
+        }
+        Collection<TableChange> tableChanges = 
TABLE_SCHEMAS.remove(engineName);
+        return tableChanges != null ? tableChanges : Collections.emptyList();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index 437fc2d41..1f67a6f56 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -20,11 +20,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 
 import io.debezium.config.Configuration;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 
 import java.util.Properties;
+import java.util.UUID;
 
 /** A factory to initialize {@link MySqlSourceConfig}. */
 public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
@@ -63,6 +65,12 @@ public class MySqlSourceConfigFactory extends 
JdbcSourceConfigFactory {
         props.setProperty("database.responseBuffering", "adaptive");
         props.setProperty("database.serverTimezone", serverTimeZone);
 
+        // database history
+        props.setProperty("database.history", 
EmbeddedDatabaseHistory.class.getCanonicalName());
+        props.setProperty("database.history.instance.name", UUID.randomUUID() 
+ "_" + subtaskId);
+        props.setProperty("database.history.skip.unparseable.ddl", 
String.valueOf(true));
+        props.setProperty("database.history.refer.ddl", String.valueOf(true));
+
         props.setProperty("connect.timeout.ms", 
String.valueOf(connectTimeout.toMillis()));
         // the underlying debezium reader should always capture the schema 
changes and forward them.
         // Note: the includeSchemaChanges parameter is used to control 
emitting the schema record,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 313a0d5ca..590a000ff 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -27,6 +27,8 @@ import 
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
@@ -44,6 +46,7 @@ import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 
 /** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -108,8 +111,19 @@ public class MySqlDialect implements JdbcDataSourceDialect 
{
                 createMySqlConnection(taskSourceConfig.getDbzConfiguration());
         final BinaryLogClient binaryLogClient =
                 createBinaryClient(taskSourceConfig.getDbzConfiguration());
+        List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
+        // TODO: support save table schema
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+            tableChangeList.add(queryTableSchema(jdbcConnection, 
snapshotSplit.getTableId()));
+        } else {
+            IncrementalSplit incrementalSplit = (IncrementalSplit) 
sourceSplitBase;
+            for (TableId tableId : incrementalSplit.getTableIds()) {
+                tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
+            }
+        }
         return new MySqlSourceFetchTaskContext(
-                taskSourceConfig, this, jdbcConnection, binaryLogClient);
+                taskSourceConfig, this, jdbcConnection, binaryLogClient, 
tableChangeList);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index be3cfbc4f..bac7186bf 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -52,6 +52,8 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
         MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
         configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
         configFactory.fromReadonlyConfig(readonlyConfig);
+        configFactory.startupOptions(startupConfig);
+        configFactory.stopOptions(stopConfig);
         return configFactory;
     }
 
@@ -63,7 +65,7 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
         // TODO: support multi-table
         // TODO: support metadata keys
         MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", 
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), 
jdbcSourceConfig.getPassword(), baseUrl);
-        CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), 
jdbcSourceConfig.getTableList().get(0)));
+        CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), 
config.get(JdbcSourceOptions.TABLE_NAME)));
         SeaTunnelRowType physicalRowType = 
table.getTableSchema().toPhysicalRowDataType();
         String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
         return (DebeziumDeserializationSchema<T>) 
SeaTunnelRowDebeziumDeserializeSchema.builder()
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index d1c9b63be..04c2f1b3b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispat
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
@@ -51,6 +52,7 @@ import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
@@ -60,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Instant;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -83,15 +86,18 @@ public class MySqlSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
     private ChangeEventQueue<DataChangeEvent> queue;
     private MySqlErrorHandler errorHandler;
 
+    private Collection<TableChanges.TableChange> engineHistory;
     public MySqlSourceFetchTaskContext(
         JdbcSourceConfig sourceConfig,
         JdbcDataSourceDialect dataSourceDialect,
         MySqlConnection connection,
-        BinaryLogClient binaryLogClient) {
+        BinaryLogClient binaryLogClient,
+        Collection<TableChanges.TableChange> engineHistory) { // table changes to keep on this context
         super(sourceConfig, dataSourceDialect);
         this.connection = connection;
         this.binaryLogClient = binaryLogClient;
         this.metadataProvider = new MySqlEventMetadataProvider();
+        this.engineHistory = engineHistory; // kept so it can be passed to EmbeddedDatabaseHistory.registerHistory
     }
 
     @Override
@@ -100,7 +106,11 @@ public class MySqlSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
         final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
         final boolean tableIdCaseInsensitive = 
connection.isTableIdCaseSensitive();
         this.topicSelector = 
MySqlTopicSelector.defaultSelector(connectorConfig);
-
+        EmbeddedDatabaseHistory.registerHistory(
+            sourceConfig
+                .getDbzConfiguration()
+                
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+            engineHistory);
         this.databaseSchema =
             MySqlUtils.createMySqlDatabaseSchema(connectorConfig, 
tableIdCaseInsensitive);
         this.offsetContext =
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 9c81c9b73..c7ce52b83 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -155,7 +155,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
         splitFetcherManager.checkErrors();
         RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
         if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, 
output)) {
-            log.info("Current fetch is finished.");
+            log.debug("Current fetch is finished.");
             return null;
         }
 
@@ -196,6 +196,10 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
         fetch.recycle();
     }
 
+    public int getNumberOfCurrentlyAssignedSplits() { // reports how many entries splitStates currently holds
+        return this.splitStates.size();
+    }
+
     /**
      * Handles the finished splits to clean the state if needed.
      *
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index bbbd55f29..5d94efc78 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -380,6 +380,11 @@
                     <artifactId>connector-openmldb</artifactId>
                     <version>${project.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-cdc-mysql</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
             </dependencies>
         </profile>
         <profile>


Reply via email to