This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 29cf3a76c7 [Fix][Connector-V2] Fix postgres cdc with debezium_json
format can not parse number without scale (#9052)
29cf3a76c7 is described below
commit 29cf3a76c7f778b258324e11513eb83b71b3245d
Author: Daniel Duan <[email protected]>
AuthorDate: Wed Apr 9 17:16:00 2025 +0800
[Fix][Connector-V2] Fix postgres cdc with debezium_json format can not
parse number without scale (#9052)
---
.../postgres/source/PostgresIncrementalSource.java | 12 ++
.../connector-cdc-postgres-e2e/pom.xml | 14 ++
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 180 ++++++++++++++++++++-
.../src/test/resources/ddl/inventory.sql | 44 ++++-
...grescdc_to_postgres_with_debezium_to_kafka.conf | 67 ++++++++
5 files changed, 308 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
index 2454671267..f47f4d04e2 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
@@ -32,6 +32,8 @@ import
org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import
org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffsetFactory;
@@ -93,11 +95,21 @@ public class PostgresIncrementalSource<T> extends
IncrementalSource<T, JdbcSourc
+ /**
+ * Builds the deserializer for change events read by this source.
+ *
+ * <p>If the {@code format} option equals {@link DeserializeFormat#COMPATIBLE_DEBEZIUM_JSON},
+ * events go to a {@link DebeziumJsonDeserializeSchema} built from the configured Debezium
+ * properties; otherwise they are converted to SeaTunnel rows. Both variants receive the map
+ * returned by {@code tableChanges()}.
+ */
@Override
public DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(
ReadonlyConfig config) {
+ // Per-table change structs from tableChanges(). This change (#9052) hands them to both
+ // deserializers so debezium_json output can parse NUMERIC columns declared without a scale.
+ // NOTE(review): the exact contents of the Struct values are defined outside this view.
+ Map<TableId, Struct> tableIdTableChangeMap = tableChanges();
+ if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+ config.get(JdbcSourceOptions.FORMAT))) {
+ return (DebeziumDeserializationSchema<T>)
+ new DebeziumJsonDeserializeSchema(
+ config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES),
+ tableIdTableChangeMap);
+ }
+
+ // Default path: deserialize into SeaTunnelRow using the catalog tables and server time zone.
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
.setTables(catalogTables)
.setServerTimeZone(ZoneId.of(zoneId))
+ .setTableIdTableChangeMap(tableIdTableChangeMap)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
index bb152c2795..b64cb088d3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
@@ -74,6 +74,20 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-kafka</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<!-- fix CVE-2022-26520
https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
<groupId>org.postgresql</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index d171d10405..6be7bd9377 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -32,6 +32,17 @@ import
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -40,10 +51,12 @@ import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
@@ -59,19 +72,24 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertNotNull;
+import static org.awaitility.Awaitility.given;
@Slf4j
@DisabledOnContainer(
@@ -99,8 +117,21 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"full_types_no_primary_key";
+ private static final String SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM =
+ "full_types_no_primary_key_with_debezium";
+
private static final String SOURCE_SQL_TEMPLATE = "select * from %s.%s
order by id";
+ // kafka container
+ private static final String KAFKA_IMAGE_NAME =
"confluentinc/cp-kafka:7.0.9";
+
+ private static final String KAFKA_HOST = "kafka_e2e";
+
+ private static KafkaContainer KAFKA_CONTAINER;
+
+ private static KafkaConsumer<String, String> kafkaConsumer;
+
+ private static final String DEBEZIUM_JSON_TOPIC = "debezium_json_topic";
// use newer version of postgresql image to support pgoutput plugin
// when testing postgres 13, only 13-alpine supports both amd64 and arm64
protected static final DockerImageName PG_IMAGE =
@@ -122,6 +153,16 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
"-c",
"max_replication_slots=20");
+    /**
+     * Creates (but does not start) the Kafka test container on {@code NETWORK}, reachable from
+     * other containers under the alias {@value #KAFKA_HOST}, with its output forwarded to SLF4J.
+     */
+    private void createKafkaContainer() {
+        Slf4jLogConsumer kafkaLogs =
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME));
+        DockerImageName kafkaImage = DockerImageName.parse(KAFKA_IMAGE_NAME);
+        KAFKA_CONTAINER =
+                new KafkaContainer(kafkaImage)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(KAFKA_HOST)
+                        .withLogConsumer(kafkaLogs);
+    }
+
private String driverUrl() {
return
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar";
}
@@ -149,8 +190,136 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
PostgreSQLContainer.POSTGRESQL_PORT,
PostgreSQLContainer.POSTGRESQL_PORT)));
Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
+
log.info("Postgres Containers are started");
initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+
+ LOG.info("The third stage: Starting Kafka containers...");
+ createKafkaContainer();
+ Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+ LOG.info("Kafka Containers are started");
+
+ given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::createTopic);
+ LOG.info("Kafka create topic: " + DEBEZIUM_JSON_TOPIC);
+ }
+
+    /**
+     * Creates {@link #DEBEZIUM_JSON_TOPIC} (1 partition, replication factor 1) on the Kafka test
+     * container.
+     *
+     * <p>Startup calls this through Awaitility with {@code ignoreExceptions()}, so failures must
+     * propagate: swallowing them makes the first attempt look successful and disables the retry
+     * loop. An already-existing topic counts as success, so a retry after a request that did
+     * reach the broker does not keep failing until the timeout.
+     *
+     * @throws IllegalStateException if the topic could not be created or the thread was
+     *     interrupted while waiting for the broker
+     */
+    private void createTopic() {
+        Properties props = new Properties();
+        props.put(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            NewTopic newTopic = new NewTopic(DEBEZIUM_JSON_TOPIC, 1, (short) 1);
+            // createTopics is asynchronous; block until the broker answers.
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+            log.info("Topic {} created successfully", DEBEZIUM_JSON_TOPIC);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
+                log.info("Topic {} already exists", DEBEZIUM_JSON_TOPIC);
+                return;
+            }
+            throw new IllegalStateException("Error creating topic: " + DEBEZIUM_JSON_TOPIC, e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                    "Interrupted while creating topic: " + DEBEZIUM_JSON_TOPIC, e);
+        }
+    }
+    /**
+     * Builds the consumer properties used to read {@link #DEBEZIUM_JSON_TOPIC} back from Kafka.
+     *
+     * <p>Does not set {@code group.id}; callers add it. Enum names are lower-cased with {@link
+     * java.util.Locale#ROOT} so the values ("earliest", "read_committed") do not depend on the
+     * JVM default locale. Under a Turkish locale, for example, a default-locale {@code
+     * toLowerCase()} maps "I" to a dotless "ı" and the value no longer passes Kafka's config
+     * validation.
+     *
+     * @return a fresh, mutable set of consumer properties
+     */
+    private Properties kafkaConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase(java.util.Locale.ROOT));
+        props.put(
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                IsolationLevel.READ_COMMITTED.name().toLowerCase(java.util.Locale.ROOT));
+        props.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        return props;
+    }
+
+    /**
+     * Reads the records of partition 0 of {@link #DEBEZIUM_JSON_TOPIC} that this consumer has
+     * not seen yet, up to the end offset observed at the start of the call.
+     *
+     * <p>Termination compares the consumer's position with that end offset rather than the
+     * offset of the last record returned by this call. This lets the method return promptly when
+     * nothing new has arrived since the previous call, instead of polling until a new record
+     * shows up. It also copes with offsets that carry no record, e.g. transaction markers under
+     * read_committed.
+     *
+     * @return the values read by this call, in offset order; empty if there was nothing new
+     */
+    private List<String> getKafkaData() {
+        TopicPartition partition = new TopicPartition(DEBEZIUM_JSON_TOPIC, 0);
+        // Re-subscribing to the same topic is harmless, but doing it once per consumer is enough.
+        if (kafkaConsumer.subscription().isEmpty()) {
+            kafkaConsumer.subscribe(Collections.singletonList(DEBEZIUM_JSON_TOPIC));
+        }
+        long endOffset =
+                kafkaConsumer.endOffsets(Collections.singletonList(partition)).get(partition);
+        log.info("End offset: {}", endOffset);
+
+        List<String> data = new ArrayList<>();
+        do {
+            ConsumerRecords<String, String> consumerRecords =
+                    kafkaConsumer.poll(Duration.ofMillis(1000));
+            for (ConsumerRecord<String, String> record : consumerRecords) {
+                data.add(record.value());
+            }
+            log.info("Data size: {}", data.size());
+            // position() is only valid once the group has assigned the partition to us.
+        } while (!kafkaConsumer.assignment().contains(partition)
+                || kafkaConsumer.position(partition) < endOffset);
+        return data;
+    }
+
+    /**
+     * Runs the Postgres-CDC job with {@code format = compatible_debezium_json} into Kafka.
+     *
+     * <p>Checks that the single seeded row yields one Kafka message. It then checks that five
+     * messages in total have arrived after {@code upsertDeleteSourceTable} modifies the table.
+     */
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently Only support Zeta engine")
+    public void testPostgresCdcWithDebeziumJsonFormat(TestContainer container) {
+        try {
+            log.info(
+                    "Table {} has {} rows.",
+                    SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM,
+                    query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM)));
+
+            Properties props = kafkaConsumerConfig();
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-debezium-json-format");
+            kafkaConsumer = new KafkaConsumer<>(props);
+
+            // The job runs in STREAMING mode, so it is started without waiting for completion.
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    "/postgrescdc_to_postgres_with_debezium_to_kafka.conf");
+                        } catch (Exception e) {
+                            // Nobody joins this future, so keep the stack trace in the log.
+                            log.error("Commit task exception :" + e.getMessage(), e);
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+            AtomicReference<Integer> dataSize = new AtomicReference<>(0);
+
+            // The single seeded row must arrive as exactly one message.
+            await().atMost(3, TimeUnit.MINUTES)
+                    .untilAsserted(
+                            () -> {
+                                dataSize.updateAndGet(v -> v + getKafkaData().size());
+                                Assertions.assertEquals(1, dataSize.get());
+                            });
+            // insert update delete
+            upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM);
+
+            await().atMost(3, TimeUnit.MINUTES)
+                    .untilAsserted(
+                            () -> {
+                                dataSize.updateAndGet(v -> v + getKafkaData().size());
+                                Assertions.assertEquals(5, dataSize.get());
+                            });
+        } finally {
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM);
+            // Null if the test failed before the consumer was created; do not mask that
+            // failure with a NullPointerException.
+            if (kafkaConsumer != null) {
+                kafkaConsumer.close();
+            }
+        }
}
@TestTemplate
@@ -555,8 +724,7 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
}
@TestTemplate
- public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer
container)
- throws Exception {
+ public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer
container) {
try {
CompletableFuture.supplyAsync(
@@ -639,7 +807,7 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
protected void initializePostgresTable(PostgreSQLContainer container,
String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile =
PostgresCDCIT.class.getClassLoader().getResource(ddlFile);
- assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+ Assertions.assertNotNull(ddlTestFile, "Cannot locate " + ddlFile);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
final List<String> statements =
@@ -723,7 +891,7 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
+ tableName
+ " VALUES (2, '2', 32767, 65535, 2147483647, 5.5,
6.6, 123.12345, 404.4443, true,\n"
+ " 'Hello World', 'a', 'abc', 'abcd..xyz',
'2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
- + " '2020-07-17', '18:00:22',
500,'192.168.1.1');");
+ + " '2020-07-17', '18:00:22', 500, 88,
'192.168.1.1');");
executeSql(
"INSERT INTO "
@@ -732,7 +900,7 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
+ tableName
+ " VALUES (3, '2', 32767, 65535, 2147483647, 5.5,
6.6, 123.12345, 404.4443, true,\n"
+ " 'Hello World', 'a', 'abc', 'abcd..xyz',
'2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
- + " '2020-07-17', '18:00:22',
500,'192.168.1.1');");
+ + " '2020-07-17', '18:00:22', 500,
88,'192.168.1.1');");
executeSql("DELETE FROM " + database + "." + tableName + " where id =
2;");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 1372f98a44..59875092ef 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -47,6 +47,7 @@ CREATE TABLE postgres_cdc_table_1
f_date DATE,
f_time TIME(0),
f_default_numeric NUMERIC,
+ f_numeric_no_scale NUMERIC(24),
f_inet INET,
PRIMARY KEY (id)
);
@@ -72,6 +73,7 @@ CREATE TABLE postgres_cdc_table_2
f_date DATE,
f_time TIME(0),
f_default_numeric NUMERIC,
+ f_numeric_no_scale NUMERIC(24),
f_inet INET,
PRIMARY KEY (id)
);
@@ -97,6 +99,7 @@ CREATE TABLE sink_postgres_cdc_table_1
f_date DATE,
f_time TIME(0),
f_default_numeric NUMERIC,
+ f_numeric_no_scale NUMERIC(24),
f_inet INET,
PRIMARY KEY (id)
);
@@ -122,6 +125,7 @@ CREATE TABLE sink_postgres_cdc_table_2
f_date DATE,
f_time TIME(0),
f_default_numeric NUMERIC,
+ f_numeric_no_scale NUMERIC(24),
f_inet INET,
PRIMARY KEY (id)
);
@@ -147,6 +151,32 @@ CREATE TABLE full_types_no_primary_key
f_date DATE,
f_time TIME(0),
f_default_numeric NUMERIC,
+ f_numeric_no_scale NUMERIC(24),
+ f_inet INET
+);
+
+-- Dedicated source table for the compatible_debezium_json -> Kafka e2e test
+-- (postgrescdc_to_postgres_with_debezium_to_kafka.conf). Its columns appear to mirror
+-- full_types_no_primary_key, and it is seeded with the same row further below.
+CREATE TABLE full_types_no_primary_key_with_debezium
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_int INTEGER,
+ f_big BIGINT,
+ f_real REAL,
+ f_double_precision DOUBLE PRECISION,
+ f_numeric NUMERIC(10, 5),
+ f_decimal DECIMAL(10, 1),
+ f_boolean BOOLEAN,
+ f_text TEXT,
+ f_char CHAR,
+ f_character CHARACTER(3),
+ f_character_varying CHARACTER VARYING(20),
+ f_timestamp3 TIMESTAMP(3),
+ f_timestamp6 TIMESTAMP(6),
+ f_date DATE,
+ f_time TIME(0),
+ f_default_numeric NUMERIC,
+ -- Precision only, no explicit scale: the case #9052 fixes for debezium_json.
+ f_numeric_no_scale NUMERIC(24),
f_inet INET
);
@@ -186,15 +216,18 @@ ALTER TABLE sink_postgres_cdc_table_2
ALTER TABLE full_types_no_primary_key
REPLICA IDENTITY FULL;
+-- The table has no primary key, so log the full old row for UPDATE/DELETE.
+ALTER TABLE full_types_no_primary_key_with_debezium
+ REPLICA IDENTITY FULL;
+
INSERT INTO postgres_cdc_table_1
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- '2020-07-17', '18:00:22', 500,'192.168.1.1');
+ '2020-07-17', '18:00:22', 500,88,'192.168.1.1');
INSERT INTO postgres_cdc_table_2
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- '2020-07-17', '18:00:22', 500,'192.168.1.1');
+ '2020-07-17', '18:00:22', 500,88,'192.168.1.1');
INSERT INTO postgres_cdc_table_3
VALUES (1, '2', 32767, 65535);
@@ -202,4 +235,9 @@ VALUES (1, '2', 32767, 65535);
INSERT INTO full_types_no_primary_key
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- '2020-07-17', '18:00:22', 500,'192.168.1.1');
+ '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
+
+-- Single seed row: the e2e test expects exactly one Kafka message before it modifies the table.
+INSERT INTO full_types_no_primary_key_with_debezium
+VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
+ 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
+ '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf
new file mode 100644
index 0000000000..d915f7cb28
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ read_limit.bytes_per_second = 7000000
+ read_limit.rows_per_second = 400
+ checkpoint.interval = 5000
+}
+
+source {
+ # Reads the debezium test table and emits change events in compatible_debezium_json format.
+ Postgres-CDC {
+ plugin_output = "customers_postgres_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ table-names =
["postgres_cdc.inventory.full_types_no_primary_key_with_debezium"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ decoding.plugin.name = "decoderbufs"
+ exactly_once = true
+ # The table's DDL declares no primary key, so "id" is declared as its key here.
+ table-names-config = [
+ {
+ table =
"postgres_cdc.inventory.full_types_no_primary_key_with_debezium"
+ primaryKeys = ["id"]
+ }
+ ]
+ # The format under test, with key/value converter schemas disabled.
+ format = "compatible_debezium_json"
+ debezium = {
+ "key.converter.schemas.enable": false,
+ "value.converter.schemas.enable": false
+ }
+ }
+}
+
+transform {
+
+}
+
+sink {
+ # Topic and host must match DEBEZIUM_JSON_TOPIC and KAFKA_HOST in PostgresCDCIT.
+ kafka {
+ topic = "debezium_json_topic"
+ bootstrap.servers = "kafka_e2e:9092"
+ format = compatible_debezium_json
+ debezium = {
+ "key.converter.schemas.enable": false,
+ "value.converter.schemas.enable": false
+ }
+ }
+}
\ No newline at end of file