This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3433da6e [testcase](cdc) add e2e test for MySql to Doris (#445)
3433da6e is described below

commit 3433da6e06a2320fb43a43834c6caa87234531a4
Author: Petrichor <[email protected]>
AuthorDate: Thu Jul 25 16:57:31 2024 +0800

    [testcase](cdc) add e2e test for MySql to Doris (#445)
---
 .../doris/flink/tools/cdc/MySQLDorisE2ECase.java   | 54 ++++++++++++++++++----
 1 file changed, 44 insertions(+), 10 deletions(-)

diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index ab1dfe77..6a613841 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -62,6 +62,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
     private static final String TABLE_2 = "tbl2";
     private static final String TABLE_3 = "tbl3";
     private static final String TABLE_4 = "tbl4";
+    private static final String TABLE_5 = "tbl5";
     private static final String TABLE_SQL_PARSE = "tbl_sql_parse";
 
     private static final MySQLContainer MYSQL_CONTAINER =
@@ -93,10 +94,13 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         JobClient jobClient = submitJob();
         // wait 2 times checkpoint
         Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
         String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+        String query1 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                        TABLE_5);
         checkResult(expected, query1, 2);
 
         // add incremental data
@@ -306,10 +310,13 @@ public class MySQLDorisE2ECase extends DorisTestBase {
 
         // wait 2 times checkpoint
         Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
         String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+        String query1 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                        TABLE_5);
         checkResult(expected, query1, 2);
 
         // add incremental data
@@ -431,7 +438,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "tbl1|tbl2|tbl3";
+        String includingTables = "tbl1|tbl2|tbl3|tbl5";
         String excludingTables = "";
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         databaseSync
@@ -457,10 +464,13 @@ public class MySQLDorisE2ECase extends DorisTestBase {
 
         // wait 2 times checkpoint
         Thread.sleep(20000);
-        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3");
+        List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
         String sql =
-                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
-        String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+                "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
+        String query1 =
+                String.format(
+                        sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
+                        TABLE_5);
         checkResult(expected, query1, 2);
 
         // add incremental data
@@ -503,6 +513,20 @@ public class MySQLDorisE2ECase extends DorisTestBase {
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
+            // create a table in Doris
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256),\n"
+                                    + "`age` int\n"
+                                    + ")\n"
+                                    + "UNIQUE KEY(`name`)\n"
+                                    + "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+                                    + "PROPERTIES ( \n"
+                                    + "\"replication_num\" = \"1\" \n"
+                                    + ");\n",
+                            DATABASE, TABLE_5));
         }
     }
 
@@ -596,6 +620,7 @@ public class MySQLDorisE2ECase extends DorisTestBase {
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
             statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_5));
             statement.execute(
                     String.format(
                             "CREATE TABLE %s.%s ( \n"
@@ -617,6 +642,13 @@ public class MySQLDorisE2ECase extends DorisTestBase {
                                     + "`age` int\n"
                                     + ")",
                             DATABASE, TABLE_3));
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE %s.%s ( \n"
+                                    + "`name` varchar(256) primary key,\n"
+                                    + "`age` int\n"
+                                    + ")",
+                            DATABASE, TABLE_5));
             // mock stock data
             statement.execute(
                     String.format("insert into %s.%s  values ('doris_1',1)", DATABASE, TABLE_1));
@@ -624,6 +656,8 @@ public class MySQLDorisE2ECase extends DorisTestBase {
                     String.format("insert into %s.%s  values ('doris_2',2)", DATABASE, TABLE_2));
             statement.execute(
                     String.format("insert into %s.%s  values ('doris_3',3)", DATABASE, TABLE_3));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('doris_5',5)", DATABASE, TABLE_5));
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to