This is an automated email from the ASF dual-hosted git repository. ruanhang1993 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new ddb5f00df [hotfix][tests] Fix unstable `OceanBaseMySQLModelITCase` (#3831) ddb5f00df is described below commit ddb5f00df5bab21e8c91e4a31574d446d8b7a3f7 Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Fri Jan 3 13:43:10 2025 +0800 [hotfix][tests] Fix unstable `OceanBaseMySQLModelITCase` (#3831) Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com> --- .../oceanbase/table/OceanBaseMySQLModeITCase.java | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java index 27d30b26b..a74bf035e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java @@ -104,10 +104,24 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { + String.format(" 'rootserver-list' = '%s'", METADATA.getRsList()); } + /** + * Current OceanBase connector uses timestamp (in seconds) to mark the offset during the + * transition from {@code SNAPSHOT} to {@code STREAMING} mode. Thus, if some snapshot inserting + * events are too close to the transitioning offset, snapshot inserting events might be emitted + * multiple times. <br> + * This could be safely removed after switching to incremental snapshot framework which provides + * Exactly-once guarantee. + */ + private void waitForTableInitialization() throws InterruptedException { + Thread.sleep(5000L); + } + @Test public void testTableList() throws Exception { inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory"); inventoryDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); + String sourceDDL = String.format( "CREATE TABLE ob_source (" @@ -212,6 +226,8 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { public void testMetadataColumns() throws Exception { inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory"); inventoryDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); + String sourceDDL = String.format( "CREATE TABLE ob_source (" @@ -297,6 +313,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test"); columnTypesDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); String sourceDDL = String.format( @@ -488,6 +505,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { columnTypesDatabase = new UniqueDatabase(OB_SERVER, "column_type_test"); columnTypesDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); String sourceDDL = String.format( @@ -559,6 +577,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { public void testSnapshotOnly() throws Exception { inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory"); inventoryDatabase.createAndInitialize("mysql"); + waitForTableInitialization(); String sourceDDL = String.format( @@ -611,7 +630,7 @@ public class OceanBaseMySQLModeITCase extends OceanBaseTestBase { while (result.getJobClient().get().getJobStatus().get().equals(JobStatus.RUNNING)) { Thread.sleep(100); - // Waiting for job to quit, in case if + // Waiting for job to finish (SNAPSHOT job will end spontaneously) } } }