This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a69cf852159 [fix](job) fix streaming job stuck when S3 auth error is 
silently ignored in fetchRemoteMeta (#61284)
a69cf852159 is described below

commit a69cf8521595172e7c96a6bd3c6bb4c61df13224
Author: wudi <[email protected]>
AuthorDate: Fri Mar 13 10:56:21 2026 +0800

    [fix](job) fix streaming job stuck when S3 auth error is silently ignored 
in fetchRemoteMeta (#61284)
    
    ### What problem does this PR solve?
    
    #### Problem
    
    When S3 credentials become invalid (e.g. 403 auth error), the streaming
    job neither pauses nor reports an error — it hangs indefinitely
    without making progress, even when new files are added.
    
    #### Root cause:
    
    S3ObjStorage.globListInternal() catches all exceptions and returns a
    GlobListResult with a non-ok Status instead of
    rethrowing. S3SourceOffsetProvider.fetchRemoteMeta() called
    globListWithLimit() but never checked the returned status.
    Since the objects list was empty, maxEndFile was never updated,
    hasMoreDataToConsume() kept returning false, and the scheduler
    retried every 500ms forever without triggering a PAUSE.
    
    The same status check was also missing in getNextOffset(), which would
    produce a misleading "No new files found" error instead of the actual
    S3 error message.
    
    #### Fix
    
    - In fetchRemoteMeta(): check the globListResult status after
    globListWithLimit(); throw an Exception with the real error message
    if the status is not ok, so the upper-level StreamingInsertJob.fetchMeta()
    catch block can catch it, set GET_REMOTE_DATA_ERROR, and PAUSE the
    job for auto-resume.
    - In getNextOffset(): same status check, throw RuntimeException with
    accurate error message.
    - Add a debug point S3SourceOffsetProvider.fetchRemoteMeta.error to
    simulate a failed GlobListResult for testing.
    
    #### Test
    
    Added regression test test_streaming_insert_job_fetch_meta_error:
    enables the debug point to inject a failed
    GlobListResult, creates a streaming job, waits for it to reach PAUSED
status, and asserts the ErrorMsg contains "Failed to list S3 files".
---
 .../job/offset/s3/S3SourceOffsetProvider.java      | 21 ++++-
 ...st_streaming_insert_job_fetch_meta_error.groovy | 96 ++++++++++++++++++++++
 2 files changed, 114 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 6e24b3d3680..23996fb8f38 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.job.offset.s3;
 
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.GlobListResult;
@@ -69,6 +71,11 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
             filePath = storageProperties.validateAndNormalizeUri(uri);
             GlobListResult globListResult = 
fileSystem.globListWithLimit(filePath, rfiles, startFile,
                     jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
+            if (globListResult == null || !globListResult.getStatus().ok()) {
+                String errMsg = globListResult != null
+                        ? globListResult.getStatus().getErrMsg() : "null 
result";
+                throw new RuntimeException("Failed to list S3 files: " + 
errMsg);
+            }
 
             if (!rfiles.isEmpty()) {
                 String bucket = globListResult.getBucket();
@@ -162,11 +169,19 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
             String filePath = storageProperties.validateAndNormalizeUri(uri);
             List<RemoteFile> objects = new ArrayList<>();
             GlobListResult globListResult = 
fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
-            if (globListResult != null && !objects.isEmpty() && 
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
+            // debug point: simulate globListWithLimit returning a failed 
status (e.g. S3 auth error)
+            if 
(DebugPointUtil.isEnable("S3SourceOffsetProvider.fetchRemoteMeta.error")) {
+                globListResult = new GlobListResult(new 
Status(Status.ErrCode.COMMON_ERROR,
+                        "debug point: simulated S3 auth error"));
+            }
+            if (globListResult == null || !globListResult.getStatus().ok()) {
+                String errMsg = globListResult != null
+                        ? globListResult.getStatus().getErrMsg() : "null 
result";
+                throw new Exception("Failed to list S3 files: " + errMsg);
+            }
+            if (!objects.isEmpty() && 
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
                 maxEndFile = globListResult.getMaxFile();
             }
-        } catch (Exception e) {
-            throw e;
         }
     }
 
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
new file mode 100644
index 00000000000..fab3bf1397e
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Verify that when fetchRemoteMeta receives a failed GlobListResult (e.g. S3 
auth error),
+// the streaming job is correctly PAUSED rather than hanging indefinitely.
+suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
+    def tableName = "test_streaming_insert_job_fetch_meta_error_tbl"
+    def jobName = "test_streaming_insert_job_fetch_meta_error_job"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    
GetDebugPoint().enableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+    try {
+        sql """
+           CREATE JOB ${jobName}
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            Awaitility.await().atMost(120, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def jobRes = sql """ select Status from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobRes: " + jobRes)
+                        jobRes.size() == 1 && 
'PAUSED'.equals(jobRes.get(0).get(0))
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def jobStatus = sql """select Status, ErrorMsg from 
jobs("type"="insert") where Name='${jobName}'"""
+        assert jobStatus.get(0).get(0) == "PAUSED"
+        assert jobStatus.get(0).get(1).contains("Failed to list S3 files")
+
+        sql """
+            DROP JOB IF EXISTS where jobname = '${jobName}'
+        """
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to