This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a33cc788609 branch-4.0: [Fix](Streamingjob) Fix the issue of synchronization failure under empty tables #59735 (#59790)
a33cc788609 is described below

commit a33cc788609d4f140c2bee7561390d5747a550de
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jan 13 10:05:49 2026 +0800

    branch-4.0: [Fix](Streamingjob) Fix the issue of synchronization failure under empty tables #59735 (#59790)
    
    Cherry-picked from #59735
    
    Co-authored-by: wudi <[email protected]>
---
 .../cdcclient/service/PipelineCoordinator.java     | 62 +++++++---------------
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java | 13 +++++
 .../source/reader/JdbcIncrementalSourceReader.java |  5 +-
 .../streaming_job/cdc/test_streaming_mysql_job.out |  2 +
 .../cdc/test_streaming_mysql_job.groovy            | 14 ++++-
 5 files changed, 49 insertions(+), 47 deletions(-)
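
At a glance, the PipelineCoordinator change below makes the coordinator derive the offset meta from the split state even when a split returns no records (the empty-table case), rather than only reporting an offset when data was read. A minimal, self-contained sketch of that idea follows; SplitInfo and the other names here are simplified stand-ins for illustration, not the actual Doris classes:

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetMetaSketch {

        // Simplified stand-in for the reader/split pair used by PipelineCoordinator.
        interface SplitInfo {
            boolean isSnapshot();
            String splitId();
            Map<String, String> stateOffset(); // offset from the split state; null if absent
        }

        static Map<String, String> buildMeta(SplitInfo split) {
            Map<String, String> state = split.stateOffset();
            if (state == null) {
                // mirrors the new "split state is null" guard in the patch
                throw new RuntimeException("split state is null");
            }
            Map<String, String> meta = new HashMap<>(state);
            // snapshot splits report their own id; binlog splits use a fixed id
            meta.put("split_id", split.isSnapshot() ? split.splitId() : "binlog-split");
            return meta;
        }

        public static void main(String[] args) {
            SplitInfo emptySnapshot = new SplitInfo() {
                public boolean isSnapshot() { return true; }
                public String splitId() { return "snapshot-0"; }
                public Map<String, String> stateOffset() { return new HashMap<>(); }
            };
            // even with zero records read, an offset meta is produced and can be committed
            System.out.println(buildMeta(emptySnapshot));
        }
    }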

diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 591c4790e6c..187003ad0e6 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -25,7 +25,6 @@ import org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
-import org.apache.doris.job.cdc.split.SnapshotSplit;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -125,32 +124,19 @@ public class PipelineCoordinator {
                         sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
                 offsetRes.put(SPLIT_ID, split.splitId());
                 recordResponse.setMeta(offsetRes);
-                return recordResponse;
             }
+
             // set meta for binlog event
             if (sourceReader.isBinlogSplit(split)) {
                 Map<String, String> offsetRes =
                         sourceReader.extractBinlogStateOffset(readResult.getSplitState());
                 offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-            }
-        }
-
-        // no data in this split, set meta info
-        if (CollectionUtils.isEmpty(recordResponse.getRecords())) {
-            if (sourceReader.isBinlogSplit(split)) {
-                Map<String, String> offsetRes =
-                        sourceReader.extractBinlogOffset(readResult.getSplit());
-                offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
                 recordResponse.setMeta(offsetRes);
-            } else {
-                SnapshotSplit snapshotSplit =
-                        objectMapper.convertValue(fetchRecord.getMeta(), SnapshotSplit.class);
-                Map<String, String> meta = new HashMap<>();
-                meta.put(SPLIT_ID, snapshotSplit.getSplitId());
-                // chunk no data
-                recordResponse.setMeta(meta);
             }
+        } else {
+            throw new RuntimeException("split state is null");
         }
+
         sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
         return recordResponse;
     }
@@ -188,7 +174,6 @@ public class PipelineCoordinator {
         SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         Map<String, String> metaResponse = new HashMap<>();
-        boolean hasData = false;
         long scannedRows = 0L;
         long scannedBytes = 0L;
         SplitReadResult readResult = null;
@@ -215,21 +200,12 @@ public class PipelineCoordinator {
                 if (!CollectionUtils.isEmpty(serializedRecords)) {
                     String database = writeRecordRequest.getTargetDb();
                     String table = extractTable(element);
-                    hasData = true;
                     for (String record : serializedRecords) {
                         scannedRows++;
                         byte[] dataBytes = record.getBytes();
                         scannedBytes += dataBytes.length;
                         batchStreamLoad.writeRecord(database, table, dataBytes);
                     }
-
-                    if (sourceReader.isBinlogSplit(readResult.getSplit())) {
-                        // put offset for event
-                        Map<String, String> lastMeta =
-                                sourceReader.extractBinlogStateOffset(readResult.getSplitState());
-                        lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                        metaResponse = lastMeta;
-                    }
                 }
                 // Check if maxInterval has been exceeded
                 long elapsedTime = System.currentTimeMillis() - startTime;
@@ -245,29 +221,29 @@ public class PipelineCoordinator {
         }
 
         try {
-            if (!hasData) {
-                // todo: need return the lastest heartbeat offset, means the maximum offset that the
-                //  current job can recover.
+            if (readResult.getSplitState() != null) {
+                // Set meta information for hw
+                if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
+                    Map<String, String> offsetRes =
+                            sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                    offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
+                    metaResponse = offsetRes;
+                }
+
+                // set meta for binlog event
                 if (sourceReader.isBinlogSplit(readResult.getSplit())) {
                     Map<String, String> offsetRes =
-                            sourceReader.extractBinlogOffset(readResult.getSplit());
+                            sourceReader.extractBinlogStateOffset(readResult.getSplitState());
                     offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
-                    batchStreamLoad.commitOffset(offsetRes, scannedRows, scannedBytes);
-                    return;
-                } else {
-                    throw new RuntimeException("should not happen");
+                    metaResponse = offsetRes;
                 }
+            } else {
+                throw new RuntimeException("split state is null");
             }
 
             // wait all stream load finish
             batchStreamLoad.forceFlush();
-            // update offset meta
-            if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
-                Map<String, String> offsetRes =
-                        sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
-                offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
-                metaResponse = offsetRes;
-            }
+
             // request fe api
             batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
 
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index bf6a4102801..1604b1c0305 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -80,6 +81,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private BlockingQueue<BatchRecordBuffer> flushQueue;
     private final AtomicBoolean started;
     private volatile boolean loadThreadAlive = false;
+    private final CountDownLatch loadThreadStarted = new CountDownLatch(1);
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
     private long maxBlockedBytes;
     private final AtomicLong currentCacheBytes = new AtomicLong(0L);
@@ -110,6 +112,16 @@ public class DorisBatchStreamLoad implements Serializable {
         this.loadExecutorService.execute(loadAsyncExecutor);
         this.targetDb = targetDb;
         this.jobId = jobId;
+        // Wait for the load thread to start
+        try {
+            if (!loadThreadStarted.await(10, TimeUnit.SECONDS)) {
+                throw new RuntimeException("LoadAsyncExecutor thread startup timed out");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(
+                    "Thread interrupted while waiting for load thread to start", e);
+        }
     }
 
     /**
@@ -310,6 +322,7 @@ public class DorisBatchStreamLoad implements Serializable {
         public void run() {
             LOG.info("LoadAsyncExecutor start for jobId {}", jobId);
             loadThreadAlive = true;
+            loadThreadStarted.countDown();
             List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
             while (started.get()) {
                 recordList.clear();
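
The DorisBatchStreamLoad hunks above add a startup handshake: the constructor now blocks on a CountDownLatch until the LoadAsyncExecutor thread has signaled that it is running (or fails after 10 seconds), so callers cannot race ahead of a flusher thread that never started. A minimal sketch of the same pattern, using simplified stand-in names rather than the real class:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class AsyncLoaderSketch {
        private final CountDownLatch started = new CountDownLatch(1);
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private volatile boolean running = true;

        public AsyncLoaderSketch() {
            executor.execute(this::runLoop);
            try {
                // fail fast if the worker never comes up, as the patch does
                if (!started.await(10, TimeUnit.SECONDS)) {
                    throw new RuntimeException("load thread startup timed out");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted while waiting for load thread to start", e);
            }
        }

        private void runLoop() {
            started.countDown(); // tell the constructor the worker is alive
            while (running) {
                // drain buffered records and stream-load them (omitted in this sketch)
            }
        }

        public void stop() {
            running = false;
            executor.shutdownNow();
        }

        public static void main(String[] args) {
            AsyncLoaderSketch loader = new AsyncLoaderSketch(); // returns only after the worker started
            loader.stop();
        }
    }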
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index f9e11f6b029..541e3354828 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -166,8 +166,9 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
             Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
             if (currentReader == null || baseReq.isReload()) {
                 LOG.info(
-                        "No current reader or reload {}, create new split reader",
-                        baseReq.isReload());
+                        "No current reader or reload {}, create new split reader for job {}",
+                        baseReq.isReload(),
+                        baseReq.getJobId());
                 // build split
                 Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
                 split = splitFlag.f0;
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out
index aebbb6815e3..f5e148123ae 100644
--- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out
@@ -7,6 +7,8 @@ B1      2
 A2     1
 B2     2
 
+-- !select_snapshot_table3 --
+
 -- !select_binlog_table1 --
 B1     10
 Doris  18
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index d77e2b769bb..a6bc1d17431 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -25,11 +25,13 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
     def currentDb = (sql "select database()")[0][0]
     def table1 = "user_info_normal1"
     def table2 = "user_info_normal2"
+    def table3 = "user_info_normal3_empty"
     def mysqlDb = "test_cdc_db"
 
     sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
     sql """drop table if exists ${currentDb}.${table1} force"""
     sql """drop table if exists ${currentDb}.${table2} force"""
+    sql """drop table if exists ${currentDb}.${table3} force"""
 
     // Pre-create table2
     sql """
@@ -71,6 +73,11 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
             // mock snapshot data
             sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2', 1);"""
             sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2', 2);"""
+            sql """CREATE TABLE ${mysqlDb}.${table3} (
+                  `name` varchar(200) NOT NULL,
+                  `age` int DEFAULT NULL,
+                  PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
         }
 
         sql """CREATE JOB ${jobName}
@@ -82,7 +89,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
                     "user" = "root",
                     "password" = "123456",
                     "database" = "${mysqlDb}",
-                    "include_tables" = "${table1},${table2}", 
+                    "include_tables" = "${table3},${table1},${table2}", 
                     "offset" = "initial"
                 )
                 TO DATABASE ${currentDb} (
@@ -96,6 +103,8 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
         assert showTables.size() == 1
         def showTables2 = sql """ show tables from ${currentDb} like '${table2}'; """
         assert showTables2.size() == 1
+        def showTables3 = sql """ show tables from ${currentDb} like '${table3}'; """
+        assert showTables3.size() == 1
 
         // check table schema correct
         def showTbl1 = sql """show create table ${currentDb}.${table1}"""
@@ -113,7 +122,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
                         def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                         log.info("jobSuccendCount: " + jobSuccendCount)
                         // check job status and succeed task count larger than 2
-                        jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+                        jobSuccendCount.size() == 1 && '3' <= jobSuccendCount.get(0).get(0)
                     }
             )
         } catch (Exception ex){
@@ -127,6 +136,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
         // check snapshot data
         qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """
         qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name asc """
+        qt_select_snapshot_table3 """ SELECT * FROM ${table3} order by name asc """
 
         // mock mysql incremental into
         connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
