This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this
push:
new 88ed1980d86 High priority queue and map. (#31509)
88ed1980d86 is described below
commit 88ed1980d86731b799963f63eeef35390663f159
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Feb 28 12:01:55 2024 +0800
High priority queue and map. (#31509)
---
.../main/java/org/apache/doris/catalog/Env.java | 20 +-
.../apache/doris/statistics/AnalysisManager.java | 33 +-
.../doris/statistics/StatisticsAutoCollector.java | 202 ++------
.../doris/statistics/StatisticsCollector.java | 4 -
.../doris/statistics/StatisticsJobAppender.java | 135 +++++
.../statistics/StatisticsAutoCollectorTest.java | 546 ---------------------
6 files changed, 213 insertions(+), 727 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index d5bcd324d9b..66812579d2e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -243,6 +243,7 @@ import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
+import org.apache.doris.statistics.StatisticsJobAppender;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@@ -516,6 +517,8 @@ public class Env {
private StatisticsAutoCollector statisticsAutoCollector;
+ private StatisticsJobAppender statisticsJobAppender;
+
private HiveTransactionMgr hiveTransactionMgr;
private TopicPublisherThread topicPublisherThread;
@@ -743,6 +746,7 @@ public class Env {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
+ this.statisticsJobAppender = new StatisticsJobAppender();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
@@ -1019,13 +1023,6 @@ public class Env {
// If not using bdb, we need to notify the FE type transfer
manually.
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
}
- if (statisticsCleaner != null) {
- statisticsCleaner.start();
- }
- if (statisticsAutoCollector != null) {
- statisticsAutoCollector.start();
- }
-
queryCancelWorker.start();
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
@@ -1663,6 +1660,11 @@ public class Env {
binlogGcer.start();
columnIdFlusher.start();
insertOverwriteManager.start();
+
+ // auto analyze related threads.
+ statisticsCleaner.start();
+ statisticsAutoCollector.start();
+ statisticsJobAppender.start();
}
// start threads that should running on all FE
@@ -5994,6 +5996,10 @@ public class Env {
return statisticsAutoCollector;
}
+ public StatisticsJobAppender getStatisticsJobAppender() {
+ return statisticsJobAppender;
+ }
+
public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(),
info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
this.alter.processAlterMTMV(alter, false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index da80a48081b..31928491e3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -85,6 +85,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -108,15 +109,12 @@ public class AnalysisManager implements Writable {
private static final Logger LOG =
LogManager.getLogger(AnalysisManager.class);
- /**
- * Mem only.
- */
- public final Queue<HighPriorityColumn> predicateColumns = new
ArrayBlockingQueue<>(100);
-
- /**
- * Mem only.
- */
- public final Queue<HighPriorityColumn> queryColumns = new
ArrayBlockingQueue<>(100);
+ private static final int COLUMN_QUEUE_SIZE = 1000;
+ public final Queue<HighPriorityColumn> highPriorityColumns = new
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
+ public final Queue<HighPriorityColumn> midPriorityColumns = new
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
+ public final Map<TableIf, Set<String>> highPriorityJobs = new
LinkedHashMap<>();
+ public final Map<TableIf, Set<String>> midPriorityJobs = new
LinkedHashMap<>();
+ public final Map<TableIf, Set<String>> lowPriorityJobs = new
LinkedHashMap<>();
// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>>
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -167,11 +165,6 @@ public class AnalysisManager implements Writable {
public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy)
throws DdlException, AnalysisException {
DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
- // Using auto analyzer if user specifies.
- if
(analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer"))
{
- Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db);
- return;
- }
List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db,
analyzeDBStmt.getAnalyzeProperties());
if (!analyzeDBStmt.isSync()) {
sendJobId(analysisInfos, proxy);
@@ -219,6 +212,11 @@ public class AnalysisManager implements Writable {
// Each analyze stmt corresponding to an analysis job.
public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws
DdlException {
+ // Using auto analyzer if user specifies.
+ if
(stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
+
Env.getCurrentEnv().getStatisticsAutoCollector().processOneJob(stmt.getTable(),
stmt.getColumnNames());
+ return;
+ }
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
if (jobInfo == null) {
return;
@@ -1103,11 +1101,11 @@ public class AnalysisManager implements Writable {
public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
- updateColumn(slotReferences, predicateColumns);
+ updateColumn(slotReferences, highPriorityColumns);
}
public void updateQueriedColumn(Collection<Slot> slotReferences) {
- updateColumn(slotReferences, queryColumns);
+ updateColumn(slotReferences, midPriorityColumns);
}
protected void updateColumn(Collection<Slot> slotReferences,
Queue<HighPriorityColumn> queue) {
@@ -1117,7 +1115,8 @@ public class AnalysisManager implements Writable {
}
Optional<Column> optionalColumn = ((SlotReference) s).getColumn();
Optional<TableIf> optionalTable = ((SlotReference) s).getTable();
- if (optionalColumn.isPresent() && optionalTable.isPresent()) {
+ if (optionalColumn.isPresent() && optionalTable.isPresent()
+ &&
!StatisticsUtil.isUnsupportedType(optionalColumn.get().getType())) {
TableIf table = optionalTable.get();
DatabaseIf database = table.getDatabase();
if (database != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index dbb7046467a..19b9c69db08 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -17,31 +17,24 @@
package org.apache.doris.statistics;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -59,8 +52,23 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
@Override
protected void collect() {
- if (canCollect()) {
- analyzeAll();
+ while (canCollect()) {
+ Map.Entry<TableIf, Set<String>> job = getJob();
+ if (job == null) {
+ // No more jobs to process; break and sleep.
+ break;
+ }
+ try {
+ TableIf table = job.getKey();
+ Set<String> columns = job.getValue()
+ .stream()
+ .filter(c -> needAnalyzeColumn(table, c))
+ .collect(Collectors.toSet());
+ processOneJob(table, columns);
+ } catch (Exception e) {
+ LOG.warn("Failed to analyze table {} with columns [{}]",
+ job.getKey().getName(),
job.getValue().stream().collect(Collectors.joining(",")), e);
+ }
}
}
@@ -69,133 +77,56 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
&&
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}
- protected void analyzeAll() {
- List<CatalogIf> catalogs = getCatalogsInOrder();
- for (CatalogIf ctl : catalogs) {
- if (!canCollect()) {
- analysisTaskExecutor.clear();
- break;
- }
- if (!ctl.enableAutoAnalyze()) {
- continue;
- }
- List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
- for (DatabaseIf<TableIf> databaseIf : dbs) {
- if (!canCollect()) {
- analysisTaskExecutor.clear();
- break;
- }
- if
(StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
- continue;
- }
- try {
- analyzeDb(databaseIf);
- } catch (Throwable t) {
- LOG.warn("Failed to analyze database {}.{}",
ctl.getName(), databaseIf.getFullName(), t);
- continue;
- }
- }
+ protected Map.Entry<TableIf, Set<String>> getJob() {
+ AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+ Optional<Map.Entry<TableIf, Set<String>>> job =
fetchJobFromMap(manager.highPriorityJobs);
+ if (job.isPresent()) {
+ return job.get();
}
+ job = fetchJobFromMap(manager.midPriorityJobs);
+ if (job.isPresent()) {
+ return job.get();
+ }
+ job = fetchJobFromMap(manager.lowPriorityJobs);
+ return job.isPresent() ? job.get() : null;
}
- public List<CatalogIf> getCatalogsInOrder() {
- return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
- .sorted((c1, c2) -> (int) (c1.getId() -
c2.getId())).collect(Collectors.toList());
- }
-
- public List<DatabaseIf<? extends TableIf>>
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
- return catalog.getAllDbs().stream()
- .sorted((d1, d2) -> (int) (d1.getId() -
d2.getId())).collect(Collectors.toList());
- }
-
- public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
- return db.getTables().stream()
- .sorted((t1, t2) -> (int) (t1.getId() -
t2.getId())).collect(Collectors.toList());
- }
-
- public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
- List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
- for (AnalysisInfo analysisInfo : analysisInfos) {
- try {
- if (!canCollect()) {
- analysisTaskExecutor.clear();
- break;
- }
- analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
- if (analysisInfo == null) {
- continue;
- }
- createSystemAnalysisJob(analysisInfo);
- } catch (Throwable t) {
- analysisInfo.message = t.getMessage();
- LOG.warn("Failed to auto analyze table {}.{}, reason {}",
- databaseIf.getFullName(), analysisInfo.tblId,
analysisInfo.message, t);
- continue;
- }
+ protected Optional<Map.Entry<TableIf, Set<String>>>
fetchJobFromMap(Map<TableIf, Set<String>> jobMap) {
+ synchronized (jobMap) {
+ Optional<Map.Entry<TableIf, Set<String>>> first =
jobMap.entrySet().stream().findFirst();
+ first.ifPresent(entry -> jobMap.remove(entry.getKey()));
+ return first;
}
}
- protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends
TableIf> db) {
- List<AnalysisInfo> analysisInfos = new ArrayList<>();
- for (TableIf table : getTablesInOrder(db)) {
- try {
- if (skip(table)) {
- continue;
- }
- createAnalyzeJobForTbl(db, analysisInfos, table);
- } catch (Throwable t) {
- LOG.warn("Failed to analyze table {}.{}.{}",
- db.getCatalog().getName(), db.getFullName(),
table.getName(), t);
- continue;
- }
+ protected void processOneJob(TableIf table, Set<String> columns) throws
DdlException {
+ Set<String> collect = columns.stream().filter(c ->
needAnalyzeColumn(table, c)).collect(Collectors.toSet());
+ if (collect.isEmpty()) {
+ return;
}
- return analysisInfos;
+ AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns);
+ createSystemAnalysisJob(analyzeJob);
}
- // return true if skip auto analyze this time.
- protected boolean skip(TableIf table) {
- if (!(table instanceof OlapTable || table instanceof
HMSExternalTable)) {
- return true;
- }
- // For now, only support Hive HMS table auto collection.
- if (table instanceof HMSExternalTable
- && !((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
- return true;
- }
- if (table.getDataSize(true) <
StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
- return false;
- }
- TableStatsMeta tableStats =
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
- // means it's never got analyzed or new partition loaded data.
- if (tableStats == null || tableStats.newPartitionLoaded.get()) {
- return false;
- }
- if (tableStats.userInjected) {
- return true;
- }
- return System.currentTimeMillis()
- - tableStats.updatedTime <
StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
+ protected boolean needAnalyzeColumn(TableIf table, String column) {
+ //TODO: Calculate column health value.
+ return true;
}
- protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
- List<AnalysisInfo> analysisInfos, TableIf table) {
+ protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String>
columns) {
AnalysisMethod analysisMethod = table.getDataSize(true) >=
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
- AnalysisInfo jobInfo = new AnalysisInfoBuilder()
+ return new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
- .setCatalogId(db.getCatalog().getId())
- .setDBId(db.getId())
+ .setCatalogId(table.getDatabase().getCatalog().getId())
+ .setDBId(table.getDatabase().getId())
.setTblId(table.getId())
- .setColName(
- table.getSchemaAllIndexes(false).stream()
- .filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
-
.map(Column::getName).collect(Collectors.joining(","))
- )
+ .setColName(columns.stream().collect(Collectors.joining(",")))
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
.setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE)
- ? StatisticsUtil.getHugeTableSampleRows() : -1)
+ ? StatisticsUtil.getHugeTableSampleRows() : -1)
.setScheduleType(ScheduleType.AUTOMATIC)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
@@ -204,40 +135,5 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
.setTblUpdateTime(table.getUpdateTime())
.setEmptyJob(table instanceof OlapTable && table.getRowCount()
== 0)
.build();
- analysisInfos.add(jobInfo);
- }
-
- @VisibleForTesting
- protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
- TableIf table = StatisticsUtil.findTable(jobInfo.catalogId,
jobInfo.dbId, jobInfo.tblId);
- // Skip tables that are too wide.
- if (table.getBaseSchema().size() >
StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
- return null;
- }
-
- AnalysisManager analysisManager =
Env.getServingEnv().getAnalysisManager();
- TableStatsMeta tblStats =
analysisManager.findTableStatsStatus(table.getId());
-
- Map<String, Set<String>> needRunPartitions = null;
- String colNames = jobInfo.colName;
- if (table.needReAnalyzeTable(tblStats)) {
- needRunPartitions = table.findReAnalyzeNeededPartitions();
- } else if (table instanceof OlapTable &&
tblStats.newPartitionLoaded.get()) {
- OlapTable olapTable = (OlapTable) table;
- needRunPartitions = new HashMap<>();
- Set<String> partitionColumnNames =
olapTable.getPartitionInfo().getPartitionColumns().stream()
- .map(Column::getName).collect(Collectors.toSet());
- colNames =
partitionColumnNames.stream().collect(Collectors.joining(","));
- Set<String> partitionNames = olapTable.getAllPartitions().stream()
- .map(Partition::getName).collect(Collectors.toSet());
- for (String column : partitionColumnNames) {
- needRunPartitions.put(column, partitionNames);
- }
- }
-
- if (needRunPartitions == null || needRunPartitions.isEmpty()) {
- return null;
- }
- return new
AnalysisInfoBuilder(jobInfo).setColName(colNames).setColToPartitions(needRunPartitions).build();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 0985b9b2b95..e26d3170178 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -61,10 +61,6 @@ public abstract class StatisticsCollector extends
MasterDaemon {
@VisibleForTesting
protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
throws DdlException {
- if (jobInfo.colToPartitions.isEmpty()) {
- // No statistics need to be collected or updated
- return;
- }
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
new file mode 100644
index 00000000000..73d0d1340ad
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StatisticsJobAppender extends MasterDaemon {
+
+ private static final Logger LOG =
LogManager.getLogger(StatisticsJobAppender.class);
+
+ public static final long INTERVAL = 1000;
+ public static final int JOB_MAP_SIZE = 1000;
+
+ private long currentDbId;
+ private long currentTableId;
+
+ public StatisticsJobAppender() {
+ super("Statistics Job Appender", INTERVAL);
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!StatisticsUtil.enableAutoAnalyze()) {
+ return;
+ }
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+ if (!StatisticsUtil.statsTblAvailable()) {
+ LOG.info("Stats table not available, skip");
+ return;
+ }
+ if (Env.isCheckpointThread()) {
+ return;
+ }
+ appendJobs();
+ }
+
+ protected void appendJobs() {
+ AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+ appendColumnsToJobs(manager.highPriorityColumns,
manager.highPriorityJobs);
+ appendColumnsToJobs(manager.midPriorityColumns,
manager.midPriorityJobs);
+ appendToLowQueue(manager.lowPriorityJobs);
+ }
+
+ protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue,
Map<TableIf, Set<String>> jobsMap) {
+ int size = columnQueue.size();
+ for (int i = 0; i < size; i++) {
+ HighPriorityColumn column = columnQueue.poll();
+ TableIf table = StatisticsUtil.findTable(column.catalogId,
column.dbId, column.tblId);
+ synchronized (jobsMap) {
+ // If the job map has reached its upper limit, stop adding new jobs.
+ if (!jobsMap.containsKey(table) && jobsMap.size() >=
JOB_MAP_SIZE) {
+ break;
+ }
+ if (jobsMap.containsKey(table)) {
+ jobsMap.get(table).add(column.colName);
+ } else {
+ jobsMap.put(table, Collections.singleton(column.colName));
+ }
+ }
+ }
+ }
+
+ protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) {
+
+ InternalCatalog catalog = Env.getCurrentInternalCatalog();
+ List<Long> sortedDbs =
catalog.getDbIds().stream().sorted().collect(Collectors.toList());
+ for (long dbId : sortedDbs) {
+ if (dbId < currentDbId
+ ||
StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName()))
{
+ continue;
+ }
+ currentDbId = dbId;
+ Optional<Database> db = catalog.getDb(dbId);
+ List<Table> tables = db.get().getTables().stream()
+ .sorted((t1, t2) -> (int) (t1.getId() -
t2.getId())).collect(Collectors.toList());
+ for (Table t : tables) {
+ if (t.getId() <= currentTableId) {
+ continue;
+ }
+ synchronized (jobsMap) {
+ // If the job map has reached its upper limit, stop adding new jobs.
+ if (!jobsMap.containsKey(t) && jobsMap.size() >=
JOB_MAP_SIZE) {
+ return;
+ }
+ Set<String> columns
+ = t.getColumns().stream().filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
+ .map(c -> c.getName()).collect(Collectors.toSet());
+ if (jobsMap.containsKey(t)) {
+ jobsMap.get(t).addAll(columns);
+ } else {
+ jobsMap.put(t, columns);
+ }
+ }
+ currentTableId = t.getId();
+ }
+ }
+ // All tables have been processed once, reset for the next loop.
+ currentDbId = 0;
+ currentTableId = 0;
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
deleted file mode 100644
index ddc19959a2d..00000000000
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
+++ /dev/null
@@ -1,546 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DatabaseIf;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.catalog.View;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisInfo.JobType;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class StatisticsAutoCollectorTest {
-
- @Test
- public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
- new MockUp<CatalogIf>() {
- @Mock
- public Collection<DatabaseIf> getAllDbs() {
- Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME);
- Database db2 = new Database(2, "anyDB");
- List<DatabaseIf> databaseIfs = new ArrayList<>();
- databaseIfs.add(db1);
- databaseIfs.add(db2);
- return databaseIfs;
- }
- };
- new MockUp<StatisticsAutoCollector>() {
- @Mock
- public List<AnalysisInfo>
constructAnalysisInfo(DatabaseIf<TableIf> db) {
- return Arrays.asList(analysisInfo, analysisInfo);
- }
-
- int count = 0;
-
- @Mock
- public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo)
{
- return count++ == 0 ? null : jobInfo;
- }
-
- @Mock
- public void createSystemAnalysisJob(AnalysisInfo jobInfo)
- throws DdlException {
-
- }
- };
-
- StatisticsAutoCollector saa = new StatisticsAutoCollector();
- saa.runAfterCatalogReady();
- new Expectations() {
- {
- try {
- saa.createSystemAnalysisJob((AnalysisInfo) any);
- times = 1;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- @Test
- public void testConstructAnalysisInfo(
- @Injectable OlapTable o2, @Injectable View v) {
- new MockUp<Database>() {
- @Mock
- public List<Table> getTables() {
- List<Table> tableIfs = new ArrayList<>();
- tableIfs.add(o2);
- tableIfs.add(v);
- return tableIfs;
- }
-
- @Mock
- public String getFullName() {
- return "anyDb";
- }
- };
-
- new MockUp<OlapTable>() {
- @Mock
- public String getName() {
- return "anytable";
- }
-
- @Mock
- public List<Column> getSchemaAllIndexes(boolean full) {
- List<Column> columns = new ArrayList<>();
- columns.add(new Column("c1", PrimitiveType.INT));
- columns.add(new Column("c2", PrimitiveType.HLL));
- return columns;
- }
- };
- StatisticsAutoCollector saa = new StatisticsAutoCollector();
- List<AnalysisInfo> analysisInfoList = saa.constructAnalysisInfo(new
Database(1, "anydb"));
- Assertions.assertEquals(1, analysisInfoList.size());
- Assertions.assertEquals("c1",
analysisInfoList.get(0).colName.split(",")[0]);
- }
-
- @Test
- public void testGetReAnalyzeRequiredPart0() {
-
- TableIf tableIf = new OlapTable();
-
- new MockUp<OlapTable>() {
- @Mock
- protected Map<String, Set<String>> findReAnalyzeNeededPartitions()
{
- Set<String> partitionNames = new HashSet<>();
- partitionNames.add("p1");
- partitionNames.add("p2");
- Map<String, Set<String>> map = new HashMap<>();
- map.put("col1", partitionNames);
- return map;
- }
-
- @Mock
- public long getRowCount() {
- return 100;
- }
-
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("col1", Type.INT), new
Column("col2", Type.INT));
- }
- };
-
- new MockUp<StatisticsUtil>() {
- @Mock
- public TableIf findTable(long catalogName, long dbName, long
tblName) {
- return tableIf;
- }
- };
- AnalysisInfo analysisInfo = new
AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL)
- .setColToPartitions(new HashMap<>()).setAnalysisType(
-
AnalysisType.FUNDAMENTALS).setColName("col1").setJobType(JobType.SYSTEM).build();
- new MockUp<AnalysisManager>() {
-
- int count = 0;
-
- TableStatsMeta[] tableStatsArr =
- new TableStatsMeta[] {new TableStatsMeta(0, analysisInfo,
tableIf),
- new TableStatsMeta(0, analysisInfo, tableIf),
null};
-
- {
- tableStatsArr[0].updatedRows.addAndGet(100);
- tableStatsArr[1].updatedRows.addAndGet(0);
- }
-
- @Mock
- public TableStatsMeta findTableStatsStatus(long tblId) {
- return tableStatsArr[count++];
- }
- };
-
- new MockUp<StatisticsAutoCollector>() {
- @Mock
- public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo,
TableIf table,
- Set<String> needRunPartitions) {
- return new AnalysisInfoBuilder().build();
- }
- };
- StatisticsAutoCollector statisticsAutoCollector = new
StatisticsAutoCollector();
- AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder()
- .setCatalogId(0)
- .setDBId(0)
- .setTblId(0).build();
-
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
- // uncomment it when updatedRows gets ready
- //
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
-
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
- }
-
- @Test
- public void testSkipWideTable() {
-
- TableIf tableIf = new OlapTable();
-
- new MockUp<OlapTable>() {
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("col1", Type.INT), new
Column("col2", Type.INT));
- }
- };
-
- new MockUp<StatisticsUtil>() {
- int count = 0;
- int [] thresholds = {1, 10};
- @Mock
- public TableIf findTable(long catalogName, long dbName, long
tblName) {
- return tableIf;
- }
-
- @Mock
- public int getAutoAnalyzeTableWidthThreshold() {
- return thresholds[count++];
- }
- };
-
- new MockUp<OlapTable>() {
- @Mock
- public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
- HashMap<String, Set<String>> ret = Maps.newHashMap();
- ret.put("key1", Sets.newHashSet());
- return ret;
- }
- };
-
- AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build();
- StatisticsAutoCollector statisticsAutoCollector = new
StatisticsAutoCollector();
-
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo));
-
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo));
- }
-
- @Test
- public void testLoop() {
- AtomicBoolean timeChecked = new AtomicBoolean();
- AtomicBoolean switchChecked = new AtomicBoolean();
- new MockUp<StatisticsUtil>() {
-
- @Mock
- public boolean inAnalyzeTime(LocalTime now) {
- timeChecked.set(true);
- return true;
- }
-
- @Mock
- public boolean enableAutoAnalyze() {
- switchChecked.set(true);
- return true;
- }
- };
- StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
- autoCollector.collect();
- Assertions.assertTrue(timeChecked.get() && switchChecked.get());
-
- }
-
- @Test
- public void checkAvailableThread() {
- StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-
Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
-
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
- }
-
- @Test
- public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta
stats, @Mocked TableIf anyOtherTable) {
- new MockUp<OlapTable>() {
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5
+ 1000000000;
- }
- };
-
- new MockUp<AnalysisManager>() {
-
- @Mock
- public TableStatsMeta findTableStatsStatus(long tblId) {
- return stats;
- }
- };
- // A very huge table has been updated recently, so we should skip it
this time
- stats.updatedTime = System.currentTimeMillis() - 1000;
- stats.newPartitionLoaded = new AtomicBoolean();
- stats.newPartitionLoaded.set(true);
- StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
- // Test new partition loaded data for the first time. Not skip.
- Assertions.assertFalse(autoCollector.skip(olapTable));
- stats.newPartitionLoaded.set(false);
- // Assertions.assertTrue(autoCollector.skip(olapTable));
- // The update of this huge table is long time ago, so we shouldn't
skip it this time
- stats.updatedTime = System.currentTimeMillis()
- - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() -
10000;
- Assertions.assertFalse(autoCollector.skip(olapTable));
- new MockUp<AnalysisManager>() {
-
- @Mock
- public TableStatsMeta findTableStatsStatus(long tblId) {
- return null;
- }
- };
- // can't find table stats meta, which means this table never get
analyzed, so we shouldn't skip it this time
- Assertions.assertFalse(autoCollector.skip(olapTable));
- new MockUp<AnalysisManager>() {
-
- @Mock
- public TableStatsMeta findTableStatsStatus(long tblId) {
- return stats;
- }
- };
- stats.userInjected = true;
- Assertions.assertTrue(autoCollector.skip(olapTable));
- // this is not olap table nor external table, so we should skip it
this time
- Assertions.assertTrue(autoCollector.skip(anyOtherTable));
- }
-
- // For small table, use full
- @Test
- public void testCreateAnalyzeJobForTbl1(
- @Injectable OlapTable t1,
- @Injectable Database db
- ) throws Exception {
- new MockUp<Database>() {
-
- @Mock
- public CatalogIf getCatalog() {
- return Env.getCurrentInternalCatalog();
- }
-
- @Mock
- public long getId() {
- return 0;
- }
- };
- new MockUp<OlapTable>() {
-
- int count = 0;
-
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("test",
PrimitiveType.INT));
- }
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1;
- }
-
- @Mock
- public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
- return new OlapAnalysisTask(info);
- }
- };
-
- new MockUp<StatisticsUtil>() {
- @Mock
- public TableIf findTable(long catalogId, long dbId, long tblId) {
- return t1;
- }
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- List<AnalysisInfo> jobInfos = new ArrayList<>();
- sac.createAnalyzeJobForTbl(db, jobInfos, t1);
- AnalysisInfo jobInfo = jobInfos.get(0);
- Map<String, Set<String>> colToPartitions = new HashMap<>();
- colToPartitions.put("test", new HashSet<String>() {
- {
- add("p1");
- }
- });
- jobInfo = new
AnalysisInfoBuilder(jobInfo).setColToPartitions(colToPartitions).build();
- Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
- Assertions.assertEquals(1, analysisTasks.size());
- for (BaseAnalysisTask task : analysisTasks.values()) {
- Assertions.assertNull(task.getTableSample());
- }
- }
-
- // for big table, use sample
- @Test
- public void testCreateAnalyzeJobForTbl2(
- @Injectable OlapTable t1,
- @Injectable Database db
- ) throws Exception {
- new MockUp<Database>() {
-
- @Mock
- public CatalogIf getCatalog() {
- return Env.getCurrentInternalCatalog();
- }
-
- @Mock
- public long getId() {
- return 0;
- }
- };
- new MockUp<OlapTable>() {
-
- int count = 0;
-
- @Mock
- public List<Column> getBaseSchema() {
- return Lists.newArrayList(new Column("test",
PrimitiveType.INT));
- }
-
- @Mock
- public long getDataSize(boolean singleReplica) {
- return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2;
- }
-
- @Mock
- public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
- return new OlapAnalysisTask(info);
- }
- };
-
- new MockUp<StatisticsUtil>() {
- @Mock
- public TableIf findTable(long catalogId, long dbId, long tblId) {
- return t1;
- }
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- List<AnalysisInfo> jobInfos = new ArrayList<>();
- sac.createAnalyzeJobForTbl(db, jobInfos, t1);
- AnalysisInfo jobInfo = jobInfos.get(0);
- Map<String, Set<String>> colToPartitions = new HashMap<>();
- colToPartitions.put("test", new HashSet<String>() {
- {
- add("p1");
- }
- });
- jobInfo = new
AnalysisInfoBuilder(jobInfo).setColToPartitions(colToPartitions).build();
- Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
- AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTasks,
false);
- Assertions.assertEquals(1, analysisTasks.size());
- for (BaseAnalysisTask task : analysisTasks.values()) {
- Assertions.assertNotNull(task.getTableSample());
- }
- }
-
- @Test
- public void testDisableAuto1() throws Exception {
- InternalCatalog catalog1 =
EnvFactory.getInstance().createInternalCatalog();
- List<CatalogIf> catalogs = Lists.newArrayList();
- catalogs.add(catalog1);
-
- new MockUp<StatisticsAutoCollector>() {
- @Mock
- public List<CatalogIf> getCatalogsInOrder() {
- return catalogs;
- }
-
- @Mock
- protected boolean canCollect() {
- return false;
- }
-
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- new Expectations(catalog1) {{
- catalog1.enableAutoAnalyze();
- times = 0;
- }};
-
- sac.analyzeAll();
- }
-
- @Test
- public void testDisableAuto2() throws Exception {
- InternalCatalog catalog1 =
EnvFactory.getInstance().createInternalCatalog();
- List<CatalogIf> catalogs = Lists.newArrayList();
- catalogs.add(catalog1);
-
- Database db1 = new Database();
- List<DatabaseIf<? extends TableIf>> dbs = Lists.newArrayList();
- dbs.add(db1);
-
- new MockUp<StatisticsAutoCollector>() {
- int count = 0;
- boolean[] canCollectReturn = {true, false};
- @Mock
- public List<CatalogIf> getCatalogsInOrder() {
- return catalogs;
- }
-
- @Mock
- public List<DatabaseIf<? extends TableIf>>
getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
- return dbs;
- }
-
- @Mock
- protected boolean canCollect() {
- return canCollectReturn[count++];
- }
-
- };
-
- StatisticsAutoCollector sac = new StatisticsAutoCollector();
- new Expectations(catalog1, db1) {{
- catalog1.enableAutoAnalyze();
- result = true;
- times = 1;
- db1.getFullName();
- times = 0;
- }};
-
- sac.analyzeAll();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]