Copilot commented on code in PR #59798:
URL: https://github.com/apache/doris/pull/59798#discussion_r2685995110
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -172,7 +172,7 @@ public SplitReadResult
readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
// build split
Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
- closeBinlogReader();
+ // closeBinlogReader();
Review Comment:
Instead of leaving the commented-out code in place, consider removing it
entirely since the functionality has been moved to the finishSplitRecords
method. Leaving commented code reduces readability and can cause confusion
about whether it might be needed in the future.
```suggestion
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -430,6 +431,10 @@ public void splitChunks(List<String> createTbls) throws
JobException {
this.remainingSplits = tableSplits.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
+ } else {
+ // The source reader is automatically initialized when the split
is obtained.
+ // In latest mode, a separate init is required.init source reader
Review Comment:
The comment contains a grammatical error with a duplicate phrase. The
sentence "In latest mode, a separate init is required.init source reader"
appears to have accidentally duplicated "init" at the end. It should read "In
latest mode, a separate init is required to initialize the source reader" or
similar.
```suggestion
// In latest mode, a separate init is required to initialize the
source reader.
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy:
##########
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv",
"p0,external,mysql,external_docker,extern
sql """FLUSH PRIVILEGES"""
}
- sleep(30000)
+ def jobSucceedTaskCnt = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobStatus = sql """ select status,
SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
+ log.info("jobStatus: " + jobStatus)
+ // check job status running and increase a success task
+ jobStatus.size() == 1 && 'RUNNING' ==
jobStatus.get(0).get(0) && jobStatus.get(0).get(1) >
jobSucceedTaskCnt.get(0).get(0)
Review Comment:
The comparison uses string comparison '>=' for numeric values, which may
lead to incorrect results. The SucceedTaskCount should be compared as an
integer. For example, the string "9" is considered greater than "10" in
lexicographic comparison. Consider converting to integer before comparison:
jobStatus.get(0).get(1) as Integer > jobSucceedTaskCnt.get(0).get(0) as Integer
```suggestion
jobStatus.size() == 1 && 'RUNNING' ==
jobStatus.get(0).get(0) && (jobStatus.get(0).get(1) as Integer) >
(jobSucceedTaskCnt.get(0).get(0) as Integer)
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy:
##########
@@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv",
"p0,external,pg,external_docker,extern
sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age)
VALUES ('Doris',18);"""
}
- sleep(30000)
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(3, SECONDS).until(
+ {
+ def jobSucceedTaskCount = sql """select SucceedTaskCount
from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
+ jobSucceedTaskCount.size() == 1 &&
jobSucceedTaskCount.get(0).get(0) >= '2'
Review Comment:
The comparison uses string comparison '>=' for numeric values, which may
lead to incorrect results. The SucceedTaskCount should be compared as an
integer. For example, the string "9" is considered greater than "10" in
lexicographic comparison. Consider converting to integer before comparison:
jobSucceedTaskCount.get(0).get(0) as Integer >= 2
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -188,7 +188,7 @@ public SplitReadResult
readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
Tuple2<MySqlSplit, Boolean> splitFlag =
createMySqlSplit(offsetMeta, baseReq);
split = splitFlag.f0;
// reset binlog reader
- closeBinlogReader();
+ // closeBinlogReader();
Review Comment:
Instead of leaving the commented-out code in place, consider removing it
entirely since the functionality has been moved to the finishSplitRecords
method. Leaving commented code reduces readability and can cause confusion
about whether it might be needed in the future.
```suggestion
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy:
##########
@@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv",
"p0,external,pg,external_docker,extern
sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age)
VALUES ('Doris',18);"""
}
- sleep(30000)
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(3, SECONDS).until(
+ {
+ def jobSucceedTaskCount = sql """select SucceedTaskCount
from jobs("type"="insert") where Name='${jobName}'"""
+ log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
+ jobSucceedTaskCount.size() == 1 &&
jobSucceedTaskCount.get(0).get(0) >= '2'
Review Comment:
The comparison uses string comparison '>=' for numeric values, which may
lead to incorrect results. The SucceedTaskCount should be compared as an
integer. For example, the string "9" is considered greater than "10" in
lexicographic comparison. Consider converting to integer before comparison:
jobSucceedTaskCount.get(0).get(0) as Integer >= 2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]