This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 663312cba4c branch-4.0: [fix](job) fix streaming job stuck when S3
auth error is silently ignored in fetchRemoteMeta #61284 (#61296)
663312cba4c is described below
commit 663312cba4c1344ea041ffaabb393718b1f192e5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 13 14:03:46 2026 +0800
branch-4.0: [fix](job) fix streaming job stuck when S3 auth error is
silently ignored in fetchRemoteMeta #61284 (#61296)
Cherry-picked from #61284
Co-authored-by: wudi <[email protected]>
---
.../job/offset/s3/S3SourceOffsetProvider.java | 21 ++++-
...st_streaming_insert_job_fetch_meta_error.groovy | 96 ++++++++++++++++++++++
2 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 6e24b3d3680..23996fb8f38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,6 +17,8 @@
package org.apache.doris.job.offset.s3;
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.GlobListResult;
@@ -69,6 +71,11 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
filePath = storageProperties.validateAndNormalizeUri(uri);
GlobListResult globListResult =
fileSystem.globListWithLimit(filePath, rfiles, startFile,
jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
+ if (globListResult == null || !globListResult.getStatus().ok()) {
+ String errMsg = globListResult != null
+ ? globListResult.getStatus().getErrMsg() : "null
result";
+ throw new RuntimeException("Failed to list S3 files: " +
errMsg);
+ }
if (!rfiles.isEmpty()) {
String bucket = globListResult.getBucket();
@@ -162,11 +169,19 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
String filePath = storageProperties.validateAndNormalizeUri(uri);
List<RemoteFile> objects = new ArrayList<>();
GlobListResult globListResult =
fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
- if (globListResult != null && !objects.isEmpty() &&
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
+ // debug point: simulate globListWithLimit returning a failed status (e.g. S3 auth error)
+ if
(DebugPointUtil.isEnable("S3SourceOffsetProvider.fetchRemoteMeta.error")) {
+ globListResult = new GlobListResult(new
Status(Status.ErrCode.COMMON_ERROR,
+ "debug point: simulated S3 auth error"));
+ }
+ if (globListResult == null || !globListResult.getStatus().ok()) {
+ String errMsg = globListResult != null
+ ? globListResult.getStatus().getErrMsg() : "null
result";
+ throw new Exception("Failed to list S3 files: " + errMsg);
+ }
+ if (!objects.isEmpty() &&
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
maxEndFile = globListResult.getMaxFile();
}
- } catch (Exception e) {
- throw e;
}
}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
new file mode 100644
index 00000000000..fab3bf1397e
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Verify that when fetchRemoteMeta receives a failed GlobListResult (e.g. S3 auth error),
+// the streaming job is correctly PAUSED rather than hanging indefinitely.
+suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
+ def tableName = "test_streaming_insert_job_fetch_meta_error_tbl"
+ def jobName = "test_streaming_insert_job_fetch_meta_error_job"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+
GetDebugPoint().enableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+ try {
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+ try {
+ Awaitility.await().atMost(120, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ def jobRes = sql """ select Status from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobRes: " + jobRes)
+ jobRes.size() == 1 &&
'PAUSED'.equals(jobRes.get(0).get(0))
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ def jobStatus = sql """select Status, ErrorMsg from
jobs("type"="insert") where Name='${jobName}'"""
+ assert jobStatus.get(0).get(0) == "PAUSED"
+ assert jobStatus.get(0).get(1).contains("Failed to list S3 files")
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert")
where Name='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+
+ } finally {
+
GetDebugPoint().disableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]