This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 10cb84435b [Fix][Connector-V2] Update catalog table schema of debezium
json (#9525)
10cb84435b is described below
commit 10cb84435bad9792333ff01fef7523f70c8e778c
Author: corgy-w <[email protected]>
AuthorDate: Thu Jul 10 22:36:27 2025 +0800
[Fix][Connector-V2] Update catalog table schema of debezium json (#9525)
---
.../cdc/base/source/IncrementalSource.java | 5 +-
.../debezium/format/DebeziumJsonFormatTest.java | 122 +++++++++++++++++++++
2 files changed, 126 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index b75900c438..153514a53f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -128,7 +128,10 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
return Collections.singletonList(
CatalogTableUtil.getCatalogTable(
- "default.default",
+ "schema",
+ "default",
+ "default",
+ "default",
CompatibleDebeziumJsonDeserializationSchema.DEBEZIUM_DATA_ROW_TYPE));
}
return catalogTables;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
new file mode 100644
index 0000000000..adb5b7389d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/format/DebeziumJsonFormatTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.format;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+class DebeziumJsonFormatTest {
+
+ public static final SingleChoiceOption STARTUP_MODE =
+ Options.key(SourceOptions.STARTUP_MODE_KEY)
+ .singleChoice(
+ StartupMode.class,
+ Arrays.asList(
+ StartupMode.INITIAL,
+ StartupMode.EARLIEST,
+ StartupMode.LATEST,
+ StartupMode.SPECIFIC))
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for CDC source, valid
enumerations are "
+ + "\"initial\", \"earliest\", \"latest\"
or \"specific\"");
+
+ public static final SingleChoiceOption STOP_MODE =
+ Options.key(SourceOptions.STOP_MODE_KEY)
+ .singleChoice(
+ StopMode.class,
+ Arrays.asList(StopMode.LATEST, StopMode.SPECIFIC,
StopMode.NEVER))
+ .defaultValue(StopMode.NEVER)
+ .withDescription(
+ "Optional stop mode for CDC source, valid
enumerations are "
+ + "\"never\", \"latest\" or \"specific\"");
+
+ static class TestIncrementalSource extends IncrementalSource<Object,
SourceConfig> {
+ public TestIncrementalSource(ReadonlyConfig options,
List<CatalogTable> catalogTables) {
+ super(options, catalogTables);
+ }
+
+ @Override
+ public Option<StartupMode> getStartupModeOption() {
+ return STARTUP_MODE;
+ }
+
+ @Override
+ public Option<StopMode> getStopModeOption() {
+ return STOP_MODE;
+ }
+
+ @Override
+ public SourceConfig.Factory<SourceConfig>
createSourceConfigFactory(ReadonlyConfig config) {
+ return null;
+ }
+
+ @Override
+ public DebeziumDeserializationSchema<Object>
createDebeziumDeserializationSchema(
+ ReadonlyConfig config) {
+ return null;
+ }
+
+ @Override
+ public DataSourceDialect<SourceConfig>
createDataSourceDialect(ReadonlyConfig config) {
+ return null;
+ }
+
+ @Override
+ public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
+ return null;
+ }
+
+ @Override
+ public String getPluginName() {
+ return "";
+ }
+ }
+
+ @Test
+ void testGetProducedCatalogTablesWithCompatibleDebeziumJson() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ JdbcSourceOptions.FORMAT.key(),
"compatible_debezium_json"));
+ TestIncrementalSource source = new TestIncrementalSource(config,
Collections.emptyList());
+ List<CatalogTable> tables = source.getProducedCatalogTables();
+ Assertions.assertEquals(1, tables.size());
+ Assertions.assertEquals(
+ "default.default.default",
tables.get(0).getTableId().toTablePath().getFullName());
+ }
+}