This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this push:
new 65a71e86856 Support column level health value. (#31794)
65a71e86856 is described below
commit 65a71e86856721ff0304e8d8889b3ddbe5ba8ee5
Author: Jibing-Li <[email protected]>
AuthorDate: Tue Mar 5 16:19:08 2024 +0800
Support column level health value. (#31794)
---
.../apache/doris/analysis/ShowColumnStatsStmt.java | 4 +
.../apache/doris/datasource/InternalCatalog.java | 6 +-
.../java/org/apache/doris/qe/SessionVariable.java | 7 ++
.../org/apache/doris/statistics/AnalysisInfo.java | 17 ++--
.../doris/statistics/AnalysisInfoBuilder.java | 17 ++--
.../org/apache/doris/statistics/AnalysisJob.java | 8 +-
.../apache/doris/statistics/AnalysisManager.java | 13 ++-
.../org/apache/doris/statistics/ColStatsMeta.java | 16 ++--
.../apache/doris/statistics/OlapAnalysisTask.java | 2 +-
.../doris/statistics/StatisticsAutoCollector.java | 95 ++++++++++++++++++++--
.../doris/statistics/StatisticsJobAppender.java | 28 +++++--
.../apache/doris/statistics/TableStatsMeta.java | 33 ++++----
.../doris/statistics/util/StatisticsUtil.java | 10 +++
.../apache/doris/statistics/AnalysisJobTest.java | 4 +-
.../doris/statistics/AnalysisManagerTest.java | 2 +-
15 files changed, 198 insertions(+), 64 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
index 37be76b20df..749bfa7d360 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
@@ -61,6 +61,8 @@ public class ShowColumnStatsStmt extends ShowStmt {
.add("trigger")
.add("query_times")
.add("updated_time")
+ .add("update_rows")
+ .add("last_analyze_row_count")
.build();
private final TableName tableName;
@@ -160,6 +162,8 @@ public class ShowColumnStatsStmt extends ShowStmt {
row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.jobType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.queriedTimes));
row.add(String.valueOf(p.second.updatedTime));
+ row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.updatedRows));
+ row.add(String.valueOf(colStatsMeta == null ? "N/A" :
colStatsMeta.rowCount));
result.add(row);
});
return new ShowResultSet(getMetaData(), result);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 95abc9e06c4..61a1073da34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3116,7 +3116,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
rowsToTruncate += partition.getBaseIndex().getRowCount();
}
} else {
- rowsToTruncate = olapTable.getRowCount();
for (Partition partition : olapTable.getPartitions()) {
// If need absolutely correct, should check running txn
here.
// But if the txn is in prepare state, cann't known which
partitions had load data.
@@ -3125,6 +3124,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
origPartitions.put(partition.getName(), partition.getId());
partitionsDistributionInfo.put(partition.getId(),
partition.getDistributionInfo());
+ rowsToTruncate += partition.getBaseIndex().getRowCount();
}
}
// if table currently has no partitions, this sql like empty
command and do nothing, should return directly.
@@ -3285,10 +3285,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
if (truncateEntireTable) {
// Drop the whole table stats after truncate the entire table
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
- } else {
- // Update the updated rows in table stats after truncate some
partitions.
-
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
}
+
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
LOG.info("finished to truncate table {}, partitions: {}",
tblRef.getName().toSql(), tblRef.getPartitionNames());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 5f7e8ebc32e..a8830f6fc31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -468,6 +468,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_AUTO_ANALYZE = "enable_auto_analyze";
+ public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG =
"enable_auto_analyze_internal_catalog";
+
public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD =
"auto_analyze_table_width_threshold";
public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
@@ -1492,6 +1494,11 @@ public class SessionVariable implements Serializable,
Writable {
flag = VariableMgr.GLOBAL)
public boolean enableAutoAnalyze = true;
+ @VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG,
+ description = {"临时参数,收否自动收集所有内表", "Temp variable, enable to auto
collect all OlapTable."},
+ flag = VariableMgr.GLOBAL)
+ public boolean enableAutoAnalyzeInternalCatalog = false;
+
@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
"Maximum table width to enable auto analyze, "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
index c707107e0e0..0c5047a53c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java
@@ -188,8 +188,11 @@ public class AnalysisInfo implements Writable {
@SerializedName("endTime")
public long endTime;
- @SerializedName("emptyJob")
- public final boolean emptyJob;
+ @SerializedName("rowCount")
+ public final long rowCount;
+
+ @SerializedName("updateRows")
+ public final long updateRows;
/**
*
* Used to store the newest partition version of tbl when creating this
job.
@@ -206,7 +209,8 @@ public class AnalysisInfo implements Writable {
long lastExecTimeInMs, long timeCostInMs, AnalysisState state,
ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean
samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression
cronExpression, boolean forceFull,
- boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean
emptyJob, boolean userInject) {
+ boolean usingSqlForPartitionColumn, long tblUpdateTime, long
rowCount, boolean userInject,
+ long updateRows) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@@ -242,8 +246,9 @@ public class AnalysisInfo implements Writable {
this.forceFull = forceFull;
this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
this.tblUpdateTime = tblUpdateTime;
- this.emptyJob = emptyJob;
+ this.rowCount = rowCount;
this.userInject = userInject;
+ this.updateRows = updateRows;
}
@Override
@@ -285,7 +290,9 @@ public class AnalysisInfo implements Writable {
}
sj.add("forceFull: " + forceFull);
sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn);
- sj.add("emptyJob: " + emptyJob);
+ sj.add("rowCount: " + rowCount);
+ sj.add("userInject: " + userInject);
+ sj.add("updateRows: " + updateRows);
return sj.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
index 22f3d22b3ce..527d503fd52 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java
@@ -62,8 +62,9 @@ public class AnalysisInfoBuilder {
private boolean forceFull;
private boolean usingSqlForPartitionColumn;
private long tblUpdateTime;
- private boolean emptyJob;
+ private long rowCount;
private boolean userInject;
+ private long updateRows;
public AnalysisInfoBuilder() {
}
@@ -101,8 +102,9 @@ public class AnalysisInfoBuilder {
forceFull = info.forceFull;
usingSqlForPartitionColumn = info.usingSqlForPartitionColumn;
tblUpdateTime = info.tblUpdateTime;
- emptyJob = info.emptyJob;
+ rowCount = info.rowCount;
userInject = info.userInject;
+ updateRows = info.updateRows;
}
public AnalysisInfoBuilder setJobId(long jobId) {
@@ -265,8 +267,8 @@ public class AnalysisInfoBuilder {
return this;
}
- public AnalysisInfoBuilder setEmptyJob(boolean emptyJob) {
- this.emptyJob = emptyJob;
+ public AnalysisInfoBuilder setRowCount(long rowCount) {
+ this.rowCount = rowCount;
return this;
}
@@ -275,12 +277,17 @@ public class AnalysisInfoBuilder {
return this;
}
+ public AnalysisInfoBuilder setUpdateRows(long updateRows) {
+ this.updateRows = updateRows;
+ return this;
+ }
+
public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId,
tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod,
analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message,
lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition,
isAllPartition, partitionCount,
- cronExpression, forceFull, usingSqlForPartitionColumn,
tblUpdateTime, emptyJob, userInject);
+ cronExpression, forceFull, usingSqlForPartitionColumn,
tblUpdateTime, rowCount, userInject, updateRows);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index 22eab37f920..6846abea431 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -84,14 +84,12 @@ public class AnalysisJob {
protected void markOneTaskDone() {
if (queryingTask.isEmpty()) {
try {
- writeBuf();
- updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
- + (System.currentTimeMillis() - start) / 1000);
+ flushBuffer();
} finally {
deregisterJob();
}
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
- writeBuf();
+ flushBuffer();
}
}
@@ -115,7 +113,7 @@ public class AnalysisJob {
}
}
- protected void writeBuf() {
+ protected void flushBuffer() {
if (killed) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index e7ab342d091..0ff9e3a9e00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -413,7 +413,10 @@ public class AnalysisManager implements Writable {
infoBuilder.setColToPartitions(colToPartitions);
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
- infoBuilder.setEmptyJob(table instanceof OlapTable &&
table.getRowCount() == 0);
+ long rowCount = table.getRowCount();
+ infoBuilder.setRowCount(rowCount);
+ TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
+ infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 :
tableStatsStatus.updatedRows.get());
return infoBuilder.build();
}
@@ -569,7 +572,7 @@ public class AnalysisManager implements Writable {
}
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
- updateTableStatsStatus(new TableStatsMeta(jobInfo.emptyJob ? 0 :
tbl.getRowCount(), jobInfo, tbl));
+ updateTableStatsStatus(new TableStatsMeta(jobInfo.rowCount,
jobInfo, tbl));
} else {
tableStats.update(jobInfo, tbl);
logCreateTableStats(tableStats);
@@ -802,7 +805,7 @@ public class AnalysisManager implements Writable {
analysisInfo.dbId, analysisInfo.tblId);
return table.createAnalysisTask(analysisInfo);
} catch (Throwable t) {
- LOG.warn("Failed to find table", t);
+ LOG.warn("Failed to create task.", t);
throw new DdlException("Failed to create task", t);
}
}
@@ -1141,10 +1144,12 @@ public class AnalysisManager implements Writable {
public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
+ LOG.info("Add slots to high priority queues.");
updateColumn(slotReferences, highPriorityColumns);
}
public void updateQueriedColumn(Collection<Slot> slotReferences) {
+ LOG.info("Add slots to mid priority queues.");
updateColumn(slotReferences, midPriorityColumns);
}
@@ -1164,6 +1169,8 @@ public class AnalysisManager implements Writable {
if (catalog != null) {
queue.offer(new HighPriorityColumn(catalog.getId(),
database.getId(),
table.getId(),
optionalColumn.get().getName()));
+ LOG.info("Offer column " + table.getName() + "(" +
table.getId() + ")."
+ + optionalColumn.get().getName());
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
index 445641b2505..7e317d67bd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java
@@ -43,16 +43,20 @@ public class ColStatsMeta {
@SerializedName("trigger")
public JobType jobType;
- public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
- AnalysisType analysisType, JobType jobType, long queriedTimes) {
+ @SerializedName("updatedRows")
+ public long updatedRows;
+
+ @SerializedName("rowCount")
+ public long rowCount;
+
+ public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
AnalysisType analysisType, JobType jobType,
+ long queriedTimes, long rowCount, long updatedRows) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
- }
-
- public void clear() {
- updatedTime = 0;
+ this.updatedRows = updatedRows;
+ this.rowCount = rowCount;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 5a1d5829c8c..179766d1af8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -65,7 +65,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
public void doExecute() throws Exception {
Set<String> partitionNames = info.colToPartitions.get(info.colName);
- if ((info.emptyJob &&
info.analysisMethod.equals(AnalysisInfo.AnalysisMethod.SAMPLE))
+ if ((info.rowCount == 0 &&
info.analysisMethod.equals(AnalysisInfo.AnalysisMethod.SAMPLE))
|| partitionNames == null || partitionNames.isEmpty()) {
if (partitionNames == null) {
LOG.warn("Table {}.{}.{}, partitionNames for column {} is
null. ColToPartitions:[{}]",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 19b9c69db08..c498881bfbf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -33,6 +35,8 @@ import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -60,9 +64,16 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
}
try {
TableIf table = job.getKey();
+ if (!supportAutoAnalyze(table)) {
+ continue;
+ }
Set<String> columns = job.getValue()
.stream()
- .filter(c -> needAnalyzeColumn(table, c))
+ .filter(c -> {
+ boolean needAnalyzeColumn =
needAnalyzeColumn(table, c);
+ LOG.info("Need analyze column " + c + " ? " +
needAnalyzeColumn);
+ return needAnalyzeColumn;
+ })
.collect(Collectors.toSet());
processOneJob(table, columns);
} catch (Exception e) {
@@ -100,22 +111,92 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
}
protected void processOneJob(TableIf table, Set<String> columns) throws
DdlException {
- Set<String> collect = columns.stream().filter(c ->
needAnalyzeColumn(table, c)).collect(Collectors.toSet());
- if (collect.isEmpty()) {
+ appendPartitionColumns(table, columns);
+ if (columns.isEmpty()) {
return;
}
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns);
+ LOG.info("Analyze job : {}", analyzeJob.toString());
createSystemAnalysisJob(analyzeJob);
}
+ protected void appendPartitionColumns(TableIf table, Set<String> columns) {
+ if (!(table instanceof OlapTable)) {
+ return;
+ }
+ AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+ TableStatsMeta tableStatsStatus =
manager.findTableStatsStatus(table.getId());
+ if (tableStatsStatus != null &&
tableStatsStatus.newPartitionLoaded.get()) {
+ OlapTable olapTable = (OlapTable) table;
+ columns.addAll(olapTable.getPartitionNames());
+ }
+ }
+
protected boolean needAnalyzeColumn(TableIf table, String column) {
- //TODO: Calculate column health value.
- return true;
+ AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+ TableStatsMeta tableStatsStatus =
manager.findTableStatsStatus(table.getId());
+ if (tableStatsStatus == null) {
+ return true;
+ }
+ if (tableStatsStatus.userInjected) {
+ return false;
+ }
+ ColStatsMeta columnStatsMeta =
tableStatsStatus.findColumnStatsMeta(column);
+ if (columnStatsMeta == null) {
+ return true;
+ }
+ if (table instanceof OlapTable) {
+ long currentUpdatedRows = tableStatsStatus.updatedRows.get();
+ long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows;
+ if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
+ return true;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ if (tableStatsStatus.newPartitionLoaded.get() &&
olapTable.isPartitionColumn(column)) {
+ return true;
+ }
+ if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) {
+ return true;
+ }
+ if (currentUpdatedRows == lastAnalyzeUpdateRows) {
+ return false;
+ }
+ double healthValue = ((double) (currentUpdatedRows -
lastAnalyzeUpdateRows)
+ / (double) currentUpdatedRows) * 100.0;
+ LOG.info("Column " + column + " health value is " + healthValue);
+ return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
+ } else {
+ if (!(table instanceof HMSExternalTable)) {
+ return false;
+ }
+ HMSExternalTable hmsTable = (HMSExternalTable) table;
+ if (!hmsTable.getDlaType().equals(DLAType.HIVE)) {
+ return false;
+ }
+ return System.currentTimeMillis()
+ - tableStatsStatus.updatedTime >
StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
+ }
+ }
+
+ protected boolean supportAutoAnalyze(TableIf tableIf) {
+ if (tableIf == null) {
+ return false;
+ }
+ return tableIf instanceof OlapTable
+ || tableIf instanceof HMSExternalTable
+ && ((HMSExternalTable)
tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE);
}
protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String>
columns) {
AnalysisMethod analysisMethod = table.getDataSize(true) >=
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
+ AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
+ TableStatsMeta tableStatsStatus =
manager.findTableStatsStatus(table.getId());
+ long rowCount = table.getRowCount();
+ Map<String, Set<String>> colToPartitions = new HashMap<>();
+ Set<String> dummyPartition = new HashSet<>();
+ dummyPartition.add("dummy partition");
+ columns.stream().forEach(c -> colToPartitions.put(c, dummyPartition));
return new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
.setCatalogId(table.getDatabase().getCatalog().getId())
@@ -133,7 +214,9 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
.setLastExecTimeInMs(System.currentTimeMillis())
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(table.getUpdateTime())
- .setEmptyJob(table instanceof OlapTable && table.getRowCount()
== 0)
+ .setRowCount(rowCount)
+ .setUpdateRows(tableStatsStatus == null ? 0 :
tableStatsStatus.updatedRows.get())
+ .setColToPartitions(colToPartitions)
.build();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
index 73d0d1340ad..71bb71d3cda 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -19,6 +19,7 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.util.MasterDaemon;
@@ -28,7 +29,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -70,34 +71,44 @@ public class StatisticsJobAppender extends MasterDaemon {
protected void appendJobs() {
AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager();
+ // LOG.info("Append column to high priority job map.");
appendColumnsToJobs(manager.highPriorityColumns,
manager.highPriorityJobs);
+ // LOG.info("Append column to mid priority job map.");
appendColumnsToJobs(manager.midPriorityColumns,
manager.midPriorityJobs);
- appendToLowQueue(manager.lowPriorityJobs);
+ if (StatisticsUtil.enableAutoAnalyzeInternalCatalog()) {
+ // LOG.info("Append column to low priority job map.");
+ appendToLowQueue(manager.lowPriorityJobs);
+ }
}
protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue,
Map<TableIf, Set<String>> jobsMap) {
int size = columnQueue.size();
for (int i = 0; i < size; i++) {
HighPriorityColumn column = columnQueue.poll();
+ LOG.info("Process column " + column.tblId + "." + column.colName);
TableIf table = StatisticsUtil.findTable(column.catalogId,
column.dbId, column.tblId);
synchronized (jobsMap) {
// If job map reach the upper limit, stop putting new jobs.
if (!jobsMap.containsKey(table) && jobsMap.size() >=
JOB_MAP_SIZE) {
+ LOG.info("Job map full.");
break;
}
if (jobsMap.containsKey(table)) {
jobsMap.get(table).add(column.colName);
} else {
- jobsMap.put(table, Collections.singleton(column.colName));
+ HashSet<String> columns = new HashSet<>();
+ columns.add(column.colName);
+ jobsMap.put(table, columns);
}
+ LOG.info("Column " + column.tblId + "." + column.colName + "
added");
}
}
}
- protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) {
-
+ protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) {
InternalCatalog catalog = Env.getCurrentInternalCatalog();
List<Long> sortedDbs =
catalog.getDbIds().stream().sorted().collect(Collectors.toList());
+ int batchSize = 100;
for (long dbId : sortedDbs) {
if (dbId < currentDbId
||
StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName()))
{
@@ -108,11 +119,11 @@ public class StatisticsJobAppender extends MasterDaemon {
List<Table> tables = db.get().getTables().stream()
.sorted((t1, t2) -> (int) (t1.getId() -
t2.getId())).collect(Collectors.toList());
for (Table t : tables) {
- if (t.getId() <= currentTableId) {
+ if (!(t instanceof OlapTable) || t.getId() <= currentTableId) {
continue;
}
synchronized (jobsMap) {
- // If job map reach the upper limit, stop putting new jobs.
+ // If job map reach the upper limit, stop adding new jobs.
if (!jobsMap.containsKey(t) && jobsMap.size() >=
JOB_MAP_SIZE) {
return;
}
@@ -126,6 +137,9 @@ public class StatisticsJobAppender extends MasterDaemon {
}
}
currentTableId = t.getId();
+ if (--batchSize <= 0) {
+ return;
+ }
}
}
// All tables have been processed once, reset for the next loop.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 9231c6a2bc7..96ca0aab54c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -126,11 +126,6 @@ public class TableStatsMeta implements Writable {
return colNameToColStatsMeta.keySet();
}
- public void reset() {
- updatedTime = 0;
- colNameToColStatsMeta.values().forEach(ColStatsMeta::clear);
- }
-
public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
updatedTime = analyzedJob.tblUpdateTime;
userInjected = analyzedJob.userInject;
@@ -145,34 +140,34 @@ public class TableStatsMeta implements Writable {
for (String col : cols) {
ColStatsMeta colStatsMeta = colNameToColStatsMeta.get(col);
if (colStatsMeta == null) {
- colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime,
- analyzedJob.analysisMethod, analyzedJob.analysisType,
analyzedJob.jobType, 0));
+ colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime,
analyzedJob.analysisMethod,
+ analyzedJob.analysisType, analyzedJob.jobType, 0,
analyzedJob.rowCount,
+ analyzedJob.updateRows));
} else {
colStatsMeta.updatedTime = updatedTime;
colStatsMeta.analysisType = analyzedJob.analysisType;
colStatsMeta.analysisMethod = analyzedJob.analysisMethod;
colStatsMeta.jobType = analyzedJob.jobType;
+ colStatsMeta.updatedRows = analyzedJob.updateRows;
+ colStatsMeta.rowCount = analyzedJob.rowCount;
}
}
jobType = analyzedJob.jobType;
if (tableIf != null) {
if (tableIf instanceof OlapTable) {
- rowCount = analyzedJob.emptyJob ? 0 : tableIf.getRowCount();
- }
- if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet()
- .containsAll(tableIf.getBaseSchema().stream()
- .filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
-
.map(Column::getName).collect(Collectors.toSet()))) {
- updatedRows.set(0);
- newPartitionLoaded.set(false);
- }
- if (tableIf instanceof OlapTable) {
+ rowCount = analyzedJob.rowCount;
PartitionInfo partitionInfo = ((OlapTable)
tableIf).getPartitionInfo();
- if (partitionInfo != null &&
analyzedJob.colToPartitions.keySet()
+ if (analyzedJob.rowCount != 0 && partitionInfo != null &&
analyzedJob.colToPartitions.keySet()
.containsAll(partitionInfo.getPartitionColumns().stream()
-
.map(Column::getName).collect(Collectors.toSet()))) {
+
.map(Column::getName).collect(Collectors.toSet()))) {
newPartitionLoaded.set(false);
}
+ if (analyzedJob.rowCount != 0 &&
analyzedJob.colToPartitions.keySet()
+ .containsAll(tableIf.getBaseSchema().stream()
+ .filter(c ->
!StatisticsUtil.isUnsupportedType(c.getType()))
+
.map(Column::getName).collect(Collectors.toSet()))) {
+ userInjected = false;
+ }
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 8ee08d57e69..65c2ee9e6da 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -867,6 +867,16 @@ public class StatisticsUtil {
return false;
}
+ public static boolean enableAutoAnalyzeInternalCatalog() {
+ try {
+ return findConfigFromGlobalSessionVar(
+
SessionVariable.ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG).enableAutoAnalyzeInternalCatalog;
+ } catch (Exception e) {
+ LOG.warn("Fail to get value of enable auto analyze internal
catalog, return false by default", e);
+ }
+ return false;
+ }
+
public static int getInsertMergeCount() {
try {
return
findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index 1bf2041bb4f..cb2637d5cf6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -184,7 +184,7 @@ public class AnalysisJobTest {
protected void syncLoadStats() {
}
};
- job.writeBuf();
+ job.flushBuffer();
Assertions.assertEquals(0, job.queryFinished.size());
}
@@ -210,7 +210,7 @@ public class AnalysisJobTest {
job.buf.add(new ColStatsData());
job.queryFinished = new HashSet<>();
job.queryFinished.add(task2);
- job.writeBuf();
+ job.flushBuffer();
Assertions.assertEquals(0, job.queryFinished.size());
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index f8a77fe06db..4e8bbfe5aff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -306,7 +306,7 @@ public class AnalysisManagerTest {
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));
TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder()
- .setColToPartitions(new
HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable);
+ .setColToPartitions(new
HashMap<>()).setRowCount(0).setColName("col1").build(), olapTable);
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org