This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 64f4c7496cd branch-4.0: [fix](streaming-job) fix S3 offset and job
statistics lost after FE checkpoint restart #62449 (#62626)
64f4c7496cd is described below
commit 64f4c7496cd86451c691b1bcb7033e433cb49091
Author: wudi <[email protected]>
AuthorDate: Sun May 10 10:31:00 2026 +0800
branch-4.0: [fix](streaming-job) fix S3 offset and job statistics lost
after FE checkpoint restart #62449 (#62626)
Cherry-picked from #62449
---
.../insert/streaming/StreamingInsertJob.java | 5 +
.../insert/streaming/StreamingJobStatistic.java | 3 +
.../doris/job/offset/SourceOffsetProvider.java | 8 ++
.../job/offset/s3/S3SourceOffsetProvider.java | 42 ++++++
...g_job_alter_offset_checkpoint_restart_fe.groovy | 151 +++++++++++++++++++++
...test_streaming_job_checkpoint_restart_fe.groovy | 144 ++++++++++++++++++++
6 files changed, 353 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 2562685261f..8f6616134e4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -638,6 +638,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() +
attachment.getNumFiles());
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() +
attachment.getFileBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+ // Sync offsetProviderPersist after each offset update so the
checkpoint thread
+ // (which replays journals on its own Env) writes the latest offset
into the image.
+ this.offsetProviderPersist = offsetProvider.getPersistInfo();
//update metric
if (MetricRepo.isInit && !isReplay) {
@@ -744,6 +747,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty()) &&
this.tvfType != null) {
Offset offset =
validateOffset(inputStreamProps.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
+ this.offsetProviderPersist = offsetProvider.getPersistInfo();
if (Config.isCloudMode()) {
resetCloudProgress(offset);
}
@@ -1101,6 +1105,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (offsetProvider == null) {
if (tvfType != null) {
offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+ offsetProvider.restoreFromPersistInfo(offsetProviderPersist);
} else {
offsetProvider = new JdbcSourceOffsetProvider(getJobId(),
dataSourceType, sourceProperties);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
index 80d0da3de23..61d68b84a27 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
@@ -34,12 +34,15 @@ public class StreamingJobStatistic {
private long loadBytes;
@Getter
@Setter
+ @SerializedName("fileNumber")
private long fileNumber;
@Getter
@Setter
+ @SerializedName("fileSize")
private long fileSize;
@Getter
@Setter
+ @SerializedName("filteredRows")
private long filteredRows;
public String toJson() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 892231444e3..306cc737814 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -110,5 +110,13 @@ public interface SourceOffsetProvider {
return null;
}
+ /**
+ * Restore offset state from the persisted string during image load (gsonPostProcess).
+ * Called immediately after the provider is created so that even PAUSED jobs
+ * have the correct offset state.
+ *
+ * <p>The default implementation is a no-op; providers that persist offset state
+ * (typically via {@code getPersistInfo()}) override it to parse that value back.
+ *
+ * @param persistInfo the persisted offset string; may be {@code null} when nothing was persisted
+ */
+ default void restoreFromPersistInfo(String persistInfo) {
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 23996fb8f38..0296792267d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.GlobListResult;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
@@ -197,6 +198,47 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
return false;
}
+    /**
+     * Serializes the current S3 offset so it can be persisted with the job.
+     *
+     * @return the serialized offset, or {@code null} when no offset has been recorded yet
+     */
+    @Override
+    public String getPersistInfo() {
+        return currentOffset != null ? currentOffset.toSerializedJson() : null;
+    }
+
+    /**
+     * Rebuilds {@code currentOffset} from the JSON produced by {@link #getPersistInfo()}.
+     * A {@code null} input leaves the offset untouched; a parse failure is logged and
+     * swallowed so image loading can continue.
+     *
+     * @param persistInfo serialized S3 offset, possibly {@code null}
+     */
+    @Override
+    public void restoreFromPersistInfo(String persistInfo) {
+        if (persistInfo != null) {
+            try {
+                currentOffset = GsonUtils.GSON.fromJson(persistInfo, S3Offset.class);
+            } catch (Exception e) {
+                log.warn("Failed to restore S3 offset from persistInfo", e);
+            }
+        }
+    }
+
+    /**
+     * Ensures {@code currentOffset} is populated after FE recovery.
+     *
+     * <p>If the offset is already set, it is kept as-is. It may have been set by image load
+     * (gsonPostProcess -> restoreFromPersistInfo) or by EditLog replay
+     * (replayOnCommitted -> updateOffset). Otherwise the offset is restored from the job's
+     * {@code offsetProviderPersist}.
+     *
+     * @param job the streaming job whose persisted offset is used as a fallback
+     */
+    @Override
+    public void replayIfNeed(StreamingInsertJob job) {
+        // An offset that is already present reflects the latest known state and must not be
+        // overwritten by offsetProviderPersist, which may be stale
+        // (e.g. txn replay runs after ALTER replay).
+        if (currentOffset != null) {
+            log.info("S3 offset for job {} already restored: endFile={}",
+                    job.getJobId(), currentOffset.getEndFile());
+            return;
+        }
+        String persist = job.getOffsetProviderPersist();
+        if (persist == null) {
+            return;
+        }
+        S3Offset restored = GsonUtils.GSON.fromJson(persist, S3Offset.class);
+        // Gson yields null for empty or "null" JSON; leave currentOffset unset instead of
+        // dereferencing null in the log statement below.
+        if (restored == null) {
+            return;
+        }
+        this.currentOffset = restored;
+        log.info("Restored S3 offset from checkpoint for job {}: endFile={}",
+                job.getJobId(), currentOffset.getEndFile());
+    }
+
@Override
public Offset deserializeOffset(String offset) {
return GsonUtils.GSON.fromJson(offset, S3Offset.class);
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_checkpoint_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_checkpoint_restart_fe.groovy
new file mode 100644
index 00000000000..5c0dfc8a677
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_checkpoint_restart_fe.groovy
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Test that an ALTER'd S3 offset survives an FE restart that recovers from a checkpoint image.
+// Flow: consume data -> PAUSE -> ALTER offset -> checkpoint -> restart FE
+//       -> verify offset preserved -> RESUME -> verify re-consumption from the altered offset.
+suite("test_streaming_job_alter_offset_checkpoint_restart_fe", "docker") {
+    def tableName = "test_streaming_job_alter_ckpt_tbl"
+    def jobName = "test_streaming_job_alter_ckpt_name"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.cloudMode = null
+    options.feConfigs += [
+        'edit_log_roll_num=1'
+    ]
+
+    docker(options) {
+        // jobs() values may come back as strings; parse task counters before comparing so the
+        // check is numeric rather than lexicographic (where '2' <= '10' would be false).
+        def asCount = { v -> Long.parseLong(String.valueOf(v)) }
+
+        // Dumps job and task state to help diagnose a timed-out wait.
+        def logJobAndTasks = {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+        }
+
+        sql """drop table if exists `${tableName}` force"""
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        // Create job with offset starting after example_0.csv
+        sql """
+            CREATE JOB ${jobName}
+            PROPERTIES (
+                'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
+            )
+            ON STREAMING DO INSERT INTO ${tableName}
+            SELECT * FROM S3
+            (
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def cnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSucceedCount: " + cnt)
+                        cnt.size() == 1 && asCount(cnt.get(0).get(0)) >= 1
+                    }
+            )
+        } catch (Exception ex) {
+            logJobAndTasks()
+            throw ex
+        }
+
+        // Pause and ALTER offset to a position before all files
+        sql """PAUSE JOB where jobname = '${jobName}'"""
+        sql """
+            ALTER JOB ${jobName}
+            PROPERTIES (
+                'offset' = '{"fileName":"regression/load/data/anoexist1234.csv"}'
+            )
+        """
+
+        def alterInfo = sql """
+            select currentOffset from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("after ALTER: " + alterInfo)
+        assert alterInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/anoexist1234.csv\"}"
+
+        // Wait for checkpoint to capture the altered offset in the image
+        log.info("Waiting 90 seconds for checkpoint to complete...")
+        sleep(90000)
+
+        // Restart FE - recover from checkpoint image
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        // Verify altered offset is preserved after checkpoint restart
+        def afterRestart = sql """
+            select status, currentOffset from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("after checkpoint restart: " + afterRestart)
+        assert afterRestart.get(0).get(0) == "PAUSED"
+        assert afterRestart.get(0).get(1) == "{\"fileName\":\"regression/load/data/anoexist1234.csv\"}"
+
+        // Resume and verify job consumes all files from the altered offset
+        sql """RESUME JOB where jobname = '${jobName}'"""
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def st = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobStatus: " + st)
+                        st.size() == 1 && st.get(0).get(0) == 'RUNNING' && asCount(st.get(0).get(1)) >= 2
+                    }
+            )
+        } catch (Exception ex) {
+            logJobAndTasks()
+            throw ex
+        }
+
+        def finalInfo = sql """
+            select currentOffset from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("final offset: " + finalInfo)
+        assert finalInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}"
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy
new file mode 100644
index 00000000000..3d54930bd8e
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Test that streaming job loadStatistic (fileNumber, fileSize, filteredRows)
+// survives FE checkpoint restart. This guards against fields missing
+// @SerializedName in StreamingJobStatistic being lost when the checkpoint
+// image is written via GsonUtils.GSON (which uses HiddenAnnotationExclusionStrategy
+// to skip fields without @SerializedName).
+suite("test_streaming_job_checkpoint_restart_fe", "docker") {
+    def tableName = "test_streaming_job_ckpt_restart_tbl"
+    def jobName = "test_streaming_job_ckpt_restart_name"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.cloudMode = null
+    // Set edit_log_roll_num=1 so that every journal write triggers a roll,
+    // ensuring finalized journals exist for the checkpoint thread to pick up.
+    options.feConfigs += [
+        'edit_log_roll_num=1'
+    ]
+
+    docker(options) {
+        // jobs() values may come back as strings; parse task counters before comparing so the
+        // check is numeric rather than lexicographic (where '2' <= '10' would be false).
+        def asCount = { v -> Long.parseLong(String.valueOf(v)) }
+
+        // Dumps job and task state to help diagnose a timed-out wait.
+        def logJobAndTasks = {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+        }
+
+        sql """drop table if exists `${tableName}` force"""
+        sql """
+            DROP JOB IF EXISTS where jobname = '${jobName}'
+        """
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        // Create streaming job
+        sql """
+            CREATE JOB ${jobName}
+            PROPERTIES(
+                "s3.max_batch_files" = "1"
+            )
+            ON STREAMING DO INSERT INTO ${tableName}
+            SELECT * FROM S3
+            (
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            // Wait for all files to be processed. With example_[0-1].csv and
+            // s3.max_batch_files=1, processing completes in two successful tasks.
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        // Exact name match, consistent with every other jobs()/tasks() query here.
+                        def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSucceedCount: " + jobSucceedCount)
+                        jobSucceedCount.size() == 1 && asCount(jobSucceedCount.get(0).get(0)) >= 2
+                    }
+            )
+        } catch (Exception ex) {
+            logJobAndTasks()
+            throw ex
+        }
+
+        def jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo before checkpoint: " + jobInfo)
+        def loadStatBefore = parseJson(jobInfo.get(0).get(2))
+        log.info("loadStatistic before checkpoint: " + jobInfo.get(0).get(2))
+        assert loadStatBefore.scannedRows > 0
+        assert loadStatBefore.loadBytes > 0
+        assert loadStatBefore.fileNumber > 0
+        assert loadStatBefore.fileSize > 0
+
+        // Wait for checkpoint thread to execute.
+        // Checkpoint thread runs every 60 seconds (FeConstants.checkpoint_interval_second).
+        // With edit_log_roll_num=1, journals are finalized immediately,
+        // so the next checkpoint cycle will generate a new image containing
+        // the streaming job state.
+        log.info("Waiting 90 seconds for checkpoint to complete...")
+        sleep(90000)
+
+        // Restart FE - this time it should recover from the checkpoint image
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        // Verify loadStatistic is preserved after checkpoint restart
+        def jobInfoAfter = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo after checkpoint restart: " + jobInfoAfter)
+        def loadStatAfter = parseJson(jobInfoAfter.get(0).get(2))
+        log.info("loadStatistic after checkpoint restart: " + jobInfoAfter.get(0).get(2))
+
+        // These assertions verify that all statistics fields survive the checkpoint.
+        // Without @SerializedName on these fields, GsonUtils.GSON (which uses
+        // HiddenAnnotationExclusionStrategy) would skip them during serialization,
+        // causing them to reset to 0 after loading from the checkpoint image.
+        assert loadStatAfter.scannedRows == loadStatBefore.scannedRows
+        assert loadStatAfter.loadBytes == loadStatBefore.loadBytes
+        assert loadStatAfter.fileNumber == loadStatBefore.fileNumber
+        assert loadStatAfter.fileSize == loadStatBefore.fileSize
+        assert loadStatAfter.filteredRows == loadStatBefore.filteredRows
+
+        sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]