This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e6a20acfa [Improve][CDC] Use Source to output the CatalogTable (#5626)
3e6a20acfa is described below

commit 3e6a20acfa94c1601efe461971b3d94cd90dfcf1
Author: hailin0 <[email protected]>
AuthorDate: Tue Oct 17 17:56:53 2023 +0800

    [Improve][CDC] Use Source to output the CatalogTable (#5626)
---
 docs/en/connector-v2/formats/cdc-compatible-debezium-json.md | 11 -----------
 .../connectors/cdc/base/source/IncrementalSource.java        | 12 ++++++++++++
 .../src/test/resources/mysqlcdc_to_mysql.conf                |  3 ---
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
index e0751a2492..4660dacf22 100644
--- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -21,9 +21,6 @@ source {
 
     base-url="jdbc:mysql://localhost:3306/test"
     "startup.mode"=INITIAL
-    catalog {
-        factory=MySQL
-    }
     table-names=[
         "database1.t1",
         "database1.t2",
@@ -41,14 +38,6 @@ source {
         # topic prefix
         database.server.name =  "mysql_cdc_1"
     }
-    # compatible_debezium_json fixed schema
-    schema = {
-        fields = {
-            topic = string
-            key = string
-            value = string
-        }
-    }
   }
 }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index ff13fc30b1..4c8ee235a9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -29,12 +29,14 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
@@ -56,14 +58,17 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
 import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
 import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
 
 import com.google.common.collect.Sets;
 import io.debezium.relational.TableId;
 import lombok.NoArgsConstructor;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -145,6 +150,13 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
+        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+                readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
+            return Collections.singletonList(
+                    CatalogTableUtil.getCatalogTable(
+                            "default.default",
+                            CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
+        }
         return catalogTables;
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
index 50aa6b5e38..7c9cf2f079 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf
@@ -35,9 +35,6 @@ source {
     password = "seatunnel"
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
-    catalog {
-      factory = MySQL
-    }
   }
 }
 

Reply via email to