This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this push:
new 87494cbb462 Check column health value earlier, show job priority. (#32064)
87494cbb462 is described below
commit 87494cbb462b1df7078382565e22ec99eaa47a76
Author: Jibing-Li <[email protected]>
AuthorDate: Mon Mar 11 19:46:20 2024 +0800
Check column health value earlier, show job priority. (#32064)
---
.../org/apache/doris/analysis/ShowAnalyzeStmt.java | 1 +
.../java/org/apache/doris/qe/SessionVariable.java | 2 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 4 +-
.../org/apache/doris/statistics/AnalysisInfo.java | 8 +-
.../doris/statistics/AnalysisInfoBuilder.java | 10 +-
.../apache/doris/statistics/AnalysisManager.java | 10 +-
.../doris/statistics/FollowerColumnSender.java | 23 +++--
.../doris/statistics/HighPriorityColumn.java | 6 +-
.../org/apache/doris/statistics/JobPriority.java | 3 +-
.../doris/statistics/StatisticsAutoCollector.java | 102 ++++-----------------
.../doris/statistics/StatisticsJobAppender.java | 8 +-
.../doris/statistics/util/StatisticsUtil.java | 88 +++++++++++++++++-
gensrc/thrift/FrontendService.thrift | 6 +-
13 files changed, 161 insertions(+), 110 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
index efcfc517024..734073901fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
@@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
.add("schedule_type")
.add("start_time")
.add("end_time")
+ .add("priority")
.build();
private long jobId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 13c3e8baf0c..8c1c0326faf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1503,7 +1503,7 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG,
description = {"临时参数,收否自动收集所有内表", "Temp variable, enable to auto
collect all OlapTable."},
flag = VariableMgr.GLOBAL)
- public boolean enableAutoAnalyzeInternalCatalog = false;
+ public boolean enableAutoAnalyzeInternalCatalog = true;
@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index dda7045eabb..fa2046e5467 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2859,6 +2859,7 @@ public class ShowExecutor {
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
+ row.add(analysisInfo.priority.name());
resultRows.add(row);
} catch (Exception e) {
LOG.warn("Failed to get analyze info for table {}.{}.{},
reason: {}",
@@ -2876,8 +2877,7 @@ public class ShowExecutor {
for (AutoAnalysisPendingJob job : jobs) {
try {
List<String> row = new ArrayList<>();
- CatalogIf<? extends DatabaseIf<? extends TableIf>> c
- = StatisticsUtil.findCatalog(job.catalogName);
+ CatalogIf<? extends DatabaseIf<? extends TableIf>> c =
StatisticsUtil.findCatalog(job.catalogName);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf =
c.getDb(job.dbName);
row.add(databaseIf.isPresent() ?
databaseIf.get().getFullName() : "DB may get deleted");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index 0c5047a53c5..24cf6f38d68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -200,8 +200,12 @@ public class AnalysisInfo implements Writable {
*/
public final long tblUpdateTime;
+ @SerializedName("userInject")
public final boolean userInject;
+ @SerializedName("priority")
+ public final JobPriority priority;
+
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long
catalogId, long dbId, long tblId,
Map<String, Set<String>> colToPartitions, Set<String>
partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod
analysisMethod, AnalysisType analysisType,
@@ -210,7 +214,7 @@ public class AnalysisInfo implements Writable {
boolean isExternalTableLevelTask, boolean partitionOnly, boolean
samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression
cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn, long tblUpdateTime, long
rowCount, boolean userInject,
- long updateRows) {
+ long updateRows, JobPriority priority) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@@ -249,6 +253,7 @@ public class AnalysisInfo implements Writable {
this.rowCount = rowCount;
this.userInject = userInject;
this.updateRows = updateRows;
+ this.priority = priority;
}
@Override
@@ -293,6 +298,7 @@ public class AnalysisInfo implements Writable {
sj.add("rowCount: " + rowCount);
sj.add("userInject: " + userInject);
sj.add("updateRows: " + updateRows);
+ sj.add("priority: " + priority.name());
return sj.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
index 527d503fd52..2f60b258598 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
@@ -65,6 +65,7 @@ public class AnalysisInfoBuilder {
private long rowCount;
private boolean userInject;
private long updateRows;
+ private JobPriority priority;
public AnalysisInfoBuilder() {
}
@@ -105,6 +106,7 @@ public class AnalysisInfoBuilder {
rowCount = info.rowCount;
userInject = info.userInject;
updateRows = info.updateRows;
+ priority = info.priority;
}
public AnalysisInfoBuilder setJobId(long jobId) {
@@ -282,12 +284,18 @@ public class AnalysisInfoBuilder {
return this;
}
+ public AnalysisInfoBuilder setPriority(JobPriority priority) {
+ this.priority = priority;
+ return this;
+ }
+
public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId,
tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod,
analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message,
lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition,
isAllPartition, partitionCount,
- cronExpression, forceFull, usingSqlForPartitionColumn,
tblUpdateTime, rowCount, userInject, updateRows);
+ cronExpression, forceFull, usingSqlForPartitionColumn,
tblUpdateTime, rowCount, userInject, updateRows,
+ priority);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index c6713b3a7c1..ddcdf459e6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -219,7 +219,8 @@ public class AnalysisManager implements Writable {
public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws
DdlException {
// Using auto analyzer if user specifies.
if
(stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
-
Env.getCurrentEnv().getStatisticsAutoCollector().processOneJob(stmt.getTable(),
stmt.getColumnNames());
+ Env.getCurrentEnv().getStatisticsAutoCollector()
+ .processOneJob(stmt.getTable(), stmt.getColumnNames(),
JobPriority.HIGH);
return;
}
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
@@ -422,6 +423,7 @@ public class AnalysisManager implements Writable {
infoBuilder.setRowCount(rowCount);
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 :
tableStatsStatus.updatedRows.get());
+ infoBuilder.setPriority(JobPriority.MANUAL);
return infoBuilder.build();
}
@@ -1230,12 +1232,14 @@ public class AnalysisManager implements Writable {
public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
Collection<TQueryColumn> midColumns) {
for (TQueryColumn c : highColumns) {
- if (!highPriorityColumns.offer(new HighPriorityColumn(c.catalogId,
c.dbId, c.tblId, c.colName))) {
+ if (!highPriorityColumns.offer(new
HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
+ Long.parseLong(c.tblId), c.colName))) {
break;
}
}
for (TQueryColumn c : midColumns) {
- if (!midPriorityColumns.offer(new HighPriorityColumn(c.catalogId,
c.dbId, c.tblId, c.colName))) {
+ if (!midPriorityColumns.offer(new
HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
+ Long.parseLong(c.tblId), c.colName))) {
break;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
index 181000c1ef2..0a804152694 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/FollowerColumnSender.java
@@ -32,14 +32,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class FollowerColumnSender extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(FollowerColumnSender.class);
- public static final long INTERVAL = 5000;
+ public static final long INTERVAL = 60000;
public FollowerColumnSender() {
super("Follower Column Sender", INTERVAL);
@@ -68,21 +70,28 @@ public class FollowerColumnSender extends MasterDaemon {
if (analysisManager.highPriorityColumns.isEmpty() &&
analysisManager.midPriorityColumns.isEmpty()) {
return;
}
- List<TQueryColumn> highPriorityColumns
+ Set<TQueryColumn> highPriorityColumns
= analysisManager.highPriorityColumns
.stream()
+ .filter(c -> StatisticsUtil.needAnalyzeColumn(c))
.map(HighPriorityColumn::toThrift)
- .collect(Collectors.toList());
- List<TQueryColumn> midPriorityColumns
+ .collect(Collectors.toSet());
+ Set<TQueryColumn> midPriorityColumns
= analysisManager.midPriorityColumns
.stream()
+ .filter(c -> StatisticsUtil.needAnalyzeColumn(c))
+ .filter(c -> !highPriorityColumns.contains(c))
.map(HighPriorityColumn::toThrift)
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
analysisManager.highPriorityColumns.clear();
analysisManager.midPriorityColumns.clear();
TSyncQueryColumns queryColumns = new TSyncQueryColumns();
- queryColumns.highPriorityColumns = highPriorityColumns;
- queryColumns.midPriorityColumns = midPriorityColumns;
+ List<TQueryColumn> highs = new ArrayList<>();
+ highs.addAll(highPriorityColumns);
+ queryColumns.highPriorityColumns = highs;
+ List<TQueryColumn> mids = new ArrayList<>();
+ mids.addAll(midPriorityColumns);
+ queryColumns.midPriorityColumns = mids;
Frontend master = null;
try {
InetSocketAddress masterAddress =
currentEnv.getHaProtocol().getLeader();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
index b2292ef725d..d619ef82c08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
@@ -57,9 +57,9 @@ public class HighPriorityColumn {
public TQueryColumn toThrift() {
TQueryColumn tQueryColumn = new TQueryColumn();
- tQueryColumn.catalogId = catalogId;
- tQueryColumn.dbId = dbId;
- tQueryColumn.tblId = tblId;
+ tQueryColumn.catalogId = String.valueOf(catalogId);
+ tQueryColumn.dbId = String.valueOf(dbId);
+ tQueryColumn.tblId = String.valueOf(tblId);
tQueryColumn.colName = colName;
return tQueryColumn;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
index 2786b063563..c3656b92927 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
@@ -20,5 +20,6 @@ package org.apache.doris.statistics;
public enum JobPriority {
HIGH,
MID,
- LOW;
+ LOW,
+ MANUAL;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 227074dbb5c..c26e7b05efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -23,9 +23,9 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -58,29 +59,22 @@ public class StatisticsAutoCollector extends StatisticsCollector {
@Override
protected void collect() {
while (canCollect()) {
- Map.Entry<TableName, Set<String>> job = getJob();
+ Pair<Entry<TableName, Set<String>>, JobPriority> job = getJob();
if (job == null) {
// No more job to process, break and sleep.
break;
}
try {
- TableName tblName = job.getKey();
+ TableName tblName = job.first.getKey();
TableIf table = StatisticsUtil.findTable(tblName.getCtl(),
tblName.getDb(), tblName.getTbl());
if (!supportAutoAnalyze(table)) {
continue;
}
- Set<String> columns = job.getValue()
- .stream()
- .filter(c -> {
- boolean needAnalyzeColumn =
needAnalyzeColumn(table, c);
- LOG.info("Need analyze column " + c + " ? " +
needAnalyzeColumn);
- return needAnalyzeColumn;
- })
- .collect(Collectors.toSet());
- processOneJob(table, columns);
+ Set<String> columns =
job.first.getValue().stream().collect(Collectors.toSet());
+ processOneJob(table, columns, job.second);
} catch (Exception e) {
- LOG.warn("Failed to analyze table {} with columns [{}]",
- job.getKey().getTbl(),
job.getValue().stream().collect(Collectors.joining(",")), e);
+ LOG.warn("Failed to analyze table {} with columns [{}]",
job.first.getKey().getTbl(),
+
job.first.getValue().stream().collect(Collectors.joining(",")), e);
}
}
}
@@ -90,18 +84,18 @@ public class StatisticsAutoCollector extends StatisticsCollector {
&&
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}
- protected Map.Entry<TableName, Set<String>> getJob() {
+ protected Pair<Entry<TableName, Set<String>>, JobPriority> getJob() {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
- Optional<Map.Entry<TableName, Set<String>>> job =
fetchJobFromMap(manager.highPriorityJobs);
+ Optional<Entry<TableName, Set<String>>> job =
fetchJobFromMap(manager.highPriorityJobs);
if (job.isPresent()) {
- return job.get();
+ return Pair.of(job.get(), JobPriority.HIGH);
}
job = fetchJobFromMap(manager.midPriorityJobs);
if (job.isPresent()) {
- return job.get();
+ return Pair.of(job.get(), JobPriority.MID);
}
job = fetchJobFromMap(manager.lowPriorityJobs);
- return job.isPresent() ? job.get() : null;
+ return job.isPresent() ? Pair.of(job.get(), JobPriority.LOW) : null;
}
protected Optional<Map.Entry<TableName, Set<String>>>
fetchJobFromMap(Map<TableName, Set<String>> jobMap) {
@@ -112,12 +106,12 @@ public class StatisticsAutoCollector extends StatisticsCollector {
}
}
- protected void processOneJob(TableIf table, Set<String> columns) throws
DdlException {
+ protected void processOneJob(TableIf table, Set<String> columns,
JobPriority priority) throws DdlException {
appendPartitionColumns(table, columns);
if (columns.isEmpty()) {
return;
}
- AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns);
+ AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns,
priority);
LOG.info("Analyze job : {}", analyzeJob.toString());
createSystemAnalysisJob(analyzeJob);
}
@@ -134,69 +128,6 @@ public class StatisticsAutoCollector extends StatisticsCollector {
}
}
- // TODO: Need refactor, hard to understand now.
- protected boolean needAnalyzeColumn(TableIf table, String column) {
- AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
- TableStatsMeta tableStatsStatus =
manager.findTableStatsStatus(table.getId());
- if (tableStatsStatus == null) {
- return true;
- }
- if (tableStatsStatus.userInjected) {
- return false;
- }
- ColStatsMeta columnStatsMeta =
tableStatsStatus.findColumnStatsMeta(column);
- if (columnStatsMeta == null) {
- return true;
- }
- if (table instanceof OlapTable) {
- long currentUpdatedRows = tableStatsStatus.updatedRows.get();
- long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows;
- if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
- return true;
- }
- if (lastAnalyzeUpdateRows > currentUpdatedRows) {
- // Shouldn't happen. Just in case.
- return true;
- }
- OlapTable olapTable = (OlapTable) table;
- long currentRowCount = olapTable.getRowCount();
- long lastAnalyzeRowCount = columnStatsMeta.rowCount;
- if (tableStatsStatus.newPartitionLoaded.get() &&
olapTable.isPartitionColumn(column)) {
- return true;
- }
- if (lastAnalyzeRowCount == 0 && currentRowCount > 0) {
- return true;
- }
- if (currentUpdatedRows == lastAnalyzeUpdateRows) {
- return false;
- }
- double healthValue = ((double) (currentUpdatedRows -
lastAnalyzeUpdateRows)
- / (double) currentUpdatedRows) * 100.0;
- LOG.info("Column " + column + " update rows health value is " +
healthValue);
- if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) {
- return true;
- }
- if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
- return true;
- }
- if (currentRowCount == 0 && lastAnalyzeRowCount == 0) {
- return false;
- }
- healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) /
(double) currentRowCount) * 100.0;
- return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
- } else {
- if (!(table instanceof HMSExternalTable)) {
- return false;
- }
- HMSExternalTable hmsTable = (HMSExternalTable) table;
- if (!hmsTable.getDlaType().equals(DLAType.HIVE)) {
- return false;
- }
- return System.currentTimeMillis()
- - tableStatsStatus.updatedTime >
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
- }
- }
-
protected boolean supportAutoAnalyze(TableIf tableIf) {
if (tableIf == null) {
return false;
@@ -206,7 +137,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
&& ((HMSExternalTable)
tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE);
}
- protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String>
columns) {
+ protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String>
columns, JobPriority priority) {
AnalysisMethod analysisMethod = table.getDataSize(true) >=
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
@@ -236,6 +167,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
.setRowCount(rowCount)
.setUpdateRows(tableStatsStatus == null ? 0 :
tableStatsStatus.updatedRows.get())
.setColToPartitions(colToPartitions)
+ .setPriority(priority)
.build();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
index 93d03a3fdb8..9e07c65e2fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -86,6 +86,9 @@ public class StatisticsJobAppender extends MasterDaemon {
int size = columnQueue.size();
for (int i = 0; i < size; i++) {
HighPriorityColumn column = columnQueue.poll();
+ if (!StatisticsUtil.needAnalyzeColumn(column)) {
+ continue;
+ }
LOG.info("Process column " + column.tblId + "." + column.colName);
TableIf table = StatisticsUtil.findTable(column.catalogId,
column.dbId, column.tblId);
TableName tableName = new
TableName(table.getDatabase().getCatalog().getName(),
@@ -132,8 +135,9 @@ public class StatisticsJobAppender extends MasterDaemon {
if (!jobsMap.containsKey(tableName) && jobsMap.size() >=
JOB_MAP_SIZE) {
return;
}
- Set<String> columns
- = t.getColumns().stream().filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
+ Set<String> columns = t.getColumns().stream()
+ .filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
+ .filter(c -> StatisticsUtil.needAnalyzeColumn(t,
c.getName()))
.map(c -> c.getName()).collect(Collectors.toSet());
if (jobsMap.containsKey(tableName)) {
jobsMap.get(tableName).addAll(columns);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 65c2ee9e6da..27aa83ba716 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -58,6 +58,7 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
@@ -68,11 +69,15 @@ import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
+import org.apache.doris.statistics.HighPriorityColumn;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticConstants;
+import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Frontend;
import com.google.common.base.Preconditions;
@@ -874,7 +879,7 @@ public class StatisticsUtil {
} catch (Exception e) {
LOG.warn("Fail to get value of enable auto analyze internal
catalog, return false by default", e);
}
- return false;
+ return true;
}
public static int getInsertMergeCount() {
@@ -984,4 +989,85 @@ public class StatisticsUtil {
||
columnName.startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX);
}
+ // TODO: Need refactor, hard to understand now.
+ public static boolean needAnalyzeColumn(TableIf table, String column) {
+ AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+ TableStatsMeta tableStatsStatus =
manager.findTableStatsStatus(table.getId());
+ if (tableStatsStatus == null) {
+ return true;
+ }
+ if (tableStatsStatus.userInjected) {
+ return false;
+ }
+ ColStatsMeta columnStatsMeta =
tableStatsStatus.findColumnStatsMeta(column);
+ if (columnStatsMeta == null) {
+ return true;
+ }
+ if (table instanceof OlapTable) {
+ long currentUpdatedRows = tableStatsStatus.updatedRows.get();
+ long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows;
+ if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
+ return true;
+ }
+ if (lastAnalyzeUpdateRows > currentUpdatedRows) {
+ // Shouldn't happen. Just in case.
+ return true;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ long currentRowCount = olapTable.getRowCount();
+ long lastAnalyzeRowCount = columnStatsMeta.rowCount;
+ if (tableStatsStatus.newPartitionLoaded.get() &&
olapTable.isPartitionColumn(column)) {
+ return true;
+ }
+ if (lastAnalyzeRowCount == 0 && currentRowCount > 0) {
+ return true;
+ }
+ if (currentUpdatedRows == lastAnalyzeUpdateRows) {
+ return false;
+ }
+ double healthValue = ((double) (currentUpdatedRows -
lastAnalyzeUpdateRows)
+ / (double) currentUpdatedRows) * 100.0;
+ LOG.info("Column " + column + " update rows health value is " +
healthValue);
+ if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) {
+ return true;
+ }
+ if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
+ return true;
+ }
+ if (currentRowCount == 0 && lastAnalyzeRowCount == 0) {
+ return false;
+ }
+ healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) /
(double) currentRowCount) * 100.0;
+ return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
+ } else {
+ if (!(table instanceof HMSExternalTable)) {
+ return false;
+ }
+ HMSExternalTable hmsTable = (HMSExternalTable) table;
+ if (!hmsTable.getDlaType().equals(DLAType.HIVE)) {
+ return false;
+ }
+ return System.currentTimeMillis()
+ - tableStatsStatus.updatedTime >
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
+ }
+ }
+
+ public static boolean needAnalyzeColumn(HighPriorityColumn column) {
+ if (column == null) {
+ return false;
+ }
+ TableIf table;
+ Column col;
+ try {
+ table = StatisticsUtil.findTable(column.catalogId, column.dbId,
column.tblId);
+ col = table.getColumn(column.colName);
+ } catch (Exception e) {
+ LOG.warn("Failed to find table for column {}", column.colName, e);
+ return false;
+ }
+ return col != null
+ && !StatisticsUtil.isUnsupportedType(col.getType())
+ && StatisticsUtil.needAnalyzeColumn(table, column.colName);
+ }
+
}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index d2cdecbbe56..6be02613d5e 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1411,9 +1411,9 @@ struct TReportCommitTxnResultRequest {
}
struct TQueryColumn {
- 1: optional i64 catalogId
- 2: optional i64 dbId
- 3: optional i64 tblId
+ 1: optional string catalogId
+ 2: optional string dbId
+ 3: optional string tblId
4: optional string colName
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]