This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new afb6a57aa8 [enhancement](nereids) Improve stats preload performance (#21970)
afb6a57aa8 is described below
commit afb6a57aa8741ecc0a61754100d17703d5c9894c
Author: AKIRA <[email protected]>
AuthorDate: Mon Jul 31 17:32:01 2023 +0800
[enhancement](nereids) Improve stats preload performance (#21970)
---
.../main/java/org/apache/doris/common/Config.java | 13 ++-
.../org/apache/doris/analysis/AnalyzeTblStmt.java | 7 +-
.../java/org/apache/doris/nereids/memo/Memo.java | 2 +-
.../apache/doris/statistics/AnalysisManager.java | 17 ++--
.../doris/statistics/AnalysisTaskExecutor.java | 40 +++-----
.../doris/statistics/AnalysisTaskScheduler.java | 108 ---------------------
.../apache/doris/statistics/BaseAnalysisTask.java | 8 +-
.../apache/doris/statistics/ColumnStatistic.java | 14 +--
.../apache/doris/statistics/OlapAnalysisTask.java | 31 +++---
.../doris/statistics/StatisticsAutoAnalyzer.java | 25 +++--
.../apache/doris/statistics/StatisticsCache.java | 66 ++++++++-----
.../doris/statistics/StatisticsRepository.java | 15 +--
.../doris/statistics/util/StatisticsUtil.java | 17 +++-
.../apache/doris/statistics/AnalysisJobTest.java | 10 +-
.../doris/statistics/AnalysisTaskExecutorTest.java | 28 ++----
.../apache/doris/statistics/HistogramTaskTest.java | 14 +--
regression-test/pipeline/p0/conf/fe.conf | 2 +-
17 files changed, 158 insertions(+), 259 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a47ae31cdd..1b4c61a866 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1680,7 +1680,7 @@ public class Config extends ConfigBase {
     * Used to determined how many statistics collection SQL could run simultaneously.
*/
@ConfField
- public static int statistics_simultaneously_running_task_num = 10;
+ public static int statistics_simultaneously_running_task_num = 5;
/**
* if table has too many replicas, Fe occur oom when schema change.
@@ -2031,13 +2031,13 @@ public class Config extends ConfigBase {
public static int hive_stats_partition_sample_size = 3000;
@ConfField
- public static boolean enable_full_auto_analyze = false;
+ public static boolean enable_full_auto_analyze = true;
@ConfField
public static String full_auto_analyze_start_time = "00:00:00";
@ConfField
- public static String full_auto_analyze_end_time = "23:59:59";
+ public static String full_auto_analyze_end_time = "02:00:00";
@ConfField
public static int statistics_sql_parallel_exec_instance_num = 1;
@@ -2056,4 +2056,11 @@ public class Config extends ConfigBase {
+ "and modifying table properties. "
+ "This config is recommended to be used only in the test
environment"})
public static int force_olap_table_replication_num = 0;
+
+ @ConfField
+ public static int full_auto_analyze_simultaneously_running_task_num = 1;
+
+ @ConfField
+ public static int cpu_resource_limit_per_analyze_task = 1;
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
index da08f45bee..527f802748 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
@@ -36,6 +36,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
@@ -192,10 +193,10 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
}
}
if (containsUnsupportedTytpe) {
-            if (!ConnectContext.get().getSessionVariable().enableAnalyzeComplexTypeColumn) {
+            if (ConnectContext.get() == null
+                    || !ConnectContext.get().getSessionVariable().enableAnalyzeComplexTypeColumn) {
                 columnNames = columnNames.stream()
-                        .filter(c -> !ColumnStatistic.UNSUPPORTED_TYPE.contains(
-                                table.getColumn(c).getType()))
+                        .filter(c -> !StatisticsUtil.isUnsupportedType(table.getColumn(c).getType()))
.collect(Collectors.toList());
} else {
throw new AnalysisException(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
index a313975684..18ea08d58e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java
@@ -115,7 +115,7 @@ public class Memo {
public void removePhysicalExpression() {
         groupExpressions.entrySet().removeIf(entry -> entry.getValue().getPlan() instanceof PhysicalPlan);
-        Iterator<Entry<GroupId, Group>> iterator = groups.entrySet().iterator();
+        Iterator<Map.Entry<GroupId, Group>> iterator = groups.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<GroupId, Group> entry = iterator.next();
Group group = entry.getValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 66f0b94aa8..63ab923234 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -94,8 +94,6 @@ import java.util.stream.Collectors;
public class AnalysisManager extends Daemon implements Writable {
- public AnalysisTaskScheduler taskScheduler;
-
     private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
     private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -112,8 +110,7 @@ public class AnalysisManager extends Daemon implements Writable {
public AnalysisManager() {
super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
if (!Env.isCheckpointThread()) {
- this.taskScheduler = new AnalysisTaskScheduler();
- this.taskExecutor = new AnalysisTaskExecutor(taskScheduler);
+            this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
this.statisticsCache = new StatisticsCache();
taskExecutor.start();
}
@@ -192,7 +189,9 @@ public class AnalysisManager extends Daemon implements Writable {
table.getName());
// columnNames null means to add all visitable columns.
         AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
-                null, db.getId(), table);
+                table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())).map(
+                        Column::getName).collect(
+                        Collectors.toList()), db.getId(), table);
try {
analyzeTblStmt.check();
} catch (AnalysisException analysisException) {
@@ -254,12 +253,13 @@ public class AnalysisManager extends Daemon implements Writable {
return null;
}
- analysisTaskInfos.values().forEach(taskScheduler::schedule);
+ analysisTaskInfos.values().forEach(taskExecutor::submitTask);
return jobInfo;
}
// Analysis job created by the system
-    public void createSystemAnalysisJob(AnalysisInfo info) throws DdlException {
+    public void createSystemAnalysisJob(AnalysisInfo info, AnalysisTaskExecutor analysisTaskExecutor)
+            throws DdlException {
AnalysisInfo jobInfo = buildAnalysisJobInfo(info);
if (jobInfo.colToPartitions.isEmpty()) {
// No statistics need to be collected or updated
@@ -273,8 +273,7 @@ public class AnalysisManager extends Daemon implements Writable {
persistAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
}
-
- analysisTaskInfos.values().forEach(taskScheduler::schedule);
+ analysisTaskInfos.values().forEach(taskExecutor::submitTask);
}
private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index b5ec7aeb87..fb23050fff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -35,26 +35,23 @@ public class AnalysisTaskExecutor extends Thread {
     private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
-    private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
- Config.statistics_simultaneously_running_task_num,
- Config.statistics_simultaneously_running_task_num, 0,
- TimeUnit.DAYS, new LinkedBlockingQueue<>(),
- new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
- "Analysis Job Executor", true);
-
- private final AnalysisTaskScheduler taskScheduler;
+ private final ThreadPoolExecutor executors;
private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
- public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) {
- this.taskScheduler = jobExecutor;
+ public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
+ executors = ThreadPoolManager.newDaemonThreadPool(
+ simultaneouslyRunningTaskNum,
+ simultaneouslyRunningTaskNum, 0,
+ TimeUnit.DAYS, new LinkedBlockingQueue<>(),
+ new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
+ "Analysis Job Executor", true);
}
@Override
public void run() {
- fetchAndExecute();
cancelExpiredTask();
}
@@ -82,22 +79,7 @@ public class AnalysisTaskExecutor extends Thread {
}
}
- public void fetchAndExecute() {
- Thread t = new Thread(() -> {
- for (;;) {
- try {
- doFetchAndExecute();
- } catch (Throwable throwable) {
- LOG.warn(throwable);
- }
- }
- }, "Analysis Task Submitter");
- t.setDaemon(true);
- t.start();
- }
-
- private void doFetchAndExecute() {
- BaseAnalysisTask task = taskScheduler.getPendingTasks();
+ public void submitTask(BaseAnalysisTask task) {
AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
executors.submit(taskWrapper);
}
@@ -105,4 +87,8 @@ public class AnalysisTaskExecutor extends Thread {
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
taskQueue.put(wrapper);
}
+
+ public boolean idle() {
+ return executors.getQueue().isEmpty();
+ }
}
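With AnalysisTaskScheduler gone, callers construct the executor with a pool size and hand tasks to it directly. A minimal usage sketch of the API introduced above (illustrative only; `task` stands for any already-built BaseAnalysisTask, and the classes are the fe-core ones touched by this patch):

    // Sketch: direct submission replaces the old scheduler queue.
    AnalysisTaskExecutor executor =
            new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
    executor.start();              // run() now only drives expired-task cancellation
    if (executor.idle()) {         // true when the underlying pool queue is empty
        executor.submitTask(task); // wraps the task and submits it to the thread pool
    }

StatisticsAutoAnalyzer follows the same pattern with its own executor sized by full_auto_analyze_simultaneously_running_task_num, checking idle() before it schedules another round.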
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
deleted file mode 100644
index 5c9de2b58b..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-
-public class AnalysisTaskScheduler {
-
-    private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class);
-
-    private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
-            new PriorityQueue<>(Comparator.comparingLong(BaseAnalysisTask::getLastExecTime));
-
- private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
-
- private final Set<BaseAnalysisTask> systemJobSet = new HashSet<>();
-
- private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
-
- public synchronized void schedule(BaseAnalysisTask analysisTask) {
- try {
-
- switch (analysisTask.info.jobType) {
- case MANUAL:
- addToManualJobQueue(analysisTask);
- break;
- case SYSTEM:
- addToSystemQueue(analysisTask);
- break;
- default:
-                    throw new IllegalArgumentException("Unknown job type: " + analysisTask.info.jobType);
- }
- } catch (Throwable t) {
-            Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
-                    analysisTask.info, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis());
- }
- }
-
- // Make sure invoker of this method is synchronized on object.
-
- private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) {
- if (systemJobSet.contains(analysisJobInfo)) {
- return;
- }
- systemJobSet.add(analysisJobInfo);
- systemJobQueue.add(analysisJobInfo);
- notify();
- }
-
- // Make sure invoker of this method is synchronized on object.
- private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) {
- if (manualJobSet.contains(analysisJobInfo)) {
- return;
- }
- manualJobSet.add(analysisJobInfo);
- manualJobQueue.add(analysisJobInfo);
- notify();
- }
-
- public synchronized BaseAnalysisTask getPendingTasks() {
- while (true) {
- if (!manualJobQueue.isEmpty()) {
- return pollAndRemove(manualJobQueue, manualJobSet);
- }
- if (!systemJobQueue.isEmpty()) {
- return pollAndRemove(systemJobQueue, systemJobSet);
- }
- try {
- wait();
- } catch (Exception e) {
- LOG.warn("Thread get interrupted when waiting for pending
jobs", e);
- return null;
- }
- }
- }
-
-    // Poll from queue, remove from set. Make sure invoker of this method is synchronized on object.
-    private BaseAnalysisTask pollAndRemove(Queue<BaseAnalysisTask> q, Set<BaseAnalysisTask> s) {
- BaseAnalysisTask t = q.poll();
- s.remove(t);
- return t;
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index e146a2e8e3..fc264e0661 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -26,8 +26,10 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -152,10 +154,8 @@ public abstract class BaseAnalysisTask {
if (col == null) {
            throw new RuntimeException(String.format("Column with name %s not exists", info.tblName));
}
-        if (isUnsupportedType(col.getType().getPrimitiveType())) {
-            throw new RuntimeException(String.format("Column with type %s is not supported",
-                    col.getType().toString()));
-        }
+        Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
+                String.format("Column with type %s is not supported", col.getType().toString()));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index 6887108a68..7e23c83e07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -19,9 +19,7 @@ package org.apache.doris.statistics;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Type;
-import org.apache.doris.common.DdlException;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -206,9 +204,6 @@ public class ColumnStatistic {
columnStatisticBuilder.setMaxValue(Double.MAX_VALUE);
}
columnStatisticBuilder.setSelectivity(1.0);
-        Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName)
-                .orElse(null);
-        columnStatisticBuilder.setHistogram(histogram);
columnStatisticBuilder.setUpdatedTime(resultRow.getColumnValue("update_time"));
return columnStatisticBuilder.build();
} catch (Exception e) {
@@ -428,12 +423,7 @@ public class ColumnStatistic {
return isUnKnown;
}
-    public void loadPartitionStats(long tableId, long idxId, String colName) throws DdlException {
-        List<ResultRow> resultRows = StatisticsRepository.loadPartStats(tableId, idxId, colName);
-        for (ResultRow resultRow : resultRows) {
-            String partId = resultRow.getColumnValue("part_id");
-            ColumnStatistic columnStatistic = ColumnStatistic.fromResultRow(resultRow);
-            partitionIdToColStats.put(Long.parseLong(partId), columnStatistic);
- }
+ public void putPartStats(long partId, ColumnStatistic columnStatistic) {
+ this.partitionIdToColStats.put(partId, columnStatistic);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index a980385bde..3f3a04d620 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -110,21 +110,24 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
@VisibleForTesting
public void execSQL(String sql) throws Exception {
-        if (killed) {
-            return;
-        }
-        long startTime = System.currentTimeMillis();
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            r.connectContext.setExecutor(stmtExecutor);
-            stmtExecutor.execute();
-            QueryState queryState = r.connectContext.getState();
-            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-                throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
-                        info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+        synchronized (OlapAnalysisTask.class) {
+            if (killed) {
+                return;
+            }
+            long startTime = System.currentTimeMillis();
+            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                r.connectContext.setExecutor(stmtExecutor);
+                stmtExecutor.execute();
+                QueryState queryState = r.connectContext.getState();
+                if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
+                            info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+                }
+                LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
             }
-            LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
}
+
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
index 181af16882..02fe96ec71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
@@ -50,8 +50,11 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class);
+ private AnalysisTaskExecutor analysisTaskExecutor;
+
public StatisticsAutoAnalyzer() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
+        analysisTaskExecutor = new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num);
}
@Override
@@ -66,12 +69,16 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
return;
}
- // if (!Config.enable_full_auto_analyze) {
- // analyzePeriodically();
- // analyzeAutomatically();
- // } else {
- // analyzeAll();
- // }
+ if (!analysisTaskExecutor.idle()) {
+ return;
+ }
+
+ if (!Config.enable_full_auto_analyze) {
+ analyzePeriodically();
+ analyzeAutomatically();
+ } else {
+ analyzeAll();
+ }
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -92,7 +99,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
if (analysisInfo == null) {
continue;
}
- analysisManager.createSystemAnalysisJob(analysisInfo);
+                analysisManager.createSystemAnalysisJob(analysisInfo, analysisTaskExecutor);
}
}
} catch (Throwable t) {
@@ -109,7 +116,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
for (AnalysisInfo jobInfo : jobInfos) {
             jobInfo = new AnalysisInfoBuilder(jobInfo).setJobType(JobType.SYSTEM).build();
- analysisManager.createSystemAnalysisJob(jobInfo);
+            analysisManager.createSystemAnalysisJob(jobInfo, analysisTaskExecutor);
}
} catch (DdlException e) {
LOG.warn("Failed to periodically analyze the statistics." + e);
@@ -124,7 +131,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
try {
checkedJobInfo = getReAnalyzeRequiredPart(jobInfo);
if (checkedJobInfo != null) {
- analysisManager.createSystemAnalysisJob(checkedJobInfo);
+                    analysisManager.createSystemAnalysisJob(checkedJobInfo, analysisTaskExecutor);
}
} catch (Throwable t) {
LOG.warn("Failed to create analyze job: {}", checkedJobInfo,
t);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 1149ecdd5a..d5a3b972f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -38,10 +38,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -213,6 +216,7 @@ public class StatisticsCache {
if (CollectionUtils.isEmpty(recentStatsUpdatedCols)) {
return;
}
+        Map<StatisticsCacheKey, ColumnStatistic> keyToColStats = new HashMap<>();
for (ResultRow r : recentStatsUpdatedCols) {
try {
long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
@@ -221,12 +225,17 @@ public class StatisticsCache {
final StatisticsCacheKey k =
new StatisticsCacheKey(tblId, idxId, colId);
final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
- c.loadPartitionStats(tblId, idxId, colId);
+ keyToColStats.put(k, c);
putCache(k, c);
} catch (Throwable t) {
LOG.warn("Error when preheating stats cache", t);
}
}
+ try {
+ loadPartStats(keyToColStats);
+ } catch (Exception e) {
+ LOG.warn("Fucka", e);
+ }
}
public void syncLoadColStats(long tableId, long idxId, String colName) {
@@ -261,32 +270,43 @@ public class StatisticsCache {
}
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
-        CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>() {
-
-            @Override
-            public Optional<ColumnStatistic> get() throws InterruptedException, ExecutionException {
- return Optional.of(c);
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
+        CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>();
+ f.obtrudeValue(Optional.of(c));
+ columnStatisticsCache.put(k, f);
+ }
- @Override
- public boolean complete(Optional<ColumnStatistic> value) {
- return true;
+    private void loadPartStats(Map<StatisticsCacheKey, ColumnStatistic> keyToColStats) {
+ final int batchSize = Config.expr_children_limit;
+ Set<StatisticsCacheKey> keySet = new HashSet<>();
+ for (StatisticsCacheKey statisticsCacheKey : keyToColStats.keySet()) {
+ if (keySet.size() < batchSize - 1) {
+ keySet.add(statisticsCacheKey);
+ } else {
+                List<ResultRow> partStats = StatisticsRepository.loadPartStats(keySet);
+ addPartStatsToColStats(keyToColStats, partStats);
+ keySet = new HashSet<>();
}
+ }
+ if (!keySet.isEmpty()) {
+            List<ResultRow> partStats = StatisticsRepository.loadPartStats(keySet);
+ addPartStatsToColStats(keyToColStats, partStats);
+ }
+ }
- @Override
- public Optional<ColumnStatistic> join() {
- return Optional.of(c);
+    private void addPartStatsToColStats(Map<StatisticsCacheKey, ColumnStatistic> keyToColStats,
+            List<ResultRow> partsStats) {
+ for (ResultRow r : partsStats) {
+ try {
+ long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
+ long idxId = Long.parseLong(r.getColumnValue("idx_id"));
+ long partId = Long.parseLong(r.getColumnValue("part_id"));
+ String colId = r.getColumnValue("col_id");
+ ColumnStatistic partStats = ColumnStatistic.fromResultRow(r);
+                keyToColStats.get(new StatisticsCacheKey(tblId, idxId, colId)).putPartStats(partId, partStats);
+ } catch (Throwable t) {
+ LOG.warn("Failed to deserialized part stats", t);
}
- };
- if (c.isUnKnown) {
- return;
}
- columnStatisticsCache.put(k, f);
}
}
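putCache() now primes columnStatisticsCache with an already-completed future whose value is forced in via obtrudeValue(), instead of the old anonymous CompletableFuture subclass. A standalone JDK-only sketch of that pattern (class and value names here are illustrative, not from the patch):

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    public class PrimedFutureDemo {
        public static void main(String[] args) {
            CompletableFuture<Optional<String>> f = new CompletableFuture<>();
            // Forcibly set the result; the future now reports done and join() returns it.
            f.obtrudeValue(Optional.of("col-stats"));
            System.out.println(f.isDone());       // true
            System.out.println(f.join().get());   // col-stats
        }
    }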
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index d20bb358c1..7a043e7708 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -38,6 +38,7 @@ import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -126,7 +127,7 @@ public class StatisticsRepository {
private static final String QUERY_PARTITION_STATISTICS = "SELECT * FROM "
+ FeConstants.INTERNAL_DB_NAME
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
- + " tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}' "
+ + " ${inPredicate}"
+ " AND part_id IS NOT NULL";
     public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
@@ -440,12 +441,14 @@ public class StatisticsRepository {
.replace(QUERY_COLUMN_STATISTICS));
}
-    public static List<ResultRow> loadPartStats(long tableId, long idxId, String colName) {
+    public static List<ResultRow> loadPartStats(Collection<StatisticsCacheKey> keys) {
+        String inPredicate = "CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)";
+ StringJoiner sj = new StringJoiner(",");
+ for (StatisticsCacheKey statisticsCacheKey : keys) {
+ sj.add("'" + statisticsCacheKey.toString() + "'");
+ }
Map<String, String> params = new HashMap<>();
- params.put("tblId", String.valueOf(tableId));
- params.put("idxId", String.valueOf(idxId));
- params.put("colId", colName);
-
+ params.put("inPredicate", String.format(inPredicate, sj.toString()));
return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(QUERY_PARTITION_STATISTICS));
}
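loadPartStats() now fetches partition-level rows for a whole batch of cache keys in one query, folding the keys into a CONCAT(tbl_id, '-', idx_id, '-', col_id) IN (...) predicate. A self-contained sketch of that predicate construction; the nested Key record is a stand-in for StatisticsCacheKey, and its "tblId-idxId-colId" toString() format is an assumption, not taken from the patch:

    import java.util.List;
    import java.util.StringJoiner;

    public class InPredicateDemo {
        // Hypothetical stand-in for StatisticsCacheKey.
        record Key(long tblId, long idxId, String colId) {
            @Override
            public String toString() {
                return tblId + "-" + idxId + "-" + colId;
            }
        }

        public static void main(String[] args) {
            List<Key> keys = List.of(new Key(10001, -1, "c1"), new Key(10001, -1, "c2"));
            StringJoiner sj = new StringJoiner(",");
            for (Key k : keys) {
                sj.add("'" + k + "'");
            }
            String inPredicate = String.format("CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)", sj);
            // Prints: CONCAT(tbl_id, '-', idx_id, '-', col_id) in ('10001--1-c1','10001--1-c2')
            System.out.println(inPredicate);
        }
    }

The batch size in StatisticsCache.loadPartStats is capped by Config.expr_children_limit, presumably so the generated IN list stays within the planner's expression-children limit.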
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 0451097967..e82a8b955c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -29,17 +29,21 @@ import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -168,6 +172,7 @@ public class StatisticsUtil {
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
+        sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
         sessionVariable.setEnableInsertStrict(true);
         sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
@@ -633,7 +638,7 @@ public class StatisticsUtil {
}
     private static void processDataFile(DataFile dataFile, PartitionSpec partitionSpec,
-                                         String colName, ColumnStatisticBuilder columnStatisticBuilder) {
+            String colName, ColumnStatisticBuilder columnStatisticBuilder) {
int colId = -1;
for (Types.NestedField column : partitionSpec.schema().columns()) {
if (column.name().equals(colName)) {
@@ -651,4 +656,14 @@ public class StatisticsUtil {
columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls()
+ dataFile.nullValueCounts().get(colId));
}
+
+ public static boolean isUnsupportedType(Type type) {
+ if (ColumnStatistic.UNSUPPORTED_TYPE.contains(type)) {
+ return true;
+ }
+ return type instanceof ArrayType
+ || type instanceof StructType
+ || type instanceof MapType
+ || type instanceof VariantType;
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index ca145f07f2..fa0c0fc8ef 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -62,13 +62,7 @@ public class AnalysisJobTest extends TestWithFeService {
}
@Test
-    public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) throws Exception {
- new Expectations() {
- {
- scheduler.schedule((BaseAnalysisTask) any);
- times = 3;
- }
- };
+ public void testCreateAnalysisJob() throws Exception {
new MockUp<StatisticsUtil>() {
@@ -101,7 +95,7 @@ public class AnalysisJobTest extends TestWithFeService {
}
@Test
-    public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, @Mocked StmtExecutor stmtExecutor)
+ public void testJobExecution(@Mocked StmtExecutor stmtExecutor)
throws Exception {
new MockUp<StatisticsUtil>() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 8da819f09c..453eb78628 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -28,9 +28,9 @@ import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Maps;
+import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
-import mockit.Mocked;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -41,8 +41,6 @@ import java.util.concurrent.BlockingQueue;
public class AnalysisTaskExecutorTest extends TestWithFeService {
- @Mocked
- AnalysisTaskScheduler analysisTaskScheduler;
@Override
protected void runBeforeAll() throws Exception {
@@ -71,13 +69,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
.build();
OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisJobInfo);
- new MockUp<AnalysisTaskScheduler>() {
- public synchronized BaseAnalysisTask getPendingTasks() {
- return analysisJob;
- }
- };
-
-        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(1);
         BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "taskQueue");
         AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
@@ -102,7 +94,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
}
};
-        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(1);
HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
colToPartitions.put("col1", Collections.singleton("t1"));
         AnalysisInfo analysisInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0)
@@ -114,16 +106,16 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
.setColToPartitions(colToPartitions)
.build();
OlapAnalysisTask task = new OlapAnalysisTask(analysisInfo);
- new MockUp<AnalysisTaskScheduler>() {
- @Mock
- public synchronized BaseAnalysisTask getPendingTasks() {
- return task;
- }
- };
+
new MockUp<AnalysisManager>() {
@Mock
            public void updateTaskStatus(AnalysisInfo info, AnalysisState jobState, String message, long time) {}
};
- Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+ new Expectations() {
+ {
+ task.doExecute();
+ }
+ };
+ Deencapsulation.invoke(analysisTaskExecutor, "submitTask", task);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
index d3d5245a81..0660c994a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java
@@ -30,7 +30,6 @@ import org.apache.doris.utframe.TestWithFeService;
import mockit.Mock;
import mockit.MockUp;
-import mockit.Mocked;
import org.junit.FixMethodOrder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -43,9 +42,6 @@ import java.util.concurrent.ConcurrentMap;
@FixMethodOrder(value = MethodSorters.NAME_ASCENDING)
public class HistogramTaskTest extends TestWithFeService {
- @Mocked
- AnalysisTaskScheduler analysisTaskScheduler;
-
@Override
protected void runBeforeAll() throws Exception {
createDatabase("histogram_task_test");
@@ -96,7 +92,7 @@ public class HistogramTaskTest extends TestWithFeService {
@Test
public void test2TaskExecution() throws Exception {
-        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
+        AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(1);
AnalysisInfo analysisInfo = new AnalysisInfoBuilder()
.setJobId(0).setTaskId(0).setCatalogName("internal")
                .setDbName(SystemInfoService.DEFAULT_CLUSTER + ":" + "histogram_task_test").setTblName("t1")
@@ -107,17 +103,11 @@ public class HistogramTaskTest extends TestWithFeService {
.build();
HistogramTask task = new HistogramTask(analysisInfo);
- new MockUp<AnalysisTaskScheduler>() {
- @Mock
- public synchronized BaseAnalysisTask getPendingTasks() {
- return task;
- }
- };
new MockUp<AnalysisManager>() {
@Mock
            public void updateTaskStatus(AnalysisInfo info, AnalysisState jobState, String message, long time) {}
};
- Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
+ Deencapsulation.invoke(analysisTaskExecutor, "submitTask", task);
}
}
diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf
index 592bab5556..772ad23a9d 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -83,4 +83,4 @@ enable_mtmv = true
dynamic_partition_check_interval_seconds=3
-enable_full_auto_analyze=false
+enable_full_auto_analyze=true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]