This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 04da5beed39 [enhancement](stats) limit bq cap size for analyze task
#27685 (#27687)
04da5beed39 is described below
commit 04da5beed39610a7eb849b41fad123d7aeffdbb4
Author: AKIRA <[email protected]>
AuthorDate: Wed Nov 29 02:59:40 2023 +0800
[enhancement](stats) limit bq cap size for analyze task #27685 (#27687)
---
.../src/main/java/org/apache/doris/statistics/AnalysisManager.java | 3 ++-
.../main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java | 6 +++++-
.../main/java/org/apache/doris/statistics/StatisticConstants.java | 2 +-
.../java/org/apache/doris/statistics/StatisticsAutoCollector.java | 3 ++-
.../main/java/org/apache/doris/statistics/StatisticsCollector.java | 6 ------
5 files changed, 10 insertions(+), 10 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0c13d5eb6d8..61c72926f2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -248,7 +248,8 @@ public class AnalysisManager implements Writable {
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
- this.taskExecutor = new
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
+ this.taskExecutor = new
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
+ Integer.MAX_VALUE);
this.statisticsCache = new StatisticsCache();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index fb4530837e4..3bdccaca047 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -43,11 +43,15 @@ public class AnalysisTaskExecutor {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
+ this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
+ }
+
+ public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int
taskQueueSize) {
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
- TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+ TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor",
Integer.MAX_VALUE),
"Analysis Job Executor", true);
cancelExpiredTask();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index 9f1bd3bf681..ffb80074eee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -96,7 +96,7 @@ public class StatisticConstants {
public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
- public static final int SUBMIT_JOB_LIMIT = 5;
+ public static final int TASK_QUEUE_CAP = 10;
static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 7ae7651421c..2eac82c91b6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -52,7 +52,8 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
- new
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num));
+ new
AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
+ StatisticConstants.TASK_QUEUE_CAP));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 63dcdab09ab..9d4c311523b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -35,8 +35,6 @@ public abstract class StatisticsCollector extends
MasterDaemon {
protected final AnalysisTaskExecutor analysisTaskExecutor;
- protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
-
public StatisticsCollector(String name, long intervalMs,
AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
this.analysisTaskExecutor = analysisTaskExecutor;
@@ -54,7 +52,6 @@ public abstract class StatisticsCollector extends
MasterDaemon {
if (Env.isCheckpointThread()) {
return;
}
- submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
LOG.info("Analyze tasks those submitted in last time is not
finished, skip");
return;
@@ -72,9 +69,6 @@ public abstract class StatisticsCollector extends
MasterDaemon {
// No statistics need to be collected or updated
return;
}
- if (submittedThisRound-- < 0) {
- return;
- }
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]