This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 64f4c7496cd branch-4.0: [fix](streaming-job) fix S3 offset and job statistics lost after FE checkpoint restart #62449 (#62626)
64f4c7496cd is described below

commit 64f4c7496cd86451c691b1bcb7033e433cb49091
Author: wudi <[email protected]>
AuthorDate: Sun May 10 10:31:00 2026 +0800

    branch-4.0: [fix](streaming-job) fix S3 offset and job statistics lost after FE checkpoint restart #62449 (#62626)
    
    Cherry-picked from #62449
---
 .../insert/streaming/StreamingInsertJob.java       |   5 +
 .../insert/streaming/StreamingJobStatistic.java    |   3 +
 .../doris/job/offset/SourceOffsetProvider.java     |   8 ++
 .../job/offset/s3/S3SourceOffsetProvider.java      |  42 ++++++
 ...g_job_alter_offset_checkpoint_restart_fe.groovy | 151 +++++++++++++++++++++
 ...test_streaming_job_checkpoint_restart_fe.groovy | 144 ++++++++++++++++++++
 6 files changed, 353 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 2562685261f..8f6616134e4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -638,6 +638,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getNumFiles());
         this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+        // Sync offsetProviderPersist after each offset update so the 
checkpoint thread
+        // (which replays journals on its own Env) writes the latest offset 
into the image.
+        this.offsetProviderPersist = offsetProvider.getPersistInfo();
 
         //update metric
         if (MetricRepo.isInit && !isReplay) {
@@ -744,6 +747,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty()) && 
this.tvfType != null) {
             Offset offset = 
validateOffset(inputStreamProps.getOffsetProperty());
             this.offsetProvider.updateOffset(offset);
+            this.offsetProviderPersist = offsetProvider.getPersistInfo();
             if (Config.isCloudMode()) {
                 resetCloudProgress(offset);
             }
@@ -1101,6 +1105,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (offsetProvider == null) {
             if (tvfType != null) {
                 offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+                offsetProvider.restoreFromPersistInfo(offsetProviderPersist);
             } else {
                 offsetProvider = new JdbcSourceOffsetProvider(getJobId(), 
dataSourceType, sourceProperties);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
index 80d0da3de23..61d68b84a27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
@@ -34,12 +34,15 @@ public class StreamingJobStatistic {
     private long loadBytes;
     @Getter
     @Setter
+    @SerializedName("fileNumber")
     private long fileNumber;
     @Getter
     @Setter
+    @SerializedName("fileSize")
     private long fileSize;
     @Getter
     @Setter
+    @SerializedName("filteredRows")
     private long filteredRows;
 
     public String toJson() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 892231444e3..306cc737814 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -110,5 +110,13 @@ public interface SourceOffsetProvider {
         return null;
     }
 
+    /**
+     * Restore offset state from a string produced by {@link #getPersistInfo()} during image
+     * load (gsonPostProcess), immediately after the provider is created, so that even PAUSED
+     * jobs have the correct offset state. {@code persistInfo} may be null. Default: no-op.
+     */
+    default void restoreFromPersistInfo(String persistInfo) {
+    }
+
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 23996fb8f38..0296792267d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.GlobListResult;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
@@ -197,6 +198,47 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
         return false;
     }
 
+    @Override
+    public String getPersistInfo() {
+        return currentOffset == null ? null : currentOffset.toSerializedJson();
+    }
+
+    /**
+     * Best-effort restore: null, empty, "null" or malformed input leaves currentOffset unchanged.
+     */
+    @Override
+    public void restoreFromPersistInfo(String persistInfo) {
+        if (persistInfo == null || persistInfo.isEmpty()) {
+            return;
+        }
+        try {
+            S3Offset restored = GsonUtils.GSON.fromJson(persistInfo, S3Offset.class);
+            if (restored != null) {
+                this.currentOffset = restored;
+            }
+        } catch (Exception e) {
+            log.warn("Failed to restore S3 offset from persistInfo", e);
+        }
+    }
+
+    @Override
+    public void replayIfNeed(StreamingInsertJob job) {
+        // A non-null currentOffset was set by EditLog replay (replayOnCommitted -> updateOffset) or by
+        // restoreFromPersistInfo at image load; do not overwrite it with offsetProviderPersist, which
+        // may be stale (e.g. txn replay runs after ALTER replay).
+        if (currentOffset != null) {
+            log.info("S3 offset for job {} already set by EditLog replay: endFile={}",
+                    job.getJobId(), currentOffset.getEndFile());
+            return;
+        }
+        // Recovering from a checkpoint image without later EditLog replay; GSON may yield null here.
+        restoreFromPersistInfo(job.getOffsetProviderPersist());
+        if (currentOffset != null) {
+            log.info("Restored S3 offset from checkpoint for job {}: endFile={}",
+                    job.getJobId(), currentOffset.getEndFile());
+        }
+    }
+
     @Override
     public Offset deserializeOffset(String offset) {
         return GsonUtils.GSON.fromJson(offset, S3Offset.class);
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_checkpoint_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_checkpoint_restart_fe.groovy
new file mode 100644
index 00000000000..5c0dfc8a677
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_checkpoint_restart_fe.groovy
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Test that ALTER'd S3 offset survives FE checkpoint restart.
+// Flow: consume data → PAUSE → ALTER offset → checkpoint → restart FE
+// → verify offset preserved → RESUME → verify re-consumption from altered 
offset.
+suite("test_streaming_job_alter_offset_checkpoint_restart_fe", "docker") {
+    def tableName = "test_streaming_job_alter_ckpt_tbl"
+    def jobName = "test_streaming_job_alter_ckpt_name"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.cloudMode = null
+    options.feConfigs += [
+        'edit_log_roll_num=1'
+    ]
+
+    docker(options) {
+        sql """drop table if exists `${tableName}` force"""
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int  NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        // Create job with offset starting after example_0.csv
+        sql """
+           CREATE JOB ${jobName}
+           PROPERTIES (
+            'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def cnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSucceedCount: " + cnt)
+                        cnt.size() == 1 && Long.parseLong(cnt.get(0).get(0).toString()) >= 1
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        // Pause and ALTER offset to a fileName that sorts before every example_*.csv
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+        sql """
+        ALTER JOB ${jobName}
+        PROPERTIES (
+            'offset' = '{"fileName":"regression/load/data/anoexist1234.csv"}'
+        )
+        """
+
+        def alterInfo = sql """
+            select currentOffset from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("after ALTER: " + alterInfo)
+        assert alterInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/anoexist1234.csv\"}"
+
+        // Wait for checkpoint to capture the altered offset in the image
+        log.info("Waiting 90 seconds for checkpoint to complete...")
+        sleep(90000)
+
+        // Restart FE - recover from checkpoint image
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        // Verify altered offset is preserved after checkpoint restart
+        def afterRestart = sql """
+            select status, currentOffset from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("after checkpoint restart: " + afterRestart)
+        assert afterRestart.get(0).get(0) == "PAUSED"
+        assert afterRestart.get(0).get(1) == "{\"fileName\":\"regression/load/data/anoexist1234.csv\"}"
+
+        // Resume and verify job consumes all files from the altered offset
+        sql """RESUME JOB where jobname = '${jobName}'"""
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def st = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobStatus: " + st)
+                        st.size() == 1 && st.get(0).get(0) == 'RUNNING' && Long.parseLong(st.get(0).get(1).toString()) >= 2
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def finalInfo = sql """
+            select currentOffset from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("final offset: " + finalInfo)
+        assert finalInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy
new file mode 100644
index 00000000000..3d54930bd8e
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Test that streaming job loadStatistic (fileNumber, fileSize, filteredRows)
+// survives FE checkpoint restart. This guards against fields missing
+// @SerializedName in StreamingJobStatistic being lost when the checkpoint
+// image is written via GsonUtils.GSON (which uses 
HiddenAnnotationExclusionStrategy
+// to skip fields without @SerializedName).
+suite("test_streaming_job_checkpoint_restart_fe", "docker") {
+    def tableName = "test_streaming_job_ckpt_restart_tbl"
+    def jobName = "test_streaming_job_ckpt_restart_name"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.cloudMode = null
+    // Set edit_log_roll_num=1 so that every journal write triggers a roll,
+    // ensuring finalized journals exist for the checkpoint thread to pick up.
+    options.feConfigs += [
+        'edit_log_roll_num=1'
+    ]
+
+    docker(options) {
+        sql """drop table if exists `${tableName}` force"""
+        sql """
+            DROP JOB IF EXISTS where jobname =  '${jobName}'
+        """
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int  NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        // Create streaming job
+        sql """
+           CREATE JOB ${jobName}
+           PROPERTIES(
+            "s3.max_batch_files" = "1"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            // Wait for all files to be processed. With example_[0-1].csv and
+            // s3.max_batch_files=1, processing completes in two successful tasks.
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSucceedCount: " + jobSucceedCount)
+                        jobSucceedCount.size() == 1 && Long.parseLong(jobSucceedCount.get(0).get(0).toString()) >= 2
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo before checkpoint: " + jobInfo)
+        def loadStatBefore = parseJson(jobInfo.get(0).get(2))
+        log.info("loadStatistic before checkpoint: " + jobInfo.get(0).get(2))
+        assert loadStatBefore.scannedRows > 0
+        assert loadStatBefore.loadBytes > 0
+        assert loadStatBefore.fileNumber > 0
+        assert loadStatBefore.fileSize > 0
+
+        // Wait for checkpoint thread to execute.
+        // Checkpoint thread runs every 60 seconds (FeConstants.checkpoint_interval_second).
+        // With edit_log_roll_num=1, journals are finalized immediately,
+        // so the next checkpoint cycle will generate a new image containing
+        // the streaming job state.
+        log.info("Waiting 90 seconds for checkpoint to complete...")
+        sleep(90000)
+
+        // Restart FE - this time it should recover from the checkpoint image
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        // Verify loadStatistic is preserved after checkpoint restart
+        def jobInfoAfter = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo after checkpoint restart: " + jobInfoAfter)
+        def loadStatAfter = parseJson(jobInfoAfter.get(0).get(2))
+        log.info("loadStatistic after checkpoint restart: " + jobInfoAfter.get(0).get(2))
+
+        // These assertions verify that all statistics fields survive the checkpoint.
+        // Without @SerializedName on these fields, GsonUtils.GSON (which uses
+        // HiddenAnnotationExclusionStrategy) would skip them during serialization,
+        // causing them to reset to 0 after loading from the checkpoint image.
+        assert loadStatAfter.scannedRows == loadStatBefore.scannedRows
+        assert loadStatAfter.loadBytes == loadStatBefore.loadBytes
+        assert loadStatAfter.fileNumber == loadStatBefore.fileNumber
+        assert loadStatAfter.fileSize == loadStatBefore.fileSize
+        assert loadStatAfter.filteredRows == loadStatBefore.filteredRows
+
+        sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
+        sql """drop table if exists `${tableName}` force"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to