This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 087087e592 [Fix][MySQL-CDC]fix recovery task failure caused by binlog 
deletion (#8587)
087087e592 is described below

commit 087087e59284bc2e3e172011e6aa35349ac89f1c
Author: limin <[email protected]>
AuthorDate: Sat Feb 8 10:57:46 2025 +0800

    [Fix][MySQL-CDC]fix recovery task failure caused by binlog deletion (#8587)
---
 .../reader/IncrementalSourceRecordEmitter.java     |   3 +-
 .../cdc/mysql/MysqlCDCWithBinlogDeleteIT.java      | 336 +++++++++++++++++++++
 .../mysqlcdc_to_mysql_with_binlog_delete.conf      |  50 +++
 3 files changed, 388 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
index 0b09f048bf..085354309e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -50,6 +50,7 @@ import static 
org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.W
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getFetchTimestamp;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getMessageTimestamp;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isHeartbeatRecord;
 import static 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.isSchemaChangeEvent;
 
 /**
@@ -144,7 +145,7 @@ public class IncrementalSourceRecordEmitter<T>
             Offset position = getOffsetPosition(element);
             splitState.asIncrementalSplitState().setStartupOffset(position);
             emitElement(element, output);
-        } else if (isDataChangeRecord(element)) {
+        } else if (isDataChangeRecord(element) || isHeartbeatRecord(element)) {
             if (splitState.isIncrementalSplitState()) {
                 Offset position = getOffsetPosition(element);
                 
splitState.asIncrementalSplitState().setStartupOffset(position);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java
new file mode 100644
index 0000000000..1f0ce5e5a6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithBinlogDeleteIT.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support restore")
+public class MysqlCDCWithBinlogDeleteIT extends TestSuiteBase implements 
TestResource {
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "mysqluser";
+    private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", 
MYSQL_DATABASE);
+
+    // mysql source table query sql
+    private static final String SOURCE_SQL_TEMPLATE =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as 
char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, 
cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as 
f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, 
f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, 
f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, 
f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, f_year from %s.%s";
+    // mysql sink table query sql
+    private static final String SINK_SQL_TEMPLATE =
+            "select id, cast(f_binary as char) as f_binary, cast(f_blob as 
char) as f_blob, cast(f_long_varbinary as char) as f_long_varbinary,"
+                    + " cast(f_longblob as char) as f_longblob, 
cast(f_tinyblob as char) as f_tinyblob, cast(f_varbinary as char) as 
f_varbinary,"
+                    + " f_smallint, f_smallint_unsigned, f_mediumint, 
f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned,"
+                    + " f_bigint, f_bigint_unsigned, f_numeric, f_decimal, 
f_float, f_double, f_double_precision, f_longtext, f_mediumtext,"
+                    + " f_text, f_tinytext, f_varchar, f_date, f_datetime, 
f_timestamp, f_bit1, cast(f_bit64 as char) as f_bit64, f_char,"
+                    + " f_enum, cast(f_mediumblob as char) as f_mediumblob, 
f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned,"
+                    + " f_json, cast(f_year as year) from %s.%s";
+    private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
+    private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return new MySqlContainer(version)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(MYSQL_DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_USER_PASSWORD)
+                .withLogConsumer(
+                        new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+    }
+
+    private String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";;
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib 
&& cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        log.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        log.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        log.info("Mysql ddl execution is complete");
+    }
+
+    @Override
+    @AfterAll
+    public void tearDown() {
+        // close Container
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.close();
+        }
+    }
+
+    @TestTemplate
+    public void testRestoreTaskWhenBinlogDelete(TestContainer container)
+            throws InterruptedException, IOException {
+        // Clear related content to ensure that multiple operations are not 
affected
+        clearTable(MYSQL_DATABASE, SINK_TABLE);
+        // execute task
+        Long jobId = JobIdGenerator.newJobId();
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob(
+                                "/mysqlcdc_to_mysql_with_binlog_delete.conf",
+                                String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+        // wait for data written to sink
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(getSourceQuerySQL(MYSQL_DATABASE, 
SOURCE_TABLE)),
+                                    query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)));
+                        });
+        // flush binary logs
+        executeSql("flush binary logs");
+        // wait a moment for binlog heartbeat event
+        TimeUnit.SECONDS.sleep(60);
+        // pause task
+        Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
+        // purge binary logs
+        List<List<Object>> masterStatus = query("show master status");
+        String binlogName = masterStatus.get(0).get(0).toString();
+        executeSql("purge binary logs to '" + binlogName + "'");
+        // restore task
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.restoreJob(
+                                "/mysqlcdc_to_mysql_with_binlog_delete.conf",
+                                String.valueOf(jobId));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String jobStatus = 
container.getJobStatus(String.valueOf(jobId));
+                            Assertions.assertEquals("RUNNING", jobStatus);
+                        });
+        // write data again, check no problem
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    query(getSourceQuerySQL(MYSQL_DATABASE, 
SOURCE_TABLE)),
+                                    query(getSinkQuerySQL(MYSQL_DATABASE, 
SINK_TABLE)));
+                        });
+        // check no error
+        log.info("****************** container logs start ******************");
+        String containerLogs = container.getServerLogs();
+        log.info(containerLogs);
+        Assertions.assertFalse(containerLogs.contains("ERROR"));
+        log.info("****************** container logs end ******************");
+    }
+
+    private void clearTable(String database, String tableName) {
+        executeSql("truncate table " + database + "." + tableName);
+    }
+
+    // Execute SQL
+    private void executeSql(String sql) {
+        try (Connection connection = getJdbcConnection()) {
+            connection.createStatement().execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                MYSQL_CONTAINER.getJdbcUrl(),
+                MYSQL_CONTAINER.getUsername(),
+                MYSQL_CONTAINER.getPassword());
+    }
+
+    private void changeSourceTable(String database, String tableName) {
+        for (int i = 100; i < 110; i++) {
+            executeSql(
+                    "INSERT INTO "
+                            + database
+                            + "."
+                            + tableName
+                            + " ( id, f_binary, f_blob, f_long_varbinary, 
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                            + "                                         
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, 
f_integer,\n"
+                            + "                                         
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, 
f_double,\n"
+                            + "                                         
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, 
f_date, f_datetime,\n"
+                            + "                                         
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, 
f_real, f_time,\n"
+                            + "                                         
f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+                            + "VALUES ( "
+                            + i
+                            + ", 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                            + "         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                            + "         0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321,\n"
+                            + "         123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                            + "         'This is a text field', 'This is a 
tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 
14:30:00',\n"
+                            + "         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',\n"
+                            + "         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',\n"
+                            + "         12.345, '14:30:00', -128, 255, '{ 
\"key\": \"value\" }', 1992 )");
+        }
+
+        executeSql("DELETE FROM " + database + "." + tableName + " where id > 
100");
+
+        executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 
10000 where id < 10");
+    }
+
+    private void upsertDeleteSourceTable(String database, String tableName) {
+
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, 
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, 
f_integer,\n"
+                        + "                                         
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, 
f_double,\n"
+                        + "                                         
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, 
f_date, f_datetime,\n"
+                        + "                                         
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, 
f_real, f_time,\n"
+                        + "                                         f_tinyint, 
f_tinyint_unsigned, f_json, f_year )\n"
+                        + "VALUES ( 5, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny 
text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',\n"
+                        + "         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1992 )");
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, 
f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         
f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, 
f_integer,\n"
+                        + "                                         
f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, 
f_double,\n"
+                        + "                                         
f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, 
f_date, f_datetime,\n"
+                        + "                                         
f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, 
f_real, f_time,\n"
+                        + "                                         f_tinyint, 
f_tinyint_unsigned, f_json, f_year )\n"
+                        + "VALUES ( 6, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 
0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 
1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 
56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny 
text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',\n"
+                        + "         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": 
\"value\" }', 1999 )");
+        executeSql("DELETE FROM " + database + "." + tableName + " where id = 
2");
+
+        executeSql("UPDATE " + database + "." + tableName + " SET f_bigint = 
10000 where id = 3");
+    }
+
+    private List<List<Object>> query(String sql) {
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                ArrayList<Object> objects = new ArrayList<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    objects.add(resultSet.getObject(i));
+                }
+                log.debug(String.format("Print MySQL-CDC query, sql: %s, data: 
%s", sql, objects));
+                result.add(objects);
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String getSourceQuerySQL(String database, String tableName) {
+        return String.format(SOURCE_SQL_TEMPLATE, database, tableName);
+    }
+
+    private String getSinkQuerySQL(String database, String tableName) {
+        return String.format(SINK_SQL_TEMPLATE, database, tableName);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_binlog_delete.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_binlog_delete.conf
new file mode 100644
index 0000000000..4723f85ac6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_binlog_delete.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    startup.mode = "initial"
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "st_user_sink"
+    password = "mysqlpw"
+    generate_sink_sql = true
+    database = mysql_cdc
+    table = mysql_cdc_e2e_sink_table
+    primary_keys = ["id"]
+  }
+}
\ No newline at end of file

Reply via email to