This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 663312cba4c branch-4.0: [fix](job) fix streaming job stuck when S3 auth error is silently ignored in fetchRemoteMeta #61284 (#61296)
663312cba4c is described below

commit 663312cba4c1344ea041ffaabb393718b1f192e5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 13 14:03:46 2026 +0800

    branch-4.0: [fix](job) fix streaming job stuck when S3 auth error is silently ignored in fetchRemoteMeta #61284 (#61296)
    
    Cherry-picked from #61284
    
    Co-authored-by: wudi <[email protected]>
---
 .../job/offset/s3/S3SourceOffsetProvider.java      | 21 ++++-
 ...st_streaming_insert_job_fetch_meta_error.groovy | 96 ++++++++++++++++++++++
 2 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 6e24b3d3680..23996fb8f38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.job.offset.s3;
 
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.GlobListResult;
@@ -69,6 +71,11 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
             filePath = storageProperties.validateAndNormalizeUri(uri);
             GlobListResult globListResult = 
fileSystem.globListWithLimit(filePath, rfiles, startFile,
                     jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
+            if (globListResult == null || !globListResult.getStatus().ok()) {
+                String errMsg = globListResult != null
+                        ? globListResult.getStatus().getErrMsg() : "null 
result";
+                throw new RuntimeException("Failed to list S3 files: " + 
errMsg);
+            }
 
             if (!rfiles.isEmpty()) {
                 String bucket = globListResult.getBucket();
@@ -162,11 +169,19 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
             String filePath = storageProperties.validateAndNormalizeUri(uri);
             List<RemoteFile> objects = new ArrayList<>();
             GlobListResult globListResult = 
fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
-            if (globListResult != null && !objects.isEmpty() && 
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
+            // debug point: simulate globListWithLimit returning a failed status (e.g. S3 auth error)
+            if 
(DebugPointUtil.isEnable("S3SourceOffsetProvider.fetchRemoteMeta.error")) {
+                globListResult = new GlobListResult(new 
Status(Status.ErrCode.COMMON_ERROR,
+                        "debug point: simulated S3 auth error"));
+            }
+            if (globListResult == null || !globListResult.getStatus().ok()) {
+                String errMsg = globListResult != null
+                        ? globListResult.getStatus().getErrMsg() : "null 
result";
+                throw new Exception("Failed to list S3 files: " + errMsg);
+            }
+            if (!objects.isEmpty() && 
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
                 maxEndFile = globListResult.getMaxFile();
             }
-        } catch (Exception e) {
-            throw e;
         }
     }
 
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
new file mode 100644
index 00000000000..fab3bf1397e
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Verify that when fetchRemoteMeta receives a failed GlobListResult (e.g. S3 auth error),
+// the streaming job is correctly PAUSED rather than hanging indefinitely.
+suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
+    def tableName = "test_streaming_insert_job_fetch_meta_error_tbl"
+    def jobName = "test_streaming_insert_job_fetch_meta_error_job"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    
GetDebugPoint().enableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+    try {
+        sql """
+           CREATE JOB ${jobName}
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            Awaitility.await().atMost(120, SECONDS)
+                    .pollInterval(2, SECONDS).until(
+                    {
+                        def jobRes = sql """ select Status from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobRes: " + jobRes)
+                        jobRes.size() == 1 && 
'PAUSED'.equals(jobRes.get(0).get(0))
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex
+        }
+
+        def jobStatus = sql """select Status, ErrorMsg from 
jobs("type"="insert") where Name='${jobName}'"""
+        assert jobStatus.get(0).get(0) == "PAUSED"
+        assert jobStatus.get(0).get(1).contains("Failed to list S3 files")
+
+        sql """
+            DROP JOB IF EXISTS where jobname = '${jobName}'
+        """
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert") 
where Name='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllFEs("S3SourceOffsetProvider.fetchRemoteMeta.error")
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to