Copilot commented on code in PR #59798:
URL: https://github.com/apache/doris/pull/59798#discussion_r2685995110


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -172,7 +172,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
                 // build split
                 Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
                 split = splitFlag.f0;
-                closeBinlogReader();
+                // closeBinlogReader();

Review Comment:
   Instead of leaving the commented-out code in place, consider removing it 
entirely since the functionality has been moved to the finishSplitRecords 
method. Leaving commented code reduces readability and can cause confusion 
about whether it might be needed in the future.
   ```suggestion
   
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -430,6 +431,10 @@ public void splitChunks(List<String> createTbls) throws JobException {
             this.remainingSplits = tableSplits.values().stream()
                     .flatMap(List::stream)
                     .collect(Collectors.toList());
+        } else {
+            // The source reader is automatically initialized when the split is obtained.
+            // In latest mode, a separate init is required.init source reader

Review Comment:
   The comment contains a grammatical error with a duplicate phrase. The 
sentence "In latest mode, a separate init is required.init source reader" 
appears to have accidentally duplicated "init" at the end. It should read "In 
latest mode, a separate init is required to initialize the source reader" or 
similar.
   ```suggestion
               // In latest mode, a separate init is required to initialize the source reader.
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy:
##########
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
             sql """FLUSH PRIVILEGES"""
         }
 
-        sleep(30000)
+        def jobSucceedTaskCnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+        log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobStatus: " + jobStatus)
+                        // check job status running and increase a success task
+                        jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1) > jobSucceedTaskCnt.get(0).get(0)

Review Comment:
   The comparison applies '>' to values that may be strings, which can give 
incorrect results for numeric data. SucceedTaskCount should be compared as an 
integer: in lexicographic comparison, the string "9" is considered greater 
than "10". Consider converting both sides to integers before comparing: 
(jobStatus.get(0).get(1) as Integer) > (jobSucceedTaskCnt.get(0).get(0) as Integer)
   ```suggestion
                           jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && (jobStatus.get(0).get(1) as Integer) > (jobSucceedTaskCnt.get(0).get(0) as Integer)
   ```
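
The pitfall described in this comment can be reproduced in isolation. The
following is a minimal Java sketch, not code from the PR: the class
`CountComparison` and its method names are hypothetical and exist only to
contrast lexicographic and numeric ordering of the same two values.

```java
// Hypothetical helper for illustration only; not part of the Doris code base.
final class CountComparison {
    private CountComparison() {}

    // Lexicographic comparison: strings are compared character by character,
    // so the first characters '9' and '1' decide the result.
    static boolean greaterAsStrings(String left, String right) {
        return left.compareTo(right) > 0;
    }

    // Numeric comparison: both operands are parsed to int before comparing.
    static boolean greaterAsIntegers(String left, String right) {
        return Integer.parseInt(left) > Integer.parseInt(right);
    }

    public static void main(String[] args) {
        System.out.println(greaterAsStrings("9", "10"));  // prints true
        System.out.println(greaterAsIntegers("9", "10")); // prints false
    }
}
```

If the counts are compared as strings, a job that moves from 9 to 10 succeeded
tasks would look like it went backwards, and the wait condition could stay
false until the timeout.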



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy:
##########
@@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
             sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);"""
         }
 
-        sleep(30000)
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(3, SECONDS).until(
+                {
+                    def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                    log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
+                    jobSucceedTaskCount.size() == 1 && jobSucceedTaskCount.get(0).get(0) >= '2'

Review Comment:
   The comparison uses string comparison '>=' for numeric values, which may 
lead to incorrect results. The SucceedTaskCount should be compared as an 
integer. For example, the string "9" is considered greater than "10" in 
lexicographic comparison. Consider converting to integer before comparison: 
jobSucceedTaskCount.get(0).get(0) as Integer >= 2
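
The excerpt does not show which type the `sql` helper returns for each result
cell, so a conversion that accepts either a number or a numeric string is the
safer assumption. A minimal Java sketch of that idea follows; `CellValues`,
`toInt`, and `atLeast` are hypothetical names introduced for illustration and
are not part of the test framework.

```java
// Hypothetical helper for illustration only; the real cell type returned by
// the regression framework's sql helper is an assumption here.
final class CellValues {
    private CellValues() {}

    // Converts a result cell to int whether it arrives as a Number
    // (Integer, Long, ...) or as a numeric string such as "10".
    // Throws NumberFormatException for null or non-numeric text.
    static int toInt(Object cell) {
        if (cell instanceof Number) {
            return ((Number) cell).intValue();
        }
        return Integer.parseInt(String.valueOf(cell).trim());
    }

    // Numeric threshold check, the integer form of a ">= 2" style condition.
    static boolean atLeast(Object cell, int threshold) {
        return toInt(cell) >= threshold;
    }

    public static void main(String[] args) {
        System.out.println(atLeast("10", 2)); // prints true
        System.out.println(atLeast(1L, 2));   // prints false
    }
}
```

Comparing against the integer 2 rather than the string '2' keeps the condition
correct once the count reaches two digits.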



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -188,7 +188,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
                 Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
                 split = splitFlag.f0;
                 // reset binlog reader
-                closeBinlogReader();
+                // closeBinlogReader();

Review Comment:
   Instead of leaving the commented-out code in place, consider removing it 
entirely since the functionality has been moved to the finishSplitRecords 
method. Leaving commented code reduces readability and can cause confusion 
about whether it might be needed in the future.
   ```suggestion
   
   ```


