github-actions[bot] commented on code in PR #63471:
URL: https://github.com/apache/doris/pull/63471#discussion_r3279616995


##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_composite_pk.groovy:
##########
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_composite_pk", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_composite_pk_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_composite_pk_mysql"
+    def table2 = "streaming_full_pk_map_mysql"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+    sql """drop table if exists ${currentDb}.${table2} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // ===== Prepare MySQL side =====
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """
+            create table ${mysqlDb}.${table1} (
+              `tenant_id` int not null,
+              `order_id` bigint not null,
+              `order_no` varchar(64),
+              `amount` decimal(10,2),
+              primary key (`tenant_id`, `order_id`)
+            ) engine=innodb default charset=utf8;
+            """
+
+            // Snapshot rows: 2 tenants x 5 orders each.
+            sql """insert into ${mysqlDb}.${table1} values (1, 1001, 'A_001', 
10.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1002, 'A_002', 
20.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1003, 'A_003', 
30.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1004, 'A_004', 
40.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (1, 1005, 'A_005', 
50.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2001, 'B_001', 
100.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2002, 'B_002', 
200.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2003, 'B_003', 
300.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2004, 'B_004', 
400.00)"""
+            sql """insert into ${mysqlDb}.${table1} values (2, 2005, 'B_005', 
500.00)"""
+
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table2}"""
+            sql """
+            create table ${mysqlDb}.${table2} (
+              `user_id` int not null,
+              `role_id` int not null,
+              primary key (`user_id`, `role_id`)
+            ) engine=innodb default charset=utf8;
+            """
+            sql """insert into ${mysqlDb}.${table2} values (10, 100)"""
+            sql """insert into ${mysqlDb}.${table2} values (10, 101)"""
+            sql """insert into ${mysqlDb}.${table2} values (10, 102)"""
+            sql """insert into ${mysqlDb}.${table2} values (20, 200)"""
+            sql """insert into ${mysqlDb}.${table2} values (20, 201)"""
+            sql """insert into ${mysqlDb}.${table2} values (20, 202)"""
+        }
+
+        // snapshot_split_size=3 -> chunks cross the tenant boundary.
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = 
"jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1},${table2}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "3"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def cnt1 = sql """select count(1) from 
${currentDb}.${table1}"""
+                        def cnt2 = sql """select count(1) from 
${currentDb}.${table2}"""
+                        log.info("snapshot row count table1=${cnt1} 
table2=${cnt2}")
+                        cnt1.get(0).get(0) == 10 && cnt2.get(0).get(0) == 6
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def showTbl = sql """show create table ${currentDb}.${table1}"""
+        def createInfo = showTbl[0][1]
+        log.info("create table: " + createInfo)
+        assert createInfo.contains("UNIQUE KEY(`order_id`, `tenant_id`)")

Review Comment:
   This assertion codifies a key-column order that is the reverse of the MySQL
   source table's (`PRIMARY KEY (tenant_id, order_id)` at lines 47-53). The
   second table does the same at line 129 (`role_id, user_id` vs. source
   `user_id, role_id`), while the PostgreSQL composite-PK test in this PR
   expects source order. As a result, this regression test would pass even if
   the MySQL CDC table creation is reversing composite-key columns, and the
   generated `.out` file also records the reversed `desc` order. Please either
   fix the MySQL mapping so that the Doris UNIQUE KEY preserves the source PK
   order, or adjust the test only if there is a documented reason MySQL must
   intentionally reverse it.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_pause_resume.groovy:
##########
@@ -114,19 +114,32 @@ 
suite("test_streaming_postgres_job_async_split_pause_resume",
             throw ex
         }
 
-        // Capture state, sleep, recapture — succeedTaskCount must not grow 
while paused.
+        // PAUSE is best-effort on the FE side: it stops new chunk dispatch, 
but any chunk
+        // already in flight on cdc_client keeps running until its stream load 
commits. Wait
+        // for the in-flight chunk(s) to settle (row count stable across two 
samples) before
+        // sampling, otherwise the post-PAUSE row count would still be growing 
under us.
+        long lastRows = -1L
+        Awaitility.await().atMost(60, SECONDS).pollInterval(3, SECONDS).until({
+            long cur = sql("""SELECT COUNT(*) FROM 
${currentDb}.${table1}""").get(0).get(0) as long
+            boolean stable = cur == lastRows

Review Comment:
   Two equal row-count samples taken 3 seconds apart do not prove that all
   in-flight snapshot chunks have committed. A slow chunk can take longer than
   that before its stream-load commit makes more rows visible, so this wait can
   succeed while a chunk is still in flight, and the following 15-second sleep
   can then still observe row growth, making the regression test flaky under
   slower BE/load conditions. Please wait on a stronger signal that no task is
   still running or in flight, or require stability for a window longer than
   the subsequent assertion interval.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to