This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a33cc788609 branch-4.0: [Fix](Streamingjob) Fix the issue of synchronization failure under empty tables #59735 (#59790)
a33cc788609 is described below
commit a33cc788609d4f140c2bee7561390d5747a550de
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jan 13 10:05:49 2026 +0800
branch-4.0: [Fix](Streamingjob) Fix the issue of synchronization failure under empty tables #59735 (#59790)
Cherry-picked from #59735
Co-authored-by: wudi <[email protected]>
---
.../cdcclient/service/PipelineCoordinator.java | 62 +++++++---------------
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 13 +++++
.../source/reader/JdbcIncrementalSourceReader.java | 5 +-
.../streaming_job/cdc/test_streaming_mysql_job.out | 2 +
.../cdc/test_streaming_mysql_job.groovy | 14 ++++-
5 files changed, 49 insertions(+), 47 deletions(-)
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 591c4790e6c..187003ad0e6 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -25,7 +25,6 @@ import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
-import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
@@ -125,32 +124,19 @@ public class PipelineCoordinator {
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, split.splitId());
recordResponse.setMeta(offsetRes);
- return recordResponse;
}
+
// set meta for binlog event
if (sourceReader.isBinlogSplit(split)) {
Map<String, String> offsetRes =
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- }
- }
-
- // no data in this split, set meta info
- if (CollectionUtils.isEmpty(recordResponse.getRecords())) {
- if (sourceReader.isBinlogSplit(split)) {
- Map<String, String> offsetRes =
- sourceReader.extractBinlogOffset(readResult.getSplit());
- offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
recordResponse.setMeta(offsetRes);
- } else {
- SnapshotSplit snapshotSplit =
- objectMapper.convertValue(fetchRecord.getMeta(), SnapshotSplit.class);
- Map<String, String> meta = new HashMap<>();
- meta.put(SPLIT_ID, snapshotSplit.getSplitId());
- // chunk no data
- recordResponse.setMeta(meta);
}
+ } else {
+ throw new RuntimeException("split state is null");
}
+
sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
return recordResponse;
}
@@ -188,7 +174,6 @@ public class PipelineCoordinator {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
Map<String, String> metaResponse = new HashMap<>();
- boolean hasData = false;
long scannedRows = 0L;
long scannedBytes = 0L;
SplitReadResult readResult = null;
@@ -215,21 +200,12 @@ public class PipelineCoordinator {
if (!CollectionUtils.isEmpty(serializedRecords)) {
String database = writeRecordRequest.getTargetDb();
String table = extractTable(element);
- hasData = true;
for (String record : serializedRecords) {
scannedRows++;
byte[] dataBytes = record.getBytes();
scannedBytes += dataBytes.length;
batchStreamLoad.writeRecord(database, table, dataBytes);
}
-
- if (sourceReader.isBinlogSplit(readResult.getSplit())) {
- // put offset for event
- Map<String, String> lastMeta =
- sourceReader.extractBinlogStateOffset(readResult.getSplitState());
- lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- metaResponse = lastMeta;
- }
}
// Check if maxInterval has been exceeded
long elapsedTime = System.currentTimeMillis() - startTime;
@@ -245,29 +221,29 @@ public class PipelineCoordinator {
}
try {
- if (!hasData) {
- // todo: need return the lastest heartbeat offset, means the maximum offset that the
- // current job can recover.
+ if (readResult.getSplitState() != null) {
+ // Set meta information for hw
+ if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
+ Map<String, String> offsetRes =
+ sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+ offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
+ metaResponse = offsetRes;
+ }
+
+ // set meta for binlog event
if (sourceReader.isBinlogSplit(readResult.getSplit())) {
Map<String, String> offsetRes =
- sourceReader.extractBinlogOffset(readResult.getSplit());
+ sourceReader.extractBinlogStateOffset(readResult.getSplitState());
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
- batchStreamLoad.commitOffset(offsetRes, scannedRows, scannedBytes);
- return;
- } else {
- throw new RuntimeException("should not happen");
+ metaResponse = offsetRes;
}
+ } else {
+ throw new RuntimeException("split state is null");
}
// wait all stream load finish
batchStreamLoad.forceFlush();
- // update offset meta
- if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
- Map<String, String> offsetRes =
- sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
- offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
- metaResponse = offsetRes;
- }
+
// request fe api
batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index bf6a4102801..1604b1c0305 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -80,6 +81,7 @@ public class DorisBatchStreamLoad implements Serializable {
private BlockingQueue<BatchRecordBuffer> flushQueue;
private final AtomicBoolean started;
private volatile boolean loadThreadAlive = false;
+ private final CountDownLatch loadThreadStarted = new CountDownLatch(1);
private AtomicReference<Throwable> exception = new AtomicReference<>(null);
private long maxBlockedBytes;
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
@@ -110,6 +112,16 @@ public class DorisBatchStreamLoad implements Serializable {
this.loadExecutorService.execute(loadAsyncExecutor);
this.targetDb = targetDb;
this.jobId = jobId;
+ // Wait for the load thread to start
+ try {
+ if (!loadThreadStarted.await(10, TimeUnit.SECONDS)) {
+ throw new RuntimeException("LoadAsyncExecutor thread startup timed out");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Thread interrupted while waiting for load thread to
start", e);
+ }
}
/**
@@ -310,6 +322,7 @@ public class DorisBatchStreamLoad implements Serializable {
public void run() {
LOG.info("LoadAsyncExecutor start for jobId {}", jobId);
loadThreadAlive = true;
+ loadThreadStarted.countDown();
List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
while (started.get()) {
recordList.clear();
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index f9e11f6b029..541e3354828 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -166,8 +166,9 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
Fetcher<SourceRecords, SourceSplitBase> currentReader =
this.getCurrentReader();
if (currentReader == null || baseReq.isReload()) {
LOG.info(
- "No current reader or reload {}, create new split
reader",
- baseReq.isReload());
+ "No current reader or reload {}, create new split
reader for job {}",
+ baseReq.isReload(),
+ baseReq.getJobId());
// build split
Tuple2<SourceSplitBase, Boolean> splitFlag =
createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out
index aebbb6815e3..f5e148123ae 100644
--- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job.out
@@ -7,6 +7,8 @@ B1 2
A2 1
B2 2
+-- !select_snapshot_table3 --
+
-- !select_binlog_table1 --
B1 10
Doris 18
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index d77e2b769bb..a6bc1d17431 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -25,11 +25,13 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
def currentDb = (sql "select database()")[0][0]
def table1 = "user_info_normal1"
def table2 = "user_info_normal2"
+ def table3 = "user_info_normal3_empty"
def mysqlDb = "test_cdc_db"
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
sql """drop table if exists ${currentDb}.${table1} force"""
sql """drop table if exists ${currentDb}.${table2} force"""
+ sql """drop table if exists ${currentDb}.${table3} force"""
// Pre-create table2
sql """
@@ -71,6 +73,11 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
// mock snapshot data
sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('A2',
1);"""
sql """INSERT INTO ${mysqlDb}.${table2} (name, age) VALUES ('B2',
2);"""
+ sql """CREATE TABLE ${mysqlDb}.${table3} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
}
sql """CREATE JOB ${jobName}
@@ -82,7 +89,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
"user" = "root",
"password" = "123456",
"database" = "${mysqlDb}",
- "include_tables" = "${table1},${table2}",
+ "include_tables" = "${table3},${table1},${table2}",
"offset" = "initial"
)
TO DATABASE ${currentDb} (
@@ -96,6 +103,8 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
assert showTables.size() == 1
def showTables2 = sql """ show tables from ${currentDb} like '${table2}'; """
assert showTables2.size() == 1
+ def showTables3 = sql """ show tables from ${currentDb} like '${table3}'; """
+ assert showTables3.size() == 1
// check table schema correct
def showTbl1 = sql """show create table ${currentDb}.${table1}"""
@@ -113,7 +122,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobSuccendCount: " + jobSuccendCount)
// check job status and succeed task count larger than 2
- jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+ jobSuccendCount.size() == 1 && '3' <= jobSuccendCount.get(0).get(0)
}
)
} catch (Exception ex){
@@ -127,6 +136,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
// check snapshot data
qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name asc """
qt_select_snapshot_table2 """ SELECT * FROM ${table2} order by name asc """
+ qt_select_snapshot_table3 """ SELECT * FROM ${table3} order by name asc """
// mock mysql incremental into
connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]