This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 747d76bf7ba branch-4.0: [fix](job) fix streaming job fails with "No new files found" on second scheduling #61249 (#61302)
747d76bf7ba is described below
commit 747d76bf7ba460b15b1377f681a3e2706e5f045d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Mar 14 14:43:42 2026 +0800
branch-4.0: [fix](job) fix streaming job fails with "No new files found" on second scheduling #61249 (#61302)
Cherry-picked from #61249
Co-authored-by: wudi <[email protected]>
---
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 27 +++--
.../org/apache/doris/job/common/FailureReason.java | 4 +-
...est_streaming_job_no_new_files_with_sibling.out | 12 +++
..._streaming_job_no_new_files_with_sibling.groovy | 110 +++++++++++++++++++++
4 files changed, 145 insertions(+), 8 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 33694b1a3d8..d9d0c7e9442 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -622,11 +622,25 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
boolean isTruncated = false;
boolean reachLimit = false;
+ String lastMatchedKey = "";
do {
roundCnt++;
ListObjectsV2Response response = listObjectsV2(request);
for (S3Object obj : response.contents()) {
elementCnt++;
+
+ // Limit already reached: scan remaining objects in this
page to find
+ // the next glob-matching key, so hasMoreDataToConsume()
returns true
+ // correctly without recording a non-matching raw S3 key
as currentMaxFile.
+ if (reachLimit) {
+ java.nio.file.Path checkPath = Paths.get(obj.key());
+ if (matcher.matches(checkPath)) {
+ currentMaxFile = obj.key();
+ break;
+ }
+ continue;
+ }
+
java.nio.file.Path objPath = Paths.get(obj.key());
boolean isPrefix = false;
@@ -654,6 +668,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
isPrefix ? 0 :
obj.lastModified().toEpochMilli()
);
result.add(remoteFile);
+ lastMatchedKey = obj.key();
if (hasLimits && reachLimit(result.size(),
matchFileSize, fileSizeLimit, fileNumLimit)) {
reachLimit = true;
@@ -663,15 +678,13 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
objPath = objPath.getParent();
isPrefix = true;
}
- if (reachLimit) {
- break;
- }
}
- // Record current max file for limit scenario
- if (!response.contents().isEmpty()) {
- S3Object lastS3Object =
response.contents().get(response.contents().size() - 1);
- currentMaxFile = lastS3Object.key();
+ // If no next matching file was found after the limit in the
current page,
+ // fall back to lastMatchedKey to avoid a non-matching raw S3
key
+ // (e.g. a sibling file like .lz4) being recorded as
currentMaxFile.
+ if (currentMaxFile.isEmpty()) {
+ currentMaxFile = lastMatchedKey;
}
isTruncated = response.isTruncated();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 7f2ba752812..4280d43bb66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -50,7 +50,9 @@ public class FailureReason implements Writable {
private static boolean isTooManyFailureRowsErr(String msg) {
return msg.contains("Insert has filtered data in strict mode")
- || msg.contains("too many filtered rows");
+ || msg.contains("too many filtered")
+ || msg.contains("Encountered unqualified data")
+ || msg.contains("parse number fail");
}
public InternalErrorCode getCode() {
diff --git
a/regression-test/data/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.out
b/regression-test/data/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.out
new file mode 100644
index 00000000000..cad3e045e85
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select --
+1 Emily 25
+2 Benjamin 35
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.groovy
new file mode 100644
index 00000000000..96a6b061b3c
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.groovy
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression test for: streaming job second scheduling fails with "No new files found"
+// when S3 listing returns non-matching sibling keys (e.g. example_1.csv) after the last
+// matched file, causing currentMaxFile to be set to a non-matching raw S3 key.
+//
+// Pattern example_[0-0].csv matches only example_0.csv, but getLongestPrefix strips
+// the bracket so S3 lists both example_0.csv and example_1.csv in the same page.
+// Without the fix, currentMaxFile = "example_1.csv" triggers a second scheduling
+// that finds no matching files and errors. With the fix, currentMaxFile = "example_0.csv"
+// and hasMoreDataToConsume() correctly returns false.
+suite("test_streaming_job_no_new_files_with_sibling") {
+    def tableName = "test_streaming_job_no_new_files_with_sibling_tbl"
+    def jobName = "test_streaming_job_no_new_files_with_sibling_job"
+
+    // Start from a clean slate in case a previous run left the table or job behind.
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // Use example_[0-0].csv: glob matches only example_0.csv, but S3 listing prefix
+    // "example_" also returns example_1.csv, which does not match the pattern.
+    // This reproduces the "non-matching sibling key" scenario.
+    sql """
+        CREATE JOB ${jobName}
+        ON STREAMING DO INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-0].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    try {
+        // Wait for the first task to succeed.
+        Awaitility.await().atMost(120, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def res = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("SucceedTaskCount: " + res)
+                    // Compare the count numerically instead of as a string.
+                    res.size() == 1 && res.get(0).get(0).toString().toLong() >= 1
+                }
+        )
+
+        // Wait extra time to allow a potential second (buggy) scheduling attempt.
+        Thread.sleep(10000)
+
+        // Verify no failed tasks: the job should not have tried to re-schedule and
+        // hit "No new files found" after all matched files are consumed.
+        def jobStatus = sql """
+            select Status, SucceedTaskCount, FailedTaskCount, ErrorMsg
+            from jobs("type"="insert") where Name = '${jobName}'
+        """
+        log.info("jobStatus: " + jobStatus)
+        assert jobStatus.size() == 1 : "Expected exactly one job row, but got: " + jobStatus
+        // toString() keeps the check valid whether the cell comes back as a String or a number.
+        assert jobStatus.get(0).get(2).toString() == '0' : "Expected no failed tasks, but got: " + jobStatus
+        assert jobStatus.get(0).get(0) != "STOPPED" : "Job should not be stopped, status: " + jobStatus
+
+        qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+    } catch (Throwable ex) {
+        // Catch Throwable rather than Exception: a failing Groovy `assert` throws an
+        // AssertionError (an Error), which `catch (Exception)` would let escape without
+        // dumping the job/task state that is needed to diagnose the failure.
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex
+    } finally {
+        sql """
+            DROP JOB IF EXISTS where jobname = '${jobName}'
+        """
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]