This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2238fda30 [feature][cdc] Fixed error in mysql cdc under real-time job
(#3666)
2238fda30 is described below
commit 2238fda30029120cc7c9478a6029cfcca1080a43
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Dec 7 21:10:13 2022 +0800
[feature][cdc] Fixed error in mysql cdc under real-time job (#3666)
* [feature][cdc] Fixed error in mysql cdc under real-time job
* [chore] license header
---
plugin-mapping.properties | 3 +-
.../main/java/io/debezium/relational/TableId.java | 280 +++++++++++++++++++++
.../cdc/base/config/JdbcSourceConfigFactory.java | 10 +-
.../source/enumerator/HybridSplitAssigner.java | 2 +-
.../enumerator/IncrementalSourceEnumerator.java | 9 +-
.../enumerator/IncrementalSplitAssigner.java | 5 +-
.../source/enumerator/SnapshotSplitAssigner.java | 4 +
.../cdc/base/source/offset/OffsetFactory.java | 3 +-
.../source/reader/IncrementalSourceReader.java | 13 +
.../cdc/base/source/split/IncrementalSplit.java | 1 +
.../cdc/base/source/split/SnapshotSplit.java | 1 +
.../cdc/debezium/EmbeddedDatabaseHistory.java | 153 +++++++++++
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 8 +
.../seatunnel/cdc/mysql/source/MySqlDialect.java | 16 +-
.../cdc/mysql/source/MySqlIncrementalSource.java | 4 +-
.../reader/fetch/MySqlSourceFetchTaskContext.java | 14 +-
.../common/source/reader/SourceReaderBase.java | 6 +-
seatunnel-dist/pom.xml | 5 +
18 files changed, 525 insertions(+), 12 deletions(-)
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index e73f8c283..0a8d16d70 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -156,4 +156,5 @@ seatunnel.source.Jira = connector-http-jira
seatunnel.source.Gitlab = connector-http-gitlab
seatunnel.sink.RabbitMQ = connector-rabbitmq
seatunnel.source.RabbitMQ = connector-rabbitmq
-seatunnel.source.OpenMldb = connector-openmldb
\ No newline at end of file
+seatunnel.source.OpenMldb = connector-openmldb
+seatunnel.source.MySQL-CDC = connector-cdc-mysql
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java
new file mode 100644
index 000000000..91448d51e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/io/debezium/relational/TableId.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.relational;
+
+import io.debezium.annotation.Immutable;
+import io.debezium.relational.Selectors.TableIdToStringMapper;
+import io.debezium.schema.DataCollectionId;
+
+import java.io.Serializable;
+
+/**
+ * Unique identifier for a database table.
+ *
+ */
+@Immutable
+public final class TableId implements DataCollectionId, Comparable<TableId>,
Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Parse the supplied string, extracting up to the first 3 parts into a
TableID.
+ *
+ * @param str the string representation of the table identifier; may not
be null
+ * @return the table ID, or null if it could not be parsed
+ */
+ public static TableId parse(String str) {
+ return parse(str, true);
+ }
+
+ /**
+ * Parse the supplied string, extracting up to the first 3 parts into a
TableID.
+ *
+ * @param str the string representation of the table identifier; may not
be null
+ * @param useCatalogBeforeSchema {@code true} if the parsed string
contains only 2 items and the first should be used as
+ * the catalog and the second as the table name, or {@code
false} if the first should be used as the schema and the
+ * second as the table name
+ * @return the table ID, or null if it could not be parsed
+ */
+ public static TableId parse(String str, boolean useCatalogBeforeSchema) {
+ String[] parts = TableIdParser.parse(str).toArray(new String[0]);
+
+ return TableId.parse(parts, parts.length, useCatalogBeforeSchema);
+ }
+
+ /**
+ * Build a TableID from the supplied identifier parts, using at most the first 3
parts.
+ *
+ * @param parts the parts of the identifier; may not be null
+ * @param numParts the number of parts to use for the table identifier
+ * @param useCatalogBeforeSchema {@code true} if the parsed string
contains only 2 items and the first should be used as
+ * the catalog and the second as the table name, or {@code
false} if the first should be used as the schema and the
+ * second as the table name
+ * @return the table ID, or null if it could not be parsed
+ */
+ protected static TableId parse(String[] parts, int numParts, boolean
useCatalogBeforeSchema) {
+ if (numParts == 0) {
+ return null;
+ }
+ if (numParts == 1) {
+ return new TableId(null, null, parts[0]); // table only
+ }
+ if (numParts == 2) {
+ if (useCatalogBeforeSchema) {
+ return new TableId(parts[0], null, parts[1]); // catalog &
table only
+ }
+ return new TableId(null, parts[0], parts[1]); // schema & table
only
+ }
+ return new TableId(parts[0], parts[1], parts[2]); // catalog, schema &
table
+ }
+
+ private final String catalogName;
+ private final String schemaName;
+ private final String tableName;
+ private final String id;
+
+ /**
+ * Create a new table identifier.
+ *
+ * @param catalogName the name of the database catalog that contains the
table; may be null if the JDBC driver does not
+ * show a catalog for this table
+ * @param schemaName the name of the database schema that contains the
table; may be null if the JDBC driver does not
+ * show a schema for this table
+ * @param tableName the name of the table; may not be null
+ * @param tableIdMapper the customization of fully qualified table name
+ */
+ public TableId(String catalogName, String schemaName, String tableName,
TableIdToStringMapper tableIdMapper) {
+ this.catalogName = catalogName;
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ assert this.tableName != null;
+ this.id = tableIdMapper == null ? tableId(this.catalogName,
this.schemaName, this.tableName) : tableIdMapper.toString(this);
+ }
+
+ /**
+ * Create a new table identifier.
+ *
+ * @param catalogName the name of the database catalog that contains the
table; may be null if the JDBC driver does not
+ * show a catalog for this table
+ * @param schemaName the name of the database schema that contains the
table; may be null if the JDBC driver does not
+ * show a schema for this table
+ * @param tableName the name of the table; may not be null
+ */
+ public TableId(String catalogName, String schemaName, String tableName) {
+ this(catalogName, schemaName, tableName, null);
+ }
+
+ /**
+ * Get the name of the JDBC catalog.
+ *
+ * @return the catalog name, or null if the table does not belong to a
catalog
+ */
+ public String catalog() {
+ return catalogName;
+ }
+
+ /**
+ * Get the name of the JDBC schema.
+ *
+ * @return the JDBC schema name, or null if the table does not belong to a
JDBC schema
+ */
+ public String schema() {
+ return schemaName;
+ }
+
+ /**
+ * Get the name of the table.
+ *
+ * @return the table name; never null
+ */
+ public String table() {
+ return tableName;
+ }
+
+ @Override
+ public String identifier() {
+ return id;
+ }
+
+ @Override
+ public int compareTo(TableId that) {
+ if (this == that) {
+ return 0;
+ }
+ return this.id.compareTo(that.id);
+ }
+
+ public int compareToIgnoreCase(TableId that) {
+ if (this == that) {
+ return 0;
+ }
+ return this.id.compareToIgnoreCase(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TableId) {
+ return this.compareTo((TableId) obj) == 0;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return identifier();
+ }
+
+ /**
+ * Returns a dot-separated String representation of this identifier,
quoting all
+ * name parts with the {@code "} char.
+ */
+ public String toDoubleQuotedString() {
+ return toQuotedString('"');
+ }
+
+ /**
+ * Returns a new {@link TableId} with all parts of the identifier quoted using the
{@code "} character.
+ */
+ public TableId toDoubleQuoted() {
+ return toQuoted('"');
+ }
+
+ /**
+ * Returns a new {@link TableId} that has all parts of the identifier
quoted.
+ *
+ * @param quotingChar the character to be used to quote the identifier
parts.
+ */
+ public TableId toQuoted(char quotingChar) {
+ String catalogName = null;
+ if (this.catalogName != null && !this.catalogName.isEmpty()) {
+ catalogName = quote(this.catalogName, quotingChar);
+ }
+
+ String schemaName = null;
+ if (this.schemaName != null && !this.schemaName.isEmpty()) {
+ schemaName = quote(this.schemaName, quotingChar);
+ }
+
+ return new TableId(catalogName, schemaName, quote(this.tableName,
quotingChar));
+ }
+
+ /**
+ * Returns a dot-separated String representation of this identifier,
quoting all
+ * name parts with the given quoting char.
+ */
+ public String toQuotedString(char quotingChar) {
+ StringBuilder quoted = new StringBuilder();
+
+ if (catalogName != null && !catalogName.isEmpty()) {
+ quoted.append(quote(catalogName, quotingChar)).append(".");
+ }
+
+ if (schemaName != null && !schemaName.isEmpty()) {
+ quoted.append(quote(schemaName, quotingChar)).append(".");
+ }
+
+ quoted.append(quote(tableName, quotingChar));
+
+ return quoted.toString();
+ }
+
+ private static String tableId(String catalog, String schema, String table)
{
+ if (catalog == null || catalog.length() == 0) {
+ if (schema == null || schema.length() == 0) {
+ return table;
+ }
+ return schema + "." + table;
+ }
+ if (schema == null || schema.length() == 0) {
+ return catalog + "." + table;
+ }
+ return catalog + "." + schema + "." + table;
+ }
+
+ /**
+ * Quotes the given identifier part, e.g. schema or table name.
+ */
+ private static String quote(String identifierPart, char quotingChar) {
+ if (identifierPart == null) {
+ return null;
+ }
+
+ if (identifierPart.isEmpty()) {
+ return new
StringBuilder().append(quotingChar).append(quotingChar).toString();
+ }
+
+ if (identifierPart.charAt(0) != quotingChar &&
identifierPart.charAt(identifierPart.length() - 1) != quotingChar) {
+ identifierPart = identifierPart.replace(quotingChar + "",
repeat(quotingChar));
+ identifierPart = quotingChar + identifierPart + quotingChar;
+ }
+
+ return identifierPart;
+ }
+
+ private static String repeat(char quotingChar) {
+ return new
StringBuilder().append(quotingChar).append(quotingChar).toString();
+ }
+
+ public TableId toLowercase() {
+ return new TableId(catalogName, schemaName, tableName.toLowerCase());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 0f98969ce..8263863f2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -179,13 +179,21 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
return this;
}
+ /** Specifies the stop options. */
+ public JdbcSourceConfigFactory stopOptions(StopConfig stopConfig) {
+ this.stopConfig = stopConfig;
+ return this;
+ }
+
public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
this.port = config.get(JdbcSourceOptions.PORT);
this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
+ this.username = config.get(JdbcSourceOptions.USERNAME);
this.password = config.get(JdbcSourceOptions.PASSWORD);
// TODO: support multi-table
this.databaseList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
- this.tableList =
Collections.singletonList(config.get(JdbcSourceOptions.TABLE_NAME));
+ this.tableList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME)
+ + "." + config.get(JdbcSourceOptions.TABLE_NAME));
this.distributionFactorUpper =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
this.distributionFactorLower =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
index fcb71cc90..e0dde4a64 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java
@@ -111,7 +111,7 @@ public class HybridSplitAssigner<C extends SourceConfig>
implements SplitAssigne
// we need to wait snapshot-assigner to be completed before
// assigning the incremental split. Otherwise, records emitted
from incremental split
// might be out-of-order in terms of same primary key with
snapshot splits.
- return snapshotSplitAssigner.getNext();
+ return incrementalSplitAssigner.getNext();
}
// no more splits for the assigner
return Optional.empty();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
index 3cbebae4c..cee57bdd0 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -51,12 +51,14 @@ public class IncrementalSourceEnumerator
*/
private final TreeSet<Integer> readersAwaitingSplit;
+ private volatile boolean running;
public IncrementalSourceEnumerator(
SourceSplitEnumerator.Context<SourceSplitBase> context,
SplitAssigner splitAssigner) {
this.context = context;
this.splitAssigner = splitAssigner;
this.readersAwaitingSplit = new TreeSet<>();
+ this.running = false;
}
@Override
@@ -66,7 +68,8 @@ public class IncrementalSourceEnumerator
@Override
public void run() throws Exception {
-
+ this.running = true;
+ assignSplits();
}
@Override
@@ -77,7 +80,9 @@ public class IncrementalSourceEnumerator
}
readersAwaitingSplit.add(subtaskId);
- assignSplits();
+ if (running) {
+ assignSplits();
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index fdbadf0c7..ac47bcee4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -124,6 +124,8 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
@Override
public void onCompletedSplits(List<SnapshotSplitWatermark>
completedSplitWatermarks) {
// do nothing
+ completedSplitWatermarks.forEach(watermark ->
+ context.getSplitCompletedOffsets().put(watermark.getSplitId(),
watermark.getHighWatermark()));
}
@Override
@@ -205,7 +207,8 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
}
for (TableId tableId : capturedTables) {
Offset watermark = tableWatermarks.get(tableId);
- if (minOffset == null || watermark.isBefore(minOffset)) {
+ if (minOffset == null ||
+ (watermark != null && watermark.isBefore(minOffset))) {
minOffset = watermark;
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
index a2612df7e..1c76fff6f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/SnapshotSplitAssigner.java
@@ -148,12 +148,16 @@ public class SnapshotSplitAssigner<C extends
SourceConfig> implements SplitAssig
@Override
public Optional<SourceSplitBase> getNext() {
+ if (chunkSplitter == null) {
+ return Optional.empty();
+ }
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
Iterator<SnapshotSplit> iterator = remainingSplits.iterator();
SnapshotSplit split = iterator.next();
iterator.remove();
assignedSplits.put(split.splitId(), split);
+ context.getAssignedSnapshotSplit().put(split.splitId(), split);
return Optional.of(split);
} else {
// it's turn for new table
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
index 542bdb549..fc73afbf3 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/offset/OffsetFactory.java
@@ -17,9 +17,10 @@
package org.apache.seatunnel.connectors.cdc.base.source.offset;
+import java.io.Serializable;
import java.util.Map;
-public abstract class OffsetFactory {
+public abstract class OffsetFactory implements Serializable {
public OffsetFactory() {}
public abstract Offset earliest();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 002cdcd93..2a0dbbb78 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.cdc.base.source.reader;
import static com.google.common.base.Preconditions.checkState;
+import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import
org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
@@ -58,6 +59,7 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
+ private volatile boolean running = false;
private final int subtaskId;
private final C sourceConfig;
@@ -81,6 +83,17 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
this.subtaskId = context.getIndexOfSubtask();
}
+ @Override
+ public void pollNext(Collector<T> output) throws Exception {
+ if (!running) {
+ if (getNumberOfCurrentlyAssignedSplits() == 0) {
+ context.sendSplitRequest();
+ }
+ running = true;
+ }
+ super.pollNext(output);
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
index fd3c56f24..838dab250 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/IncrementalSplit.java
@@ -26,6 +26,7 @@ import java.util.List;
@Getter
public class IncrementalSplit extends SourceSplitBase {
+ private static final long serialVersionUID = 1L;
/**
* All the tables that this incremental split needs to capture.
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
index f2033da01..733f8832f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
@@ -25,6 +25,7 @@ import lombok.Getter;
@Getter
public class SnapshotSplit extends SourceSplitBase {
+ private static final long serialVersionUID = 1L;
private final TableId tableId;
private final SeaTunnelRowType splitKeyType;
private final Object splitStart;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/EmbeddedDatabaseHistory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/EmbeddedDatabaseHistory.java
new file mode 100644
index 000000000..263de0b53
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/EmbeddedDatabaseHistory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A {@link DatabaseHistory} implementation which stores the latest table
schema in Flink state.
+ *
+ * <p>It stores/recovers history using data offered by {@link
SourceSplitStateBase}.
+ */
+public class EmbeddedDatabaseHistory implements DatabaseHistory {
+
+ public static final String DATABASE_HISTORY_INSTANCE_NAME =
"database.history.instance.name";
+
+ public static final ConcurrentMap<String, Collection<TableChange>>
TABLE_SCHEMAS =
+ new ConcurrentHashMap<>();
+
+ private Map<TableId, TableChange> tableSchemas;
+ private DatabaseHistoryListener listener;
+ private boolean storeOnlyMonitoredTablesDdl;
+ private boolean skipUnparseableDDL;
+
+ @Override
+ public void configure(
+ Configuration config,
+ HistoryRecordComparator comparator,
+ DatabaseHistoryListener listener,
+ boolean useCatalogBeforeSchema) {
+ this.listener = listener;
+ this.storeOnlyMonitoredTablesDdl =
config.getBoolean(STORE_ONLY_MONITORED_TABLES_DDL);
+ this.skipUnparseableDDL =
config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
+
+ // recover
+ String instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
+ this.tableSchemas = new HashMap<>();
+ for (TableChange tableChange : removeHistory(instanceName)) {
+ tableSchemas.put(tableChange.getId(), tableChange);
+ }
+ }
+
+ @Override
+ public void start() {
+ listener.started();
+ }
+
+ @Override
+ public void record(
+ Map<String, ?> source, Map<String, ?> position, String
databaseName, String ddl)
+ throws DatabaseHistoryException {
+ throw new UnsupportedOperationException("should not call here, error");
+ }
+
+ @Override
+ public void record(
+ Map<String, ?> source,
+ Map<String, ?> position,
+ String databaseName,
+ String schemaName,
+ String ddl,
+ TableChanges changes)
+ throws DatabaseHistoryException {
+ final HistoryRecord record =
+ new HistoryRecord(source, position, databaseName, schemaName,
ddl, changes);
+ listener.onChangeApplied(record);
+ }
+
+ @Override
+ public void recover(
+ Map<String, ?> source, Map<String, ?> position, Tables schema,
DdlParser ddlParser) {
+ listener.recoveryStarted();
+ for (TableChange tableChange : tableSchemas.values()) {
+ schema.overwriteTable(tableChange.getTable());
+ }
+ listener.recoveryStopped();
+ }
+
+ @Override
+ public void stop() {
+ listener.stopped();
+ }
+
+ @Override
+ public boolean exists() {
+ return true;
+ }
+
+ @Override
+ public boolean storageExists() {
+ return true;
+ }
+
+ @Override
+ public void initializeStorage() {
+ // do nothing
+ }
+
+ @Override
+ public boolean storeOnlyCapturedTables() {
+ return storeOnlyMonitoredTablesDdl;
+ }
+
+ @Override
+ public boolean skipUnparseableDdlStatements() {
+ return skipUnparseableDDL;
+ }
+
+ public static void registerHistory(String engineName,
Collection<TableChange> engineHistory) {
+ TABLE_SCHEMAS.put(engineName, engineHistory);
+ }
+
+ public static Collection<TableChange> removeHistory(String engineName) {
+ if (engineName == null) {
+ return Collections.emptyList();
+ }
+ Collection<TableChange> tableChanges =
TABLE_SCHEMAS.remove(engineName);
+ return tableChanges != null ? tableChanges : Collections.emptyList();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index 437fc2d41..1f67a6f56 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -20,11 +20,13 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import java.util.Properties;
+import java.util.UUID;
/** A factory to initialize {@link MySqlSourceConfig}. */
public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
@@ -63,6 +65,12 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
props.setProperty("database.responseBuffering", "adaptive");
props.setProperty("database.serverTimezone", serverTimeZone);
+ // database history
+ props.setProperty("database.history",
EmbeddedDatabaseHistory.class.getCanonicalName());
+ props.setProperty("database.history.instance.name", UUID.randomUUID()
+ "_" + subtaskId);
+ props.setProperty("database.history.skip.unparseable.ddl",
String.valueOf(true));
+ props.setProperty("database.history.refer.ddl", String.valueOf(true));
+
props.setProperty("connect.timeout.ms",
String.valueOf(connectTimeout.toMillis()));
// the underlying debezium reader should always capture the schema
changes and forward them.
// Note: the includeSchemaChanges parameter is used to control
emitting the schema record,
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 313a0d5ca..590a000ff 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -27,6 +27,8 @@ import
org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
@@ -44,6 +46,7 @@ import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -108,8 +111,19 @@ public class MySqlDialect implements JdbcDataSourceDialect
{
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(taskSourceConfig.getDbzConfiguration());
+ List<TableChanges.TableChange> tableChangeList = new ArrayList<>();
+ // TODO: support save table schema
+ if (sourceSplitBase instanceof SnapshotSplit) {
+ SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+ tableChangeList.add(queryTableSchema(jdbcConnection,
snapshotSplit.getTableId()));
+ } else {
+ IncrementalSplit incrementalSplit = (IncrementalSplit)
sourceSplitBase;
+ for (TableId tableId : incrementalSplit.getTableIds()) {
+ tableChangeList.add(queryTableSchema(jdbcConnection, tableId));
+ }
+ }
return new MySqlSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, binaryLogClient);
+ taskSourceConfig, this, jdbcConnection, binaryLogClient,
tableChangeList);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index be3cfbc4f..bac7186bf 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -52,6 +52,8 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
MySqlSourceConfigFactory configFactory = new
MySqlSourceConfigFactory();
configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
configFactory.fromReadonlyConfig(readonlyConfig);
+ configFactory.startupOptions(startupConfig);
+ configFactory.stopOptions(stopConfig);
return configFactory;
}
@@ -63,7 +65,7 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
// TODO: support multi-table
// TODO: support metadata keys
MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql",
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(),
jdbcSourceConfig.getPassword(), baseUrl);
- CatalogTable table =
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0),
jdbcSourceConfig.getTableList().get(0)));
+ CatalogTable table =
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0),
config.get(JdbcSourceOptions.TABLE_NAME)));
SeaTunnelRowType physicalRowType =
table.getTableSchema().toPhysicalRowDataType();
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index d1c9b63be..04c2f1b3b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispat
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
@@ -51,6 +52,7 @@ import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
@@ -60,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -83,15 +86,18 @@ public class MySqlSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
private ChangeEventQueue<DataChangeEvent> queue;
private MySqlErrorHandler errorHandler;
+ private Collection<TableChanges.TableChange> engineHistory;
public MySqlSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
MySqlConnection connection,
- BinaryLogClient binaryLogClient) {
+ BinaryLogClient binaryLogClient,
+ Collection<TableChanges.TableChange> engineHistory) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.binaryLogClient = binaryLogClient;
this.metadataProvider = new MySqlEventMetadataProvider();
+ this.engineHistory = engineHistory;
}
@Override
@@ -100,7 +106,11 @@ public class MySqlSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
final boolean tableIdCaseInsensitive =
connection.isTableIdCaseSensitive();
this.topicSelector =
MySqlTopicSelector.defaultSelector(connectorConfig);
-
+ EmbeddedDatabaseHistory.registerHistory(
+ sourceConfig
+ .getDbzConfiguration()
+
.getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+ engineHistory);
this.databaseSchema =
MySqlUtils.createMySqlDatabaseSchema(connectorConfig,
tableIdCaseInsensitive);
this.offsetContext =
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
index 9c81c9b73..c7ce52b83 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -155,7 +155,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
splitFetcherManager.checkErrors();
RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId,
output)) {
- log.info("Current fetch is finished.");
+ log.debug("Current fetch is finished.");
return null;
}
@@ -196,6 +196,10 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
fetch.recycle();
}
+ public int getNumberOfCurrentlyAssignedSplits() {
+ return this.splitStates.size();
+ }
+
/**
* Handles the finished splits to clean the state if needed.
*
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index bbbd55f29..5d94efc78 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -380,6 +380,11 @@
<artifactId>connector-openmldb</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
<profile>