This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new cb1b23279 [minor][tests] Fix test
testDanglingDroppingTableDuringBinlogMode due to imprecise timestamp startup
cb1b23279 is described below
commit cb1b232794734404fa0755cf5a55e7bb02241d88
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 21:45:48 2024 +0800
[minor][tests] Fix test testDanglingDroppingTableDuringBinlogMode due to
imprecise timestamp startup
This closes #3580
---
.../mysql/source/MySqlPipelineITCase.java | 74 ++++++++++++++++++++++
.../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 46 ++++++++++----
2 files changed, 108 insertions(+), 12 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 82f7b5e7b..f2b4ccb82 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -57,6 +58,7 @@ import org.junit.Test;
import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -689,6 +691,78 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
actual.stream().map(Object::toString).collect(Collectors.toList()));
}
+ @Test
+ public void testDanglingDropTableEventInBinlog() throws Exception {
+ env.setParallelism(1);
+ inventoryDatabase.createAndInitialize();
+
+ // Create a new table for later deletion
+ try (Connection connection = inventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
+ }
+
+ String logFileName = null;
+ Long logPosition = null;
+
+ try (Connection connection = inventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
+ while (rs.next()) {
+ logFileName = rs.getString("Log_name");
+ logPosition = rs.getLong("File_size");
+ }
+ }
+
+ // We start reading binlog from the tail of current position and file
to avoid reading
+ // previous events. The next DDL event (DROP TABLE) will push binlog
position forward.
+ Preconditions.checkNotNull(logFileName, "Log file name must not be
null");
+ Preconditions.checkNotNull(logPosition, "Log position name must not be
null");
+ LOG.info("Trying to restore from {} @ {}...", logFileName,
logPosition);
+
+ try (Connection connection = inventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DROP TABLE live_fast;");
+ }
+
+ MySqlSourceConfigFactory configFactory =
+ new MySqlSourceConfigFactory()
+ .hostname(MYSQL8_CONTAINER.getHost())
+ .port(MYSQL8_CONTAINER.getDatabasePort())
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(inventoryDatabase.getDatabaseName())
+ .tableList(inventoryDatabase.getDatabaseName() + ".*")
+
.startupOptions(StartupOptions.specificOffset(logFileName, logPosition))
+ .serverId(getServerId(env.getParallelism()))
+ .serverTimeZone("UTC")
+
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider) new
MySqlDataSource(configFactory).getEventSourceProvider();
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ MySqlDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+ Thread.sleep(5_000);
+
+ List<Event> expectedEvents =
+ new ArrayList<>(
+
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+
+ expectedEvents.add(
+ new DropTableEvent(
+ TableId.tableId(inventoryDatabase.getDatabaseName(),
"live_fast")));
+
+ List<Event> actual = fetchResults(events, expectedEvents.size());
+ assertEqualsInAnyOrder(
+
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
+
actual.stream().map(Object::toString).collect(Collectors.toList()));
+ }
+
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index ccdbbbad7..a8c7a8c5f 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
@@ -36,6 +37,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
@@ -333,16 +335,34 @@ public class MysqlE2eITCase extends
PipelineTestEnvironment {
}
@Test
- public void testDroppingTable() throws Exception {
- Thread.sleep(5000);
- LOG.info("Sleep 5 seconds to distinguish initial DDL events with
dropping table events...");
- long ddlTimestamp = System.currentTimeMillis();
- Thread.sleep(5000);
- LOG.info("Going to drop tables after timestamp {}", ddlTimestamp);
+ public void testDanglingDropTableEventInBinlog() throws Exception {
+ // Create a new table for later deletion
+ try (Connection connection =
mysqlInventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE TABLE live_fast(ID INT PRIMARY KEY);");
+ }
+
+ String logFileName = null;
+ Long logPosition = null;
+
+ try (Connection connection =
mysqlInventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery("SHOW BINARY LOGS;");
+ while (rs.next()) {
+ logFileName = rs.getString("Log_name");
+ logPosition = rs.getLong("File_size");
+ }
+ }
+
+ // We start reading binlog from the tail of current position and file
to avoid reading
+ // previous events. The next DDL event (DROP TABLE) will push binlog
position forward.
+ Preconditions.checkNotNull(logFileName, "Log file name must not be
null");
+ Preconditions.checkNotNull(logPosition, "Log position name must not be
null");
+ LOG.info("Trying to restore from {} @ {}...", logFileName,
logPosition);
try (Connection connection =
mysqlInventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
- statement.execute("DROP TABLE products;");
+ statement.execute("DROP TABLE live_fast;");
}
String pipelineJob =
@@ -356,8 +376,9 @@ public class MysqlE2eITCase extends PipelineTestEnvironment
{
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
- + " scan.startup.mode: timestamp\n"
- + " scan.startup.timestamp-millis: %d\n"
+ + " scan.startup.mode: specific-offset\n"
+ + " scan.startup.specific-offset.file: %s\n"
+ + " scan.startup.specific-offset.pos: %d\n"
+ " scan.binlog.newly-added-table.enabled:
true\n"
+ "\n"
+ "sink:\n"
@@ -370,7 +391,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment
{
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
- ddlTimestamp,
+ logFileName,
+ logPosition,
parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
@@ -380,13 +402,13 @@ public class MysqlE2eITCase extends
PipelineTestEnvironment {
LOG.info("Pipeline job is running");
waitUntilSpecificEvent(
String.format(
- "Table %s.products received SchemaChangeEvent
DropTableEvent{tableId=%s.products} and start to be blocked.",
+ "Table %s.live_fast received SchemaChangeEvent
DropTableEvent{tableId=%s.live_fast} and start to be blocked.",
mysqlInventoryDatabase.getDatabaseName(),
mysqlInventoryDatabase.getDatabaseName()));
waitUntilSpecificEvent(
String.format(
- "Schema change event
DropTableEvent{tableId=%s.products} has been handled in another subTask
already.",
+ "Schema change event
DropTableEvent{tableId=%s.live_fast} has been handled in another subTask
already.",
mysqlInventoryDatabase.getDatabaseName()));
}