This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this push:
new 6e27cc8bcb9 Use future to block auto analyze before job finish. (#33083)
6e27cc8bcb9 is described below
commit 6e27cc8bcb91a6764fccaa07bb26b0f605d1057a
Author: Jibing-Li <[email protected]>
AuthorDate: Mon Apr 1 10:30:38 2024 +0800
Use future to block auto analyze before job finish. (#33083)
---
.../doris/statistics/AnalysisTaskExecutor.java | 7 ++++---
.../doris/statistics/StatisticsAutoCollector.java | 23 +++++++++++++++++++---
2 files changed, 24 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 3bdccaca047..d787794534a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -52,7 +53,7 @@ public class AnalysisTaskExecutor {
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
- new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
+ new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE),
"Analysis Job Executor", true);
cancelExpiredTask();
} else {
@@ -88,9 +89,9 @@ public class AnalysisTaskExecutor {
}
}
- public void submitTask(BaseAnalysisTask task) {
+ public Future<?> submitTask(BaseAnalysisTask task) {
AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
- executors.submit(taskWrapper);
+ return executors.submit(taskWrapper);
}
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 57f3f494573..bf0179f5603 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -45,6 +45,8 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -134,7 +136,15 @@ public class StatisticsAutoCollector extends MasterDaemon {
}
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns,
priority);
LOG.debug("Auto analyze job : {}", analyzeJob.toString());
- executeSystemAnalysisJob(analyzeJob);
+ try {
+ executeSystemAnalysisJob(analyzeJob);
+ } catch (Exception e) {
+ StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
+ for (Pair<String, String> pair : columns) {
+ stringJoiner.add(pair.toString());
+ }
+ LOG.warn("Fail to auto analyze table {}, columns [{}]", table.getName(), stringJoiner.toString());
+ }
}
protected void appendPartitionColumns(TableIf table, Set<Pair<String,
String>> columns) throws DdlException {
@@ -205,7 +215,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
// Analysis job created by the system
@VisibleForTesting
protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
- throws DdlException {
+ throws DdlException, ExecutionException, InterruptedException {
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
@@ -215,7 +225,14 @@ public class StatisticsAutoCollector extends MasterDaemon {
}
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo,
analysisTasks.values());
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo,
analysisTasks);
- analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
+ Future<?>[] futures = new Future[analysisTasks.values().size()];
+ int i = 0;
+ for (BaseAnalysisTask task : analysisTasks.values()) {
+ futures[i++] = analysisTaskExecutor.submitTask(task);
+ }
+ for (Future future : futures) {
+ future.get();
+ }
}
protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]