This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e23bb788e7f [improvement](statistics)Make partition batch size
configurable and limit total rows to scan in partition analyze. (#36410)
e23bb788e7f is described below
commit e23bb788e7fa27f1fc59ef64c1a8b2110872f0e1
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Jun 19 10:13:24 2024 +0800
[improvement](statistics)Make partition batch size configurable and limit
total rows to scan in partition analyze. (#36410)
1. Make the number of partitions to collect in one batch configurable.
2. Add a limit to the total row count scanned while analyzing partitions in
one batch (a batch is flushed once its accumulated row count exceeds the
huge-partition row threshold).
---
.../src/main/java/org/apache/doris/qe/GlobalVariable.java | 8 ++++++++
.../src/main/java/org/apache/doris/qe/SessionVariable.java | 1 -
.../java/org/apache/doris/statistics/BaseAnalysisTask.java | 14 +++++++-------
.../org/apache/doris/statistics/util/StatisticsUtil.java | 5 +++++
4 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
index 14d8c35ff72..4790cd2172b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java
@@ -63,6 +63,8 @@ public final class GlobalVariable {
public static final String DEFAULT_USING_META_CACHE_FOR_EXTERNAL_CATALOG
= "default_using_meta_cache_for_external_catalog";
+ public static final String PARTITION_ANALYZE_BATCH_SIZE =
"partition_analyze_batch_size";
+
@VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY)
public static String versionComment = "Doris version "
+ Version.DORIS_BUILD_VERSION + "-" +
Version.DORIS_BUILD_SHORT_HASH;
@@ -155,6 +157,12 @@ public final class GlobalVariable {
"Only for compatibility with MySQL ecosystem, no practical
meaning"})
public static boolean super_read_only = true;
+ @VariableMgr.VarAttr(name = PARTITION_ANALYZE_BATCH_SIZE, flag =
VariableMgr.GLOBAL,
+ description = {
+ "批量收集分区信息的分区数",
+ "Number of partitions to collect in one batch."})
+ public static int partitionAnalyzeBatchSize = 10;
+
// Don't allow creating instance.
private GlobalVariable() {
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 309611c6ab4..b4125810c24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -520,7 +520,6 @@ public class SessionVariable implements Serializable,
Writable {
public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES =
"huge_table_lower_bound_size_in_bytes";
public static final String HUGE_PARTITION_LOWER_BOUND_ROWS =
"huge_partition_lower_bound_rows";
-
// for spill to disk
public static final String EXTERNAL_SORT_BYTES_THRESHOLD =
"external_sort_bytes_threshold";
public static final String EXTERNAL_AGG_BYTES_THRESHOLD =
"external_agg_bytes_threshold";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 5091ce53a20..920d528317d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -61,7 +61,6 @@ public abstract class BaseAnalysisTask {
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;
- public static final int PARTITION_BATCH_SIZE = 100;
protected static final String FULL_ANALYZE_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
@@ -367,6 +366,7 @@ public abstract class BaseAnalysisTask {
List<String> sqls = Lists.newArrayList();
Set<String> partNames = Sets.newHashSet();
int count = 0;
+ long batchRowCount = 0;
AnalysisManager analysisManager =
Env.getServingEnv().getAnalysisManager();
TableStatsMeta tableStatsStatus =
analysisManager.findTableStatsStatus(tbl.getId());
String idxName = info.indexId == -1 ? tbl.getName() : ((OlapTable)
tbl).getIndexNameById(info.indexId);
@@ -374,22 +374,24 @@ public abstract class BaseAnalysisTask {
? null : tableStatsStatus.findColumnStatsMeta(idxName,
col.getName());
boolean hasHughPartition = false;
long hugePartitionThreshold =
StatisticsUtil.getHugePartitionLowerBoundRows();
+ int partitionBatchSize = StatisticsUtil.getPartitionAnalyzeBatchSize();
// Find jobInfo for this task.
AnalysisInfo jobInfo =
analysisManager.findJobInfo(job.getJobInfo().jobId);
// For sync job, get jobInfo from job.jobInfo.
boolean isSync = jobInfo == null;
jobInfo = isSync ? job.jobInfo : jobInfo;
- StatisticsCache cache = Env.getCurrentEnv().getStatisticsCache();
for (String part : partitionNames) {
// External table partition is null.
Partition partition = tbl.getPartition(part);
if (partition != null) {
// For huge partition, skip analyze it.
- if (partition.getBaseIndex().getRowCount() >
hugePartitionThreshold) {
+ long partitionRowCount =
partition.getBaseIndex().getRowCount();
+ if (partitionRowCount > hugePartitionThreshold) {
hasHughPartition = true;
LOG.info("Partition {} in table {} is too large, skip
it.", part, tbl.getName());
continue;
}
+ batchRowCount += partitionRowCount;
// For cluster upgrade compatible (older version metadata
doesn't have partition update rows map)
// and insert before first analyze, set partition update rows
to 0.
jobInfo.partitionUpdateRows.putIfAbsent(partition.getId(), 0L);
@@ -413,12 +415,9 @@ public abstract class BaseAnalysisTask {
params.put("partitionInfo", getPartitionInfo(part));
StringSubstitutor stringSubstitutor = new
StringSubstitutor(params);
sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE));
- // TODO: invalidate remote FE's cache.
- cache.invalidatePartitionColumnStatsCache(
- info.catalogId, info.dbId, info.tblId, info.indexId, part,
col.getName());
count++;
partNames.add(part);
- if (count == PARTITION_BATCH_SIZE) {
+ if (count == partitionBatchSize || batchRowCount >
hugePartitionThreshold) {
String sql = "INSERT INTO " +
StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ Joiner.on(" UNION ALL ").join(sqls);
runInsert(sql);
@@ -427,6 +426,7 @@ public abstract class BaseAnalysisTask {
partNames.clear();
sqls.clear();
count = 0;
+ batchRowCount = 0;
}
}
if (count > 0) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 646d0235b5c..f6041a6a767 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -63,6 +63,7 @@ import
org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
@@ -877,6 +878,10 @@ public class StatisticsUtil {
return StatisticConstants.HUGE_PARTITION_LOWER_BOUND_ROWS;
}
+ public static int getPartitionAnalyzeBatchSize() {
+ return GlobalVariable.partitionAnalyzeBatchSize;
+ }
+
public static long getHugeTableAutoAnalyzeIntervalInMillis() {
try {
return
findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]