This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e6a20acfa [Improve][CDC] Use Source to output the CatalogTable (#5626)
3e6a20acfa is described below
commit 3e6a20acfa94c1601efe461971b3d94cd90dfcf1
Author: hailin0 <[email protected]>
AuthorDate: Tue Oct 17 17:56:53 2023 +0800
[Improve][CDC] Use Source to output the CatalogTable (#5626)
---
docs/en/connector-v2/formats/cdc-compatible-debezium-json.md | 11 -----------
.../connectors/cdc/base/source/IncrementalSource.java | 12 ++++++++++++
.../src/test/resources/mysqlcdc_to_mysql.conf | 3 ---
3 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
index e0751a2492..4660dacf22 100644
--- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -21,9 +21,6 @@ source {
base-url="jdbc:mysql://localhost:3306/test"
"startup.mode"=INITIAL
- catalog {
- factory=MySQL
- }
table-names=[
"database1.t1",
"database1.t2",
@@ -41,14 +38,6 @@ source {
# topic prefix
database.server.name = "mysql_cdc_1"
}
- # compatible_debezium_json fixed schema
- schema = {
- fields = {
- topic = string
- key = string
- value = string
- }
- }
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index ff13fc30b1..4c8ee235a9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -29,12 +29,14 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
@@ -56,14 +58,17 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import com.google.common.collect.Sets;
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -145,6 +150,13 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
@Override
public List<CatalogTable> getProducedCatalogTables() {
+ if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+ readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
+ return Collections.singletonList(
+ CatalogTableUtil.getCatalogTable(
+ "default.default",
+ CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
+ }
return catalogTables;
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
index 50aa6b5e38..7c9cf2f079 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
@@ -35,9 +35,6 @@ source {
password = "seatunnel"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
- catalog {
- factory = MySQL
- }
}
}