This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 10cb84435b [Fix][Connector-V2] Update catalog table schema of debezium 
json (#9525)
10cb84435b is described below

commit 10cb84435bad9792333ff01fef7523f70c8e778c
Author: corgy-w <[email protected]>
AuthorDate: Thu Jul 10 22:36:27 2025 +0800

    [Fix][Connector-V2] Update catalog table schema of debezium json (#9525)
---
 .../cdc/base/source/IncrementalSource.java         |   5 +-
 .../debezium/format/DebeziumJsonFormatTest.java    | 122 +++++++++++++++++++++
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index b75900c438..153514a53f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -128,7 +128,10 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
                 readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
             return Collections.singletonList(
                     CatalogTableUtil.getCatalogTable(
-                            "default.default",
+                            "schema",
+                            "default",
+                            "default",
+                            "default",
                             
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
         }
         return catalogTables;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
new file mode 100644
index 0000000000..adb5b7389d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.format;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+class DebeziumJsonFormatTest {
+
+    public static final SingleChoiceOption STARTUP_MODE =
+            Options.key(SourceOptions.STARTUP_MODE_KEY)
+                    .singleChoice(
+                            StartupMode.class,
+                            Arrays.asList(
+                                    StartupMode.INITIAL,
+                                    StartupMode.EARLIEST,
+                                    StartupMode.LATEST,
+                                    StartupMode.SPECIFIC))
+                    .defaultValue(StartupMode.INITIAL)
+                    .withDescription(
+                            "Optional startup mode for CDC source, valid 
enumerations are "
+                                    + "\"initial\", \"earliest\", \"latest\" 
or \"specific\"");
+
+    public static final SingleChoiceOption STOP_MODE =
+            Options.key(SourceOptions.STOP_MODE_KEY)
+                    .singleChoice(
+                            StopMode.class,
+                            Arrays.asList(StopMode.LATEST, StopMode.SPECIFIC, 
StopMode.NEVER))
+                    .defaultValue(StopMode.NEVER)
+                    .withDescription(
+                            "Optional stop mode for CDC source, valid 
enumerations are "
+                                    + "\"never\", \"latest\" or \"specific\"");
+
+    static class TestIncrementalSource extends IncrementalSource<Object, 
SourceConfig> {
+        public TestIncrementalSource(ReadonlyConfig options, 
List<CatalogTable> catalogTables) {
+            super(options, catalogTables);
+        }
+
+        @Override
+        public Option<StartupMode> getStartupModeOption() {
+            return STARTUP_MODE;
+        }
+
+        @Override
+        public Option<StopMode> getStopModeOption() {
+            return STOP_MODE;
+        }
+
+        @Override
+        public SourceConfig.Factory<SourceConfig> 
createSourceConfigFactory(ReadonlyConfig config) {
+            return null;
+        }
+
+        @Override
+        public DebeziumDeserializationSchema<Object> 
createDebeziumDeserializationSchema(
+                ReadonlyConfig config) {
+            return null;
+        }
+
+        @Override
+        public DataSourceDialect<SourceConfig> 
createDataSourceDialect(ReadonlyConfig config) {
+            return null;
+        }
+
+        @Override
+        public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
+            return null;
+        }
+
+        @Override
+        public String getPluginName() {
+            return "";
+        }
+    }
+
+    @Test
+    void testGetProducedCatalogTablesWithCompatibleDebeziumJson() {
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                JdbcSourceOptions.FORMAT.key(), 
"compatible_debezium_json"));
+        TestIncrementalSource source = new TestIncrementalSource(config, 
Collections.emptyList());
+        List<CatalogTable> tables = source.getProducedCatalogTables();
+        Assertions.assertEquals(1, tables.size());
+        Assertions.assertEquals(
+                "default.default.default", 
tables.get(0).getTableId().toTablePath().getFullName());
+    }
+}

Reply via email to