This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b987e64e541 branch-3.1: [fix](job) fix routine load task scheduler
being blocked when one job cannot find any BE #52654 (#52792)
b987e64e541 is described below
commit b987e64e541d31c89e02c0a88a0d900fd059b2fb
Author: hui lai <[email protected]>
AuthorDate: Tue Jul 8 14:16:11 2025 +0800
branch-3.1: [fix](job) fix routine load task scheduler being blocked when
one job cannot find any BE #52654 (#52792)
pick #52654
---
.../apache/doris/load/routineload/RoutineLoadManager.java | 12 +++++++++++-
.../doris/load/routineload/RoutineLoadTaskScheduler.java | 4 ++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 23b36f11a4b..169af97670f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -498,8 +498,18 @@ public class RoutineLoadManager implements Writable {
// find a BE that is available for running a task of the given job
// return the id of the chosen BE. return -1 if no BE is available.
// throw exception if unrecoverable errors happen.
- public long getAvailableBeForTask(long jobId, long previousBeId) throws
LoadException {
+ public long getAvailableBeForTask(long jobId, long previousBeId) throws
UserException {
List<Long> availableBeIds = getAvailableBackendIds(jobId);
+ if (availableBeIds.isEmpty()) {
+ RoutineLoadJob job = getJob(jobId);
+ if (job != null) {
+ String msg = "no available BE found for job " + jobId
+ + "please check the BE status and user's cluster or
tags";
+ job.updateState(RoutineLoadJob.JobState.PAUSED,
+ new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
msg), false /* not replay */);
+ }
+ return -1L;
+ }
// check if be has idle slot
readLock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d40a6705626..040ca103004 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -147,7 +147,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// this should be done before txn begin, or the txn may be begun
successfully but failed to be allocated.
if (!allocateTaskToBe(routineLoadTaskInfo)) {
// allocate failed, push it back to the queue to wait next
scheduling
- needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
+ needScheduleTasksQueue.addLast(routineLoadTaskInfo);
return;
}
} catch (UserException e) {
@@ -311,7 +311,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// 2. If not, try to find a better one with most idle slots.
// return true if allocate successfully. return false if failed.
// throw exception if unrecoverable errors happen.
- private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo)
throws LoadException {
+ private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo)
throws UserException {
long beId =
routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
routineLoadTaskInfo.getPreviousBeId());
if (beId == -1L) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]