This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 12e0e1ec046 branch-4.0: [Fix](StreamingJob) fix the first split task
scheduled and fe restart remain split replay problem #59883 (#59902)
12e0e1ec046 is described below
commit 12e0e1ec04678f2eb2043ff3ecc821abe5fcb69a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 15 18:01:40 2026 +0800
branch-4.0: [Fix](StreamingJob) fix the first split task scheduled and fe
restart remain split replay problem #59883 (#59902)
Cherry-picked from #59883
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertJob.java | 2 +-
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 42 +++++++++++++---------
.../apache/doris/job/util/StreamingJobUtils.java | 19 ++++++----
.../source/reader/mysql/MySqlSourceReader.java | 2 +-
4 files changed, 40 insertions(+), 25 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 30f115b3ba5..4dade2b4ec8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1133,7 +1133,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
public void replayOffsetProviderIfNeed() throws JobException {
- if (this.offsetProviderPersist != null && offsetProvider != null) {
+ if (offsetProvider != null) {
offsetProvider.replayIfNeed(this);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 0c114ae8e64..51f3c07aa41 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -332,19 +332,19 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
String offsetProviderPersist = job.getOffsetProviderPersist();
- if (job.getOffsetProviderPersist() == null) {
- return;
- }
- JdbcSourceOffsetProvider replayFromPersist =
GsonUtils.GSON.fromJson(offsetProviderPersist,
- JdbcSourceOffsetProvider.class);
- this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
- this.chunkHighWatermarkMap =
replayFromPersist.getChunkHighWatermarkMap();
-
- if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
- currentOffset = new JdbcOffset();
- currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
- } else {
- try {
+ if (offsetProviderPersist != null) {
+ JdbcSourceOffsetProvider replayFromPersist =
GsonUtils.GSON.fromJson(offsetProviderPersist,
+ JdbcSourceOffsetProvider.class);
+ this.binlogOffsetPersist =
replayFromPersist.getBinlogOffsetPersist();
+ this.chunkHighWatermarkMap =
replayFromPersist.getChunkHighWatermarkMap();
+ log.info("Replaying offset provider for job {}, binlogOffset size
{}, chunkHighWatermark size {}",
+ getJobId(),
+ binlogOffsetPersist == null ? 0 :
binlogOffsetPersist.size(),
+ chunkHighWatermarkMap == null ? 0 :
chunkHighWatermarkMap.size());
+ if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
+ currentOffset = new JdbcOffset();
+ currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
+ } else {
Map<String, List<SnapshotSplit>> snapshotSplits =
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
if (MapUtils.isNotEmpty(chunkHighWatermarkMap) &&
MapUtils.isNotEmpty(snapshotSplits)) {
SnapshotSplit lastSnapshotSplit =
recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits);
@@ -353,10 +353,20 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
currentOffset.setSplit(lastSnapshotSplit);
}
}
- } catch (Exception ex) {
- log.warn("Replay snapshot splits error with job {} ",
job.getJobId(), ex);
- throw new JobException(ex);
}
+ } else if (checkNeedSplitChunks(sourceProperties)
+ && CollectionUtils.isEmpty(remainingSplits)
+ && CollectionUtils.isEmpty(finishedSplits)
+ && MapUtils.isEmpty(chunkHighWatermarkMap)
+ && MapUtils.isEmpty(binlogOffsetPersist)) {
+ // After the Job is created for the first time, starting from the
initial offset,
+ // the task for the first split is scheduled, When the task status
is running or failed,
+ // If FE restarts, the split needs to be restore from the meta
again.
+ log.info("Replaying offset provider for job {},
offsetProviderPersist is empty", getJobId());
+ Map<String, List<SnapshotSplit>> snapshotSplits =
StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+ recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
+ } else {
+ log.info("No need to replay offset provider for job {}",
getJobId());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index bac12ae3eba..05c75707124 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -120,7 +120,7 @@ public class StreamingJobUtils {
}
}
- public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long
jobId) throws IOException {
+ public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long
jobId) throws JobException {
List<ResultRow> resultRows;
String sql = String.format(SELECT_SPLITS_TABLE_TEMPLATE, jobId);
try (AutoCloseConnectContext context
@@ -130,12 +130,17 @@ public class StreamingJobUtils {
}
Map<String, List<SnapshotSplit>> tableSplits = new LinkedHashMap<>();
- for (ResultRow row : resultRows) {
- String tableName = row.get(0);
- String chunkListStr = row.get(1);
- List<SnapshotSplit> splits =
- new
ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr,
SnapshotSplit[].class)));
- tableSplits.put(tableName, splits);
+ try {
+ for (ResultRow row : resultRows) {
+ String tableName = row.get(0);
+ String chunkListStr = row.get(1);
+ List<SnapshotSplit> splits =
+ new
ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr,
SnapshotSplit[].class)));
+ tableSplits.put(tableName, splits);
+ }
+ } catch (IOException ex) {
+ log.warn("Failed to deserialize snapshot splits from job {} meta
table: {}", jobId, ex.getMessage());
+ throw new JobException(ex);
}
return tableSplits;
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index a3f14a953b6..795deb55c26 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -433,7 +433,7 @@ public class MySqlSourceReader implements SourceReader {
SourceRecords sourceRecords = null;
String currentSplitId = null;
DebeziumReader<SourceRecords, MySqlSplit> currentReader = null;
- LOG.info("Get a split: {}", split.splitId());
+ LOG.info("Get a split: {}", split.toString());
if (split instanceof MySqlSnapshotSplit) {
currentReader = getSnapshotSplitReader(jobConfig);
} else if (split instanceof MySqlBinlogSplit) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]