This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new cb1b23279 [minor][tests] Fix test 
testDanglingDroppingTableDuringBinlogMode due to imprecise timestamp startup
cb1b23279 is described below

commit cb1b232794734404fa0755cf5a55e7bb02241d88
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 21:45:48 2024 +0800

    [minor][tests] Fix test testDanglingDroppingTableDuringBinlogMode due to 
imprecise timestamp startup
    
    This closes  #3580
---
 .../mysql/source/MySqlPipelineITCase.java          | 74 ++++++++++++++++++++++
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   | 46 ++++++++++----
 2 files changed, 108 insertions(+), 12 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 82f7b5e7b..f2b4ccb82 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.types.CharType;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
 import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -57,6 +58,7 @@ import org.junit.Test;
 import org.testcontainers.lifecycle.Startables;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -689,6 +691,78 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                 
actual.stream().map(Object::toString).collect(Collectors.toList()));
     }
 
+    @Test
+    public void testDanglingDropTableEventInBinlog() throws Exception {
+        env.setParallelism(1);
+        inventoryDatabase.createAndInitialize();
+
+        // Create a new table for later deletion
+        try (Connection connection = inventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
+        }
+
+        String logFileName = null;
+        Long logPosition = null;
+
+        try (Connection connection = inventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
+            while (rs.next()) {
+                logFileName = rs.getString("Log_name");
+                logPosition = rs.getLong("File_size");
+            }
+        }
+
+        // We start reading binlog from the tail of current position and file 
to avoid reading
+        // previous events. The next DDL event (DROP TABLE) will push binlog 
position forward.
+        Preconditions.checkNotNull(logFileName, "Log file name must not be 
null");
+        Preconditions.checkNotNull(logPosition, "Log position name must not be 
null");
+        LOG.info("Trying to restore from {} @ {}...", logFileName, 
logPosition);
+
+        try (Connection connection = inventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE live_fast;");
+        }
+
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(inventoryDatabase.getDatabaseName() + ".*")
+                        
.startupOptions(StartupOptions.specificOffset(logFileName, logPosition))
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC")
+                        
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new 
MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(5_000);
+
+        List<Event> expectedEvents =
+                new ArrayList<>(
+                        
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+
+        expectedEvents.add(
+                new DropTableEvent(
+                        TableId.tableId(inventoryDatabase.getDatabaseName(), 
"live_fast")));
+
+        List<Event> actual = fetchResults(events, expectedEvents.size());
+        assertEqualsInAnyOrder(
+                
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
+                
actual.stream().map(Object::toString).collect(Collectors.toList()));
+    }
+
     private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
         return new CreateTableEvent(
                 tableId,
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index ccdbbbad7..a8c7a8c5f 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.pipeline.tests;
 
 import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
 import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
@@ -36,6 +37,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
@@ -333,16 +335,34 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
     }
 
     @Test
-    public void testDroppingTable() throws Exception {
-        Thread.sleep(5000);
-        LOG.info("Sleep 5 seconds to distinguish initial DDL events with 
dropping table events...");
-        long ddlTimestamp = System.currentTimeMillis();
-        Thread.sleep(5000);
-        LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
+    public void testDanglingDropTableEventInBinlog() throws Exception {
+        // Create a new table for later deletion
+        try (Connection connection = 
mysqlInventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
+        }
+
+        String logFileName = null;
+        Long logPosition = null;
+
+        try (Connection connection = 
mysqlInventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
+            while (rs.next()) {
+                logFileName = rs.getString("Log_name");
+                logPosition = rs.getLong("File_size");
+            }
+        }
+
+        // We start reading binlog from the tail of current position and file 
to avoid reading
+        // previous events. The next DDL event (DROP TABLE) will push binlog 
position forward.
+        Preconditions.checkNotNull(logFileName, "Log file name must not be 
null");
+        Preconditions.checkNotNull(logPosition, "Log position name must not be 
null");
+        LOG.info("Trying to restore from {} @ {}...", logFileName, 
logPosition);
 
         try (Connection connection = 
mysqlInventoryDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
-            statement.execute("DROP TABLE products;");
+            statement.execute("DROP TABLE live_fast;");
         }
 
         String pipelineJob =
@@ -356,8 +376,9 @@ public class MysqlE2eITCase extends PipelineTestEnvironment 
{
                                 + "  tables: %s.\\.*\n"
                                 + "  server-id: 5400-5404\n"
                                 + "  server-time-zone: UTC\n"
-                                + "  scan.startup.mode: timestamp\n"
-                                + "  scan.startup.timestamp-millis: %d\n"
+                                + "  scan.startup.mode: specific-offset\n"
+                                + "  scan.startup.specific-offset.file: %s\n"
+                                + "  scan.startup.specific-offset.pos: %d\n"
                                 + "  scan.binlog.newly-added-table.enabled: 
true\n"
                                 + "\n"
                                 + "sink:\n"
@@ -370,7 +391,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment 
{
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         mysqlInventoryDatabase.getDatabaseName(),
-                        ddlTimestamp,
+                        logFileName,
+                        logPosition,
                         parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
@@ -380,13 +402,13 @@ public class MysqlE2eITCase extends 
PipelineTestEnvironment {
         LOG.info("Pipeline job is running");
         waitUntilSpecificEvent(
                 String.format(
-                        "Table %s.products received SchemaChangeEvent 
DropTableEvent{tableId=%s.products} and start to be blocked.",
+                        "Table %s.live_fast received SchemaChangeEvent 
DropTableEvent{tableId=%s.live_fast} and start to be blocked.",
                         mysqlInventoryDatabase.getDatabaseName(),
                         mysqlInventoryDatabase.getDatabaseName()));
 
         waitUntilSpecificEvent(
                 String.format(
-                        "Schema change event 
DropTableEvent{tableId=%s.products} has been handled in another subTask 
already.",
+                        "Schema change event 
DropTableEvent{tableId=%s.live_fast} has been handled in another subTask 
already.",
                         mysqlInventoryDatabase.getDatabaseName()));
     }
 

Reply via email to