This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 12e0e1ec046 branch-4.0: [Fix](StreamingJob) fix the first split task scheduled and fe restart remainsplit relay problem #59883 (#59902)
12e0e1ec046 is described below

commit 12e0e1ec04678f2eb2043ff3ecc821abe5fcb69a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 15 18:01:40 2026 +0800

    branch-4.0: [Fix](StreamingJob) fix the first split task scheduled and fe restart remainsplit relay problem #59883 (#59902)
    
    Cherry-picked from #59883
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertJob.java       |  2 +-
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  | 42 +++++++++++++---------
 .../apache/doris/job/util/StreamingJobUtils.java   | 19 ++++++----
 .../source/reader/mysql/MySqlSourceReader.java     |  2 +-
 4 files changed, 40 insertions(+), 25 deletions(-)
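
The scenario behind the fix: a streaming job has scheduled the task for its first snapshot split but has not yet persisted any offset; if FE restarts at that point, the remaining splits were lost, because replay only ran when a persisted offset provider existed. A rough sketch of the replay decision the patch introduces is shown here; the names (ReplayDecisionSketch, hasInMemorySplitState, and so on) are illustrative stand-ins, not the actual Doris API.

    // Illustrative sketch only; the real logic is in
    // StreamingInsertJob.replayOffsetProviderIfNeed() and
    // JdbcSourceOffsetProvider.replayIfNeed() in the diff below.
    final class ReplayDecisionSketch {
        boolean hasPersistedOffsets;     // job.getOffsetProviderPersist() != null
        boolean needsChunkSplitting;     // checkNeedSplitChunks(sourceProperties)
        boolean hasInMemorySplitState;   // remaining/finished splits or offset maps non-empty

        String decide() {
            if (hasPersistedOffsets) {
                return "restore binlog offset or remaining snapshot splits from the persisted provider";
            }
            if (needsChunkSplitting && !hasInMemorySplitState) {
                // First split task scheduled, then FE restarted before anything was persisted:
                // read the snapshot splits back from the job meta table.
                return "restore snapshot splits from meta";
            }
            return "nothing to replay";
        }
    }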

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 30f115b3ba5..4dade2b4ec8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1133,7 +1133,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     }
 
     public void replayOffsetProviderIfNeed() throws JobException {
-        if (this.offsetProviderPersist != null && offsetProvider != null) {
+        if (offsetProvider != null) {
             offsetProvider.replayIfNeed(this);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 0c114ae8e64..51f3c07aa41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -332,19 +332,19 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
     @Override
     public void replayIfNeed(StreamingInsertJob job) throws JobException {
         String offsetProviderPersist = job.getOffsetProviderPersist();
-        if (job.getOffsetProviderPersist() == null) {
-            return;
-        }
-        JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
-                JdbcSourceOffsetProvider.class);
-        this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
-        this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
-
-        if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
-            currentOffset = new JdbcOffset();
-            currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
-        } else {
-            try {
+        if (offsetProviderPersist != null) {
+            JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
+                    JdbcSourceOffsetProvider.class);
+            this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
+            this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
+            log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}",
+                    getJobId(),
+                    binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
+                    chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size());
+            if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
+                currentOffset = new JdbcOffset();
+                currentOffset.setSplit(new BinlogSplit(binlogOffsetPersist));
+            } else {
                 Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
                 if (MapUtils.isNotEmpty(chunkHighWatermarkMap) && MapUtils.isNotEmpty(snapshotSplits)) {
                     SnapshotSplit lastSnapshotSplit = recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits);
@@ -353,10 +353,20 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
                         currentOffset.setSplit(lastSnapshotSplit);
                     }
                 }
-            } catch (Exception ex) {
-                log.warn("Replay snapshot splits error with job {} ", job.getJobId(), ex);
-                throw new JobException(ex);
             }
+        } else if (checkNeedSplitChunks(sourceProperties)
+                    && CollectionUtils.isEmpty(remainingSplits)
+                    && CollectionUtils.isEmpty(finishedSplits)
+                    && MapUtils.isEmpty(chunkHighWatermarkMap)
+                    && MapUtils.isEmpty(binlogOffsetPersist)) {
+            // After the job is created for the first time, it starts from the initial offset
+            // and the task for the first split is scheduled. If FE restarts while that task
+            // is running or failed, the splits need to be restored from the meta table again.
+            log.info("Replaying offset provider for job {}, offsetProviderPersist is empty", getJobId());
+            Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
+            recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
+        } else {
+            log.info("No need to replay offset provider for job {}", getJobId());
         }
     }
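
The new else-if branch above hands recalculateRemainingSplits(new HashMap<>(), snapshotSplits) an empty high-watermark map. That helper is not part of this patch (and, per the first hunk, it also returns the last snapshot split), so the following is only an assumed sketch of the filtering it is relied on for here: chunks that already have a recorded high watermark count as finished, everything else is re-queued, which with an empty map means every restored split gets scheduled again. SplitSketch is a hypothetical stand-in for SnapshotSplit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Assumed behaviour only; the real recalculateRemainingSplits(...) in
    // JdbcSourceOffsetProvider is not shown in this diff.
    final class RemainingSplitSketch {
        static final class SplitSketch {
            final String chunkId;
            SplitSketch(String chunkId) {
                this.chunkId = chunkId;
            }
        }

        static List<SplitSketch> remainingAfterRestart(Map<String, String> chunkHighWatermarks,
                                                       Map<String, List<SplitSketch>> splitsByTable) {
            List<SplitSketch> remaining = new ArrayList<>();
            for (List<SplitSketch> splits : splitsByTable.values()) {
                for (SplitSketch split : splits) {
                    // A chunk with a recorded high watermark is treated as already synced;
                    // with an empty watermark map (the FE-restart case) nothing is skipped.
                    if (!chunkHighWatermarks.containsKey(split.chunkId)) {
                        remaining.add(split);
                    }
                }
            }
            return remaining;
        }
    }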
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index bac12ae3eba..05c75707124 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -120,7 +120,7 @@ public class StreamingJobUtils {
         }
     }
 
-    public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws IOException {
+    public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws JobException {
         List<ResultRow> resultRows;
         String sql = String.format(SELECT_SPLITS_TABLE_TEMPLATE, jobId);
         try (AutoCloseConnectContext context
@@ -130,12 +130,17 @@ public class StreamingJobUtils {
         }
 
         Map<String, List<SnapshotSplit>> tableSplits = new LinkedHashMap<>();
-        for (ResultRow row : resultRows) {
-            String tableName = row.get(0);
-            String chunkListStr = row.get(1);
-            List<SnapshotSplit> splits =
-                    new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
-            tableSplits.put(tableName, splits);
+        try {
+            for (ResultRow row : resultRows) {
+                String tableName = row.get(0);
+                String chunkListStr = row.get(1);
+                List<SnapshotSplit> splits =
+                        new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
+                tableSplits.put(tableName, splits);
+            }
+        } catch (IOException ex) {
+            log.warn("Failed to deserialize snapshot splits from job {} meta table: {}", jobId, ex.getMessage());
+            throw new JobException(ex);
         }
         return tableSplits;
     }
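
The StreamingJobUtils hunk above mainly changes the checked exception: Jackson's readValue throws a subclass of IOException on malformed JSON, and the method now catches that and rethrows a JobException instead of declaring IOException. A self-contained sketch of the same deserialization pattern, with a hypothetical Chunk POJO standing in for SnapshotSplit and a plain ObjectMapper passed in:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hedged sketch of the pattern used above; Chunk is hypothetical, not Doris's SnapshotSplit.
    final class ChunkListDecodeSketch {
        public static class Chunk {
            public String chunkId;   // public fields keep the sketch free of getters/setters
            public String startKey;
            public String endKey;
        }

        static List<Chunk> decode(ObjectMapper mapper, String chunkListJson) throws IOException {
            // Wrap in a mutable ArrayList: Arrays.asList alone returns a fixed-size list.
            return new ArrayList<>(Arrays.asList(mapper.readValue(chunkListJson, Chunk[].class)));
        }
    }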
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index a3f14a953b6..795deb55c26 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -433,7 +433,7 @@ public class MySqlSourceReader implements SourceReader {
         SourceRecords sourceRecords = null;
         String currentSplitId = null;
         DebeziumReader<SourceRecords, MySqlSplit> currentReader = null;
-        LOG.info("Get a split: {}", split.splitId());
+        LOG.info("Get a split: {}", split.toString());
         if (split instanceof MySqlSnapshotSplit) {
             currentReader = getSnapshotSplitReader(jobConfig);
         } else if (split instanceof MySqlBinlogSplit) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
