This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 641b3861ee4 [Fix](job) add thread num config for streaming task exec
(#57230)
641b3861ee4 is described below
commit 641b3861ee4d7d17398ca8eb3d8abd90c7b3a663
Author: wudi <[email protected]>
AuthorDate: Wed Oct 22 22:45:41 2025 +0800
[Fix](job) add thread num config for streaming task exec (#57230)
### What problem does this PR solve?
Add thread num config for streaming task exec
Currently, coreSize=0. New threads are created only when the task
exceeds the queue length.
If the queue is not full, only one thread will execute.
---
fe/fe-common/src/main/java/org/apache/doris/common/Config.java | 5 +++++
.../org/apache/doris/job/scheduler/StreamingTaskScheduler.java | 8 +++++---
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 99103ae4244..a49d3b941e4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1976,6 +1976,11 @@ public class Config extends ConfigBase {
+ " greater than 0, otherwise it defaults to 3." })
public static int job_dictionary_task_consumer_thread_num = 3;
+ @ConfField(masterOnly = true, description = {"用于执行 Streaming
任务的线程数,值应该大于0,否则默认为10",
+ "The number of threads used to execute Streaming Tasks, "
+ + "the value should be greater than 0, if it is <=0,
default is 10."})
+ public static int job_streaming_task_exec_thread_num = 10;
+
@ConfField(masterOnly = true, description = {"最大的 Streaming
作业数量,值应该大于0,否则默认为1024",
"The maximum number of Streaming jobs, "
+ "the value should be greater than 0, if it is <=0,
default is 1024."})
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 91b6f475bfc..7e99ca3ada9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -41,9 +41,9 @@ import java.util.concurrent.TimeUnit;
@Log4j2
public class StreamingTaskScheduler extends MasterDaemon {
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
+ Config.job_streaming_task_exec_thread_num,
+ Config.job_streaming_task_exec_thread_num,
0,
- Config.max_streaming_job_num,
- 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(Config.max_streaming_job_num),
new CustomThreadFactory("streaming-task-execute"),
@@ -120,9 +120,11 @@ public class StreamingTaskScheduler extends MasterDaemon {
log.info("prepare to schedule task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId());
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
-
+ long start = System.currentTimeMillis();
try {
task.execute();
+ log.info("Finished executing task, task id: {}, job id: {}, cost
{}ms",
+ task.getTaskId(), task.getJobId(),
System.currentTimeMillis() - start);
} catch (Exception e) {
log.error("Failed to execute task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId(), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]