This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb5f00df [hotfix][tests] Fix unstable `OceanBaseMySQLModelITCase` 
(#3831)
ddb5f00df is described below

commit ddb5f00df5bab21e8c91e4a31574d446d8b7a3f7
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Fri Jan 3 13:43:10 2025 +0800

    [hotfix][tests] Fix unstable `OceanBaseMySQLModelITCase` (#3831)
    
    Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
---
 .../oceanbase/table/OceanBaseMySQLModeITCase.java   | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
index 27d30b26b..a74bf035e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
@@ -104,10 +104,24 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
                 + String.format(" 'rootserver-list' = '%s'", 
METADATA.getRsList());
     }
 
+    /**
+     * Current OceanBase connector uses timestamp (in seconds) to mark the 
offset during the
+     * transition from {@code SNAPSHOT} to {@code STREAMING} mode. Thus, if 
some snapshot inserting
+     * events are too close to the transitioning offset, snapshot inserting 
events might be emitted
+     * multiple times. <br>
+     * This could be safely removed after switching to incremental snapshot 
framework which provides
+     * Exactly-once guarantee.
+     */
+    private void waitForTableInitialization() throws InterruptedException {
+        Thread.sleep(5000L);
+    }
+
     @Test
     public void testTableList() throws Exception {
         inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
         inventoryDatabase.createAndInitialize("mysql");
+        waitForTableInitialization();
+
         String sourceDDL =
                 String.format(
                         "CREATE TABLE ob_source ("
@@ -212,6 +226,8 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
     public void testMetadataColumns() throws Exception {
         inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
         inventoryDatabase.createAndInitialize("mysql");
+        waitForTableInitialization();
+
         String sourceDDL =
                 String.format(
                         "CREATE TABLE ob_source ("
@@ -297,6 +313,7 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
 
         columnTypesDatabase = new UniqueDatabase(OB_SERVER, 
"column_type_test");
         columnTypesDatabase.createAndInitialize("mysql");
+        waitForTableInitialization();
 
         String sourceDDL =
                 String.format(
@@ -488,6 +505,7 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
 
         columnTypesDatabase = new UniqueDatabase(OB_SERVER, 
"column_type_test");
         columnTypesDatabase.createAndInitialize("mysql");
+        waitForTableInitialization();
 
         String sourceDDL =
                 String.format(
@@ -559,6 +577,7 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
     public void testSnapshotOnly() throws Exception {
         inventoryDatabase = new UniqueDatabase(OB_SERVER, "inventory");
         inventoryDatabase.createAndInitialize("mysql");
+        waitForTableInitialization();
 
         String sourceDDL =
                 String.format(
@@ -611,7 +630,7 @@ public class OceanBaseMySQLModeITCase extends 
OceanBaseTestBase {
 
         while 
(result.getJobClient().get().getJobStatus().get().equals(JobStatus.RUNNING)) {
             Thread.sleep(100);
-            // Waiting for job to quit, in case if
+            // Waiting for job to finish (SNAPSHOT job will end spontaneously)
         }
     }
 }

Reply via email to