This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a69cf852159 [fix](job) fix streaming job stuck when S3 auth error is
silently ignored in fetchRemoteMeta (#61284)
a69cf852159 is described below
commit a69cf8521595172e7c96a6bd3c6bb4c61df13224
Author: wudi <[email protected]>
AuthorDate: Fri Mar 13 10:56:21 2026 +0800
[fix](job) fix streaming job stuck when S3 auth error is silently ignored
in fetchRemoteMeta (#61284)
### What problem does this PR solve?
#### Problem
When S3 credentials become invalid (e.g. 403 auth error), the streaming
job neither pauses nor reports an error — it hangs indefinitely without
making progress, even when new files are added.
#### Root cause:
S3ObjStorage.globListInternal() catches all exceptions and returns a
GlobListResult with a non-ok Status instead of
rethrowing. S3SourceOffsetProvider.fetchRemoteMeta() called
globListWithLimit() but never checked the returned status.
Since the `objects` list stayed empty, `maxEndFile` was never updated,
hasMoreDataToConsume() kept returning false, and the scheduler
retried every 500ms forever without triggering a PAUSE.
The same status check was also missing in getNextOffset(), which would
produce a misleading "No new files found" error
instead of the actual S3 error message.
#### Fix
- In fetchRemoteMeta(): check globListResult status after
globListWithLimit(); throw Exception with the real error message
if not ok, so the upper-level StreamingInsertJob.fetchMeta() catch block
can catch it, set GET_REMOTE_DATA_ERROR, and PAUSE
the job for auto-resume.
- In getNextOffset(): same status check, throw RuntimeException with
accurate error message.
- Add a debug point S3SourceOffsetProvider.fetchRemoteMeta.error to
simulate a failed GlobListResult for testing.
#### Test
Added regression test test_streaming_insert_job_fetch_meta_error:
enables the debug point to inject a failed
GlobListResult, creates a streaming job, waits for it to reach PAUSED
status, and asserts the ErrorMsg contains "Failed to
list S3 files".
---
.../job/offset/s3/S3SourceOffsetProvider.java | 21 ++++-
...st_streaming_insert_job_fetch_meta_error.groovy | 96 ++++++++++++++++++++++
2 files changed, 114 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 6e24b3d3680..23996fb8f38 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,6 +17,8 @@
package org.apache.doris.job.offset.s3;
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.GlobListResult;
@@ -69,6 +71,11 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
filePath = storageProperties.validateAndNormalizeUri(uri);
GlobListResult globListResult =
fileSystem.globListWithLimit(filePath, rfiles, startFile,
jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
+ if (globListResult == null || !globListResult.getStatus().ok()) {
+ String errMsg = globListResult != null
+ ? globListResult.getStatus().getErrMsg() : "null
result";
+ throw new RuntimeException("Failed to list S3 files: " +
errMsg);
+ }
if (!rfiles.isEmpty()) {
String bucket = globListResult.getBucket();
@@ -162,11 +169,19 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
String filePath = storageProperties.validateAndNormalizeUri(uri);
List<RemoteFile> objects = new ArrayList<>();
GlobListResult globListResult =
fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
- if (globListResult != null && !objects.isEmpty() &&
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
+ // debug point: simulate globListWithLimit returning a failed
status (e.g. S3 auth error)
+ if
(DebugPointUtil.isEnable("S3SourceOffsetProvider.fetchRemoteMeta.error")) {
+ globListResult = new GlobListResult(new
Status(Status.ErrCode.COMMON_ERROR,
+ "debug point: simulated S3 auth error"));
+ }
+ if (globListResult == null || !globListResult.getStatus().ok()) {
+ String errMsg = globListResult != null
+ ? globListResult.getStatus().getErrMsg() : "null
result";
+ throw new Exception("Failed to list S3 files: " + errMsg);
+ }
+ if (!objects.isEmpty() &&
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
maxEndFile = globListResult.getMaxFile();
}
- } catch (Exception e) {
- throw e;
}
}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
new file mode 100644
index 00000000000..fab3bf1397e
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Verify that when fetchRemoteMeta receives a failed GlobListResult (e.g. S3
auth error),
+// the streaming job is correctly PAUSED rather than hanging indefinitely.
+suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
+ def tableName = "test_streaming_insert_job_fetch_meta_error_tbl"
+ def jobName = "test_streaming_insert_job_fetch_meta_error_job"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+
GetDebugPoint().enableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+ try {
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+ try {
+ Awaitility.await().atMost(120, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def jobRes = sql """ select Status from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobRes: " + jobRes)
+ jobRes.size() == 1 &&
'PAUSED'.equals(jobRes.get(0).get(0))
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ def jobStatus = sql """select Status, ErrorMsg from
jobs("type"="insert") where Name='${jobName}'"""
+ assert jobStatus.get(0).get(0) == "PAUSED"
+ assert jobStatus.get(0).get(1).contains("Failed to list S3 files")
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+
+ } finally {
+
GetDebugPoint().disableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]