This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 087087e592 [Fix][MySQL-CDC]fix recovery task failure caused by binlog
deletion (#8587)
087087e592 is described below
commit 087087e59284bc2e3e172011e6aa35349ac89f1c
Author: limin <[email protected]>
AuthorDate: Sat Feb 8 10:57:46 2025 +0800
[Fix][MySQL-CDC]fix recovery task failure caused by binlog deletion (#8587)
---
.../reader/IncrementalSourceRecordEmitter.java | 3 +-
.../cdc/mysql/MysqlCDCWithBinlogDeleteIT.java | 336 +++++++++++++++++++++
.../mysqlcdc_to_mysql_with_binlog_delete.conf | 50 +++
3 files changed, 388 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 0b09f048bf..085354309e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -50,6 +50,7 @@ import static
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.W
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp;
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp;
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isHeartbeatRecord;
import static
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isSchemaChangeEvent;
/**
@@ -144,7 +145,7 @@ public class IncrementalSourceRecordEmitter<T>
Offset position = getOffsetPosition(element);
splitState.asIncrementalSplitState().setStartupOffset(position);
emitElement(element, output);
- } else if (isDataChangeRecord(element)) {
+ } else if (isDataChangeRecord(element) || isHeartbeatRecord(element)) {
if (splitState.isIncrementalSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asIncrementalSplitState().setStartupOffset(position);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java
new file mode 100644
index 0000000000..1f0ce5e5a6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support restore")
+public class MysqlCDCWithBinlogDeleteIT extends TestSuiteBase implements
TestResource {
+ private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_USER_NAME = "mysqluser";
+ private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+ private static final String MYSQL_DATABASE = "mysql_cdc";
+ private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
+
+ // Database fixture initialised in startUp(). The credentials are taken from the shared
+ // constants (same values as before) so they cannot drift from the container settings.
+ private final UniqueDatabase inventoryDatabase =
+         new UniqueDatabase(
+                 MYSQL_CONTAINER,
+                 MYSQL_DATABASE,
+                 MYSQL_USER_NAME,
+                 MYSQL_USER_PASSWORD,
+                 MYSQL_DATABASE);
+
+ /*
+ * Query template for the source table. The two %s placeholders are filled with the
+ * database and table name by getSourceQuerySQL. Binary, blob and bit64 columns are
+ * cast to char in the select list.
+ */
+ private static final String SOURCE_SQL_TEMPLATE =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as
char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob,
cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as
f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint,
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal,
f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime,
f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob,
f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, f_year from %s.%s";
+ /*
+ * Query template for the sink table. The two %s placeholders are filled with the
+ * database and table name by getSinkQuerySQL. The select list matches the source
+ * template except for the last column, which is read as cast(f_year as year).
+ */
+ private static final String SINK_SQL_TEMPLATE =
+ "select id, cast(f_binary as char) as f_binary, cast(f_blob as
char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+ + " cast(f_longblob as char) as f_longblob,
cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as
f_varbinary,"
+ + " f_smallint, f_smallint_unsigned, f_mediumint,
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+ + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal,
f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+ + " f_text, f_tinytext, f_varchar, f_date, f_datetime,
f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+ + " f_enum, cast(f_mediumblob as char) as f_mediumblob,
f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+ + " f_json, cast(f_year as year) from %s.%s";
+ private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
+ private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";
+
+ /**
+  * Builds (but does not start) the MySQL container used by this test.
+  *
+  * @param version MySQL image version to run
+  * @return a container configured with the GTID server config, the setup script, the shared
+  *     network and alias, the test database and credentials, and an SLF4J log consumer
+  */
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+     Slf4jLogConsumer logConsumer =
+             new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image"));
+     MySqlContainer mysql =
+             new MySqlContainer(version)
+                     .withConfigurationOverride("docker/server-gtids/my.cnf")
+                     .withSetupSQL("docker/setup.sql")
+                     .withNetwork(NETWORK)
+                     .withNetworkAliases(MYSQL_HOST)
+                     .withDatabaseName(MYSQL_DATABASE)
+                     .withUsername(MYSQL_USER_NAME)
+                     .withPassword(MYSQL_USER_PASSWORD)
+                     .withLogConsumer(logConsumer);
+     return mysql;
+ }
+
+ private String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+ }
+ /**
+ * Hook applied to the given container: runs one bash command that creates
+ * /tmp/seatunnel/plugins/MySQL-CDC/lib, changes into it and downloads the jar named by
+ * driverUrl() with wget. The hook then asserts that the command exited with code 0, using
+ * the captured stderr as the assertion message.
+ */
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib
&& cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
+ };
+
+ /** Starts the MySQL container, waits for it to be up, then creates and fills the test database. */
+ @BeforeAll
+ @Override
+ public void startUp() {
+     log.info("The second stage: Starting Mysql containers...");
+     CompletableFuture<Void> containerStart = Startables.deepStart(Stream.of(MYSQL_CONTAINER));
+     containerStart.join();
+     log.info("Mysql Containers are started");
+
+     inventoryDatabase.createAndInitialize();
+     log.info("Mysql ddl execution is complete");
+ }
+
+ /** Shuts the MySQL container down once all tests in the class have run. */
+ @Override
+ @AfterAll
+ public void tearDown() {
+     if (MYSQL_CONTAINER == null) {
+         // nothing was created, so there is nothing to close
+         return;
+     }
+     MYSQL_CONTAINER.close();
+ }
+
+ @TestTemplate
+ public void testRestoreTaskWhenBinlogDelete(TestContainer container)
+ throws InterruptedException, IOException {
+ // Clear related content to ensure that multiple operations are not
affected
+ clearTable(MYSQL_DATABASE, SINK_TABLE);
+ // execute task
+ Long jobId = JobIdGenerator.newJobId();
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+ "/mysqlcdc_to_mysql_with_binlog_delete.conf",
+ String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ // wait for data written to sink
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE)),
+ query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
+ });
+ // flush binary logs
+ executeSql("flush binary logs");
+ // wait a moment for binlog heartbeat event
+ TimeUnit.SECONDS.sleep(60);
+ // pause task
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
+ // purge binary logs
+ List<List<Object>> masterStatus = query("show master status");
+ String binlogName = masterStatus.get(0).get(0).toString();
+ executeSql("purge binary logs to '" + binlogName + "'");
+ // restore task
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.restoreJob(
+ "/mysqlcdc_to_mysql_with_binlog_delete.conf",
+ String.valueOf(jobId));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+ await().atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ String jobStatus =
container.getJobStatus(String.valueOf(jobId));
+ Assertions.assertEquals("RUNNING", jobStatus);
+ });
+ // write data again, check no problem
+ upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE);
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getSourceQuerySQL(MYSQL_DATABASE,
SOURCE_TABLE)),
+ query(getSinkQuerySQL(MYSQL_DATABASE,
SINK_TABLE)));
+ });
+ // check no error
+ log.info("****************** container logs start ******************");
+ String containerLogs = container.getServerLogs();
+ log.info(containerLogs);
+ Assertions.assertFalse(containerLogs.contains("ERROR"));
+ log.info("****************** container logs end ******************");
+ }
+
+ private void clearTable(String database, String tableName) {
+ executeSql("truncate table " + database + "." + tableName);
+ }
+
+ /**
+  * Runs a single SQL statement against the MySQL container on a fresh connection.
+  *
+  * @param sql statement text to execute
+  * @throws RuntimeException wrapping any {@link SQLException} raised while connecting or
+  *     executing
+  */
+ private void executeSql(String sql) {
+     // Declare the statement in the try header too, so it is closed along with the
+     // connection instead of being left to the driver.
+     try (Connection connection = getJdbcConnection();
+             Statement statement = connection.createStatement()) {
+         statement.execute(sql);
+     } catch (SQLException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword());
+ }
+ /**
+ * Inserts ten rows with ids 100 to 109 into the given table, then deletes the rows whose id
+ * is greater than 100 and sets f_bigint to 10000 on rows whose id is below 10. Every
+ * statement is sent through executeSql.
+ *
+ * NOTE(review): no call to this method is visible in this class as shown; confirm it is
+ * still needed.
+ *
+ * @param database schema name used to qualify the table
+ * @param tableName table the statements are applied to
+ */
+ private void changeSourceTable(String database, String tableName) {
+ for (int i = 100; i < 110; i++) {
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary,
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + "
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned,
f_integer,\n"
+ + "
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float,
f_double,\n"
+ + "
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar,
f_date, f_datetime,\n"
+ + "
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time,\n"
+ + "
f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( "
+ + i
+ + ",
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62,
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321,
1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34,
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a
tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27
14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',\n"
+ + "
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{
\"key\": \"value\" }', 1992 )");
+ }
+
+ executeSql("DELETE FROM " + database + "." + tableName + " where id >
100");
+
+ executeSql("UPDATE " + database + "." + tableName + " SET f_bigint =
10000 where id < 10");
+ }
+ /**
+ * Applies a mixed batch of changes to the given table: inserts rows with ids 5 and 6 (same
+ * column values except f_year, 1992 and 1999), deletes the row with id 2 and sets f_bigint
+ * to 10000 on the row with id 3. Every statement is sent through executeSql.
+ *
+ * @param database schema name used to qualify the table
+ * @param tableName table the statements are applied to
+ */
+ private void upsertDeleteSourceTable(String database, String tableName) {
+
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary,
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + "
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned,
f_integer,\n"
+ + "
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float,
f_double,\n"
+ + "
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar,
f_date, f_datetime,\n"
+ + "
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time,\n"
+ + " f_tinyint,
f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( 5,
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62,
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321,
1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34,
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a tiny
text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',\n"
+ + "
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{ \"key\":
\"value\" }', 1992 )");
+ executeSql(
+ "INSERT INTO "
+ + database
+ + "."
+ + tableName
+ + " ( id, f_binary, f_blob, f_long_varbinary,
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+ + "
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned,
f_integer,\n"
+ + "
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float,
f_double,\n"
+ + "
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar,
f_date, f_datetime,\n"
+ + "
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar,
f_real, f_time,\n"
+ + " f_tinyint,
f_tinyint_unsigned, f_json, f_year )\n"
+ + "VALUES ( 6,
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+ + " 0x68656C6C6F,
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+ + " 0x74696E79626C6F62,
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321,
1234567, 7654321,\n"
+ + " 123456789, 987654321, 123, 789, 12.34,
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+ + " 'This is a text field', 'This is a tiny
text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+ + " '2023-04-27 11:08:40', 1,
b'0101010101010101010101010101010101010101010101010101010101010101', 'C',
'enum2',\n"
+ + "
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A,
'This is a long varchar field',\n"
+ + " 12.345, '14:30:00', -128, 255, '{ \"key\":
\"value\" }', 1999 )");
+ executeSql("DELETE FROM " + database + "." + tableName + " where id =
2");
+
+ executeSql("UPDATE " + database + "." + tableName + " SET f_bigint =
10000 where id = 3");
+ }
+
+ /**
+  * Runs a query against the MySQL container and reads the whole result set into memory.
+  *
+  * @param sql query text to execute
+  * @return one list per row, holding that row's column values in select-list order; empty
+  *     when the query returns no rows
+  * @throws RuntimeException wrapping any {@link SQLException} raised while connecting,
+  *     executing or reading
+  */
+ private List<List<Object>> query(String sql) {
+     try (Connection connection = getJdbcConnection();
+             Statement statement = connection.createStatement();
+             ResultSet resultSet = statement.executeQuery(sql)) {
+         List<List<Object>> rows = new ArrayList<>();
+         int columnCount = resultSet.getMetaData().getColumnCount();
+         while (resultSet.next()) {
+             List<Object> row = new ArrayList<>(columnCount);
+             // JDBC column indexes are 1-based
+             for (int i = 1; i <= columnCount; i++) {
+                 row.add(resultSet.getObject(i));
+             }
+             // Parameterized logging: the row is only rendered when debug is enabled,
+             // instead of being formatted for every row on every poll.
+             log.debug("Print MySQL-CDC query, sql: {}, data: {}", sql, row);
+             rows.add(row);
+         }
+         return rows;
+     } catch (SQLException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ private String getSourceQuerySQL(String database, String tableName) {
+ return String.format(SOURCE_SQL_TEMPLATE, database, tableName);
+ }
+
+ private String getSinkQuerySQL(String database, String tableName) {
+ return String.format(SINK_SQL_TEMPLATE, database, tableName);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_binlog_delete.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_binlog_delete.conf
new file mode 100644
index 0000000000..4723f85ac6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_binlog_delete.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # Engine-level settings for this job: a single parallel task running in streaming mode.
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000 # presumably milliseconds - TODO confirm against the engine docs
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ startup.mode = "initial"
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user_sink"
+ password = "mysqlpw"
+ generate_sink_sql = true
+ database = mysql_cdc
+ table = mysql_cdc_e2e_sink_table
+ primary_keys = ["id"]
+ }
+}
\ No newline at end of file