This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5ddabea2a93 [refactor](stats) refactor collection logic and opt some config #26163 (#26858)
5ddabea2a93 is described below
commit 5ddabea2a936c3f37224872edb6b3216d70fa891
Author: AKIRA <[email protected]>
AuthorDate: Mon Nov 13 18:27:07 2023 +0900
[refactor](stats) refactor collection logic and opt some config #26163 (#26858)
picked from #26163
---
.../main/java/org/apache/doris/common/Config.java | 34 +--
.../java/org/apache/doris/catalog/OlapTable.java | 7 +-
.../doris/nereids/stats/StatsCalculator.java | 5 +
.../java/org/apache/doris/qe/SessionVariable.java | 70 ++++-
.../org/apache/doris/statistics/AnalysisJob.java | 193 ++++++++++++++
.../apache/doris/statistics/AnalysisManager.java | 90 ++-----
.../doris/statistics/AnalysisTaskExecutor.java | 28 +-
.../doris/statistics/AnalysisTaskWrapper.java | 16 +-
.../apache/doris/statistics/BaseAnalysisTask.java | 110 +++-----
.../org/apache/doris/statistics/ColStatsData.java | 14 +
.../apache/doris/statistics/HMSAnalysisTask.java | 135 +---------
.../apache/doris/statistics/JdbcAnalysisTask.java | 34 +--
.../apache/doris/statistics/MVAnalysisTask.java | 152 -----------
.../apache/doris/statistics/OlapAnalysisTask.java | 138 +---------
.../doris/statistics/StatisticConstants.java | 14 +-
.../doris/statistics/StatisticsAutoCollector.java | 9 +-
.../doris/statistics/StatisticsCollector.java | 11 +-
.../statistics/StatisticsPeriodCollector.java | 50 ----
.../java/org/apache/doris/statistics/StatsId.java | 15 +-
.../doris/statistics/util/StatisticsUtil.java | 81 +++++-
.../apache/doris/statistics/AnalysisJobTest.java | 233 ++++++++++-------
.../doris/statistics/AnalysisManagerTest.java | 37 ++-
.../doris/statistics/AnalysisTaskExecutorTest.java | 16 +-
.../{AnalysisJobTest.java => AnalyzeTest.java} | 8 +-
.../org/apache/doris/statistics/CacheTest.java | 32 +++
.../doris/statistics/OlapAnalysisTaskTest.java | 74 ++++--
.../statistics/StatisticsAutoCollectorTest.java | 289 +++++++++++++++++++++
.../doris/statistics/util/StatisticsUtilTest.java | 46 +++-
.../suites/statistics/analyze_stats.groovy | 148 ++++++++---
.../suites/statistics/test_agg_complex_type.groovy | 53 ++++
30 files changed, 1265 insertions(+), 877 deletions(-)
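Since this commit moves the huge-table sampling and stats-health knobs from fe.conf (Config.java) into GLOBAL session variables (see the SessionVariable.java hunk below), they are now adjusted at runtime instead of by editing fe.conf. A minimal, hypothetical usage sketch, assuming the standard SET GLOBAL syntax; the variable names and default values are taken from the SessionVariable.java hunk in this diff:

    -- hypothetical sketch, not part of this commit
    SET GLOBAL huge_table_lower_bound_size_in_bytes = 5368709120;      -- 5L * 1024 * 1024 * 1024
    SET GLOBAL huge_table_default_sample_rows = 4194304;
    SET GLOBAL huge_table_auto_analyze_interval_in_millis = 43200000;  -- TimeUnit.HOURS.toMillis(12)
    SET GLOBAL table_stats_health_threshold = 60;
    SET GLOBAL stats_insert_merge_item_count = 200;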
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a361cd7291a..142616f2a0a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -19,8 +19,6 @@ package org.apache.doris.common;
import org.apache.doris.common.ExperimentalUtil.ExperimentalType;
-import java.util.concurrent.TimeUnit;
-
public class Config extends ConfigBase {
@ConfField(description = {"用户自定义配置文件的路径,用于存放 fe_custom.conf。该文件中的配置会覆盖
fe.conf 中的配置",
@@ -1745,7 +1743,7 @@ public class Config extends ConfigBase {
* Used to determined how many statistics collection SQL could run simultaneously.
*/
@ConfField
- public static int statistics_simultaneously_running_task_num = 10;
+ public static int statistics_simultaneously_running_task_num = 3;
/**
* if table has too many replicas, Fe occur oom when schema change.
@@ -2046,7 +2044,7 @@ public class Config extends ConfigBase {
* FE OOM.
*/
@ConfField
- public static long stats_cache_size = 10_0000;
+ public static long stats_cache_size = 50_0000;
/**
* This configuration is used to enable the statistics of query information, which will record
@@ -2069,9 +2067,6 @@ public class Config extends ConfigBase {
"Whether to enable binlog feature"})
public static boolean enable_feature_binlog = false;
- @ConfField
- public static int analyze_task_timeout_in_hours = 12;
-
@ConfField(mutable = true, masterOnly = true, description = {
"是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
"Whether to disable creating catalog with WITH RESOURCE
statement."})
@@ -2126,9 +2121,6 @@ public class Config extends ConfigBase {
@ConfField
public static boolean forbid_running_alter_job = false;
- @ConfField
- public static int table_stats_health_threshold = 80;
-
@ConfField(description = {
"暂时性配置项,开启后会自动将所有的olap表修改为可light schema change",
"temporary config filed, will make all olap tables enable light
schema change"
@@ -2154,28 +2146,6 @@ public class Config extends ConfigBase {
+ "but it will increase the memory overhead."})
public static int virtual_node_number = 2048;
- @ConfField(description = {"控制对大表的自动ANALYZE的最小时间间隔,"
- + "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
- "This controls the minimum time interval for automatic ANALYZE on
large tables. Within this interval,"
- + "tables larger than huge_table_lower_bound_size_in_bytes
are analyzed only once."})
- public static long huge_table_auto_analyze_interval_in_millis =
TimeUnit.HOURS.toMillis(12);
-
- @ConfField(description = {"定义大表的大小下界,在开启enable_auto_sample的情况下,"
- + "大小超过该值的表将会自动通过采样收集统计信息", "This defines the lower size bound for
large tables. "
- + "When enable_auto_sample is enabled, tables larger than this
value will automatically collect "
- + "statistics through sampling"})
- public static long huge_table_lower_bound_size_in_bytes = 5L * 1024 * 1024
* 1024;
-
- @ConfField(description = {"定义开启开启大表自动sample后,对大表的采样比例",
- "This defines the number of sample percent for large tables when
automatic sampling for"
- + "large tables is enabled"})
- public static int huge_table_default_sample_rows = 4194304;
-
- @ConfField(description =
{"是否开启大表自动sample,开启后对于大小超过huge_table_lower_bound_size_in_bytes会自动通过采样收集"
- + "统计信息", "Whether to enable automatic sampling for large tables,
which, when enabled, automatically"
- + "collects statistics through sampling for tables larger than
'huge_table_lower_bound_size_in_bytes'"})
- public static boolean enable_auto_sample = false;
-
@ConfField(description = {
"控制统计信息的自动触发作业执行记录的持久化行数",
"Determine the persist number of automatic triggered analyze job
execution status"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0f6ffc3cf6b..576fda217e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -53,7 +53,6 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.HistogramTask;
-import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -1102,11 +1101,9 @@ public class OlapTable extends Table {
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
if (info.analysisType.equals(AnalysisType.HISTOGRAM)) {
return new HistogramTask(info);
- }
- if (info.analysisType.equals(AnalysisType.FUNDAMENTALS)) {
+ } else {
return new OlapAnalysisTask(info);
}
- return new MVAnalysisTask(info);
}
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
@@ -1126,7 +1123,7 @@ public class OlapTable extends Table {
}
long updateRows = tblStats.updatedRows.get();
int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
- return tblHealth < Config.table_stats_health_threshold;
+ return tblHealth < StatisticsUtil.getTableStatsHealthThreshold();
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index bf45e128d8c..4f626948407 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -571,10 +571,15 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
}
private ColumnStatistic getColumnStatistic(TableIf table, String colName) {
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null && connectContext.getSessionVariable().internalSession) {
+ return ColumnStatistic.UNKNOWN;
+ }
if (totalColumnStatisticMap.get(table.getName() + colName) != null) {
return totalColumnStatisticMap.get(table.getName() + colName);
} else if (isPlayNereidsDump) {
return ColumnStatistic.UNKNOWN;
+
} else {
long catalogId;
long dbId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fb02d9cc7cd..7f165a8ec13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -57,6 +57,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* System variable.
@@ -412,6 +413,19 @@ public class SessionVariable implements Serializable, Writable {
public static final String FASTER_FLOAT_CONVERT = "faster_float_convert";
+ public static final String ENABLE_DECIMAL256 = "enable_decimal256";
+
+ public static final String STATS_INSERT_MERGE_ITEM_COUNT = "stats_insert_merge_item_count";
+
+ public static final String HUGE_TABLE_DEFAULT_SAMPLE_ROWS = "huge_table_default_sample_rows";
+ public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = "huge_table_lower_bound_size_in_bytes";
+
+ public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
+ = "huge_table_auto_analyze_interval_in_millis";
+
+ public static final String TABLE_STATS_HEALTH_THRESHOLD
+ = "table_stats_health_threshold";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -465,7 +479,7 @@ public class SessionVariable implements Serializable, Writable {
public int queryTimeoutS = 900;
// query timeout in second.
- @VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, needForward = true)
+ @VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, flag = VariableMgr.GLOBAL, needForward = true)
+ public int analyzeTimeoutS = 43200;
// The global max_execution_time value provides the default for the session value for new connections.
@@ -1156,6 +1170,12 @@ public class SessionVariable implements Serializable, Writable {
+ " use a skiplist to optimize the intersection."})
public int invertedIndexConjunctionOptThreshold = 1000;
+ @VariableMgr.VarAttr(name = FULL_AUTO_ANALYZE_END_TIME, needForward = true, checker = "checkAnalyzeTimeFormat",
+ description = {"该参数定义自动ANALYZE例程的结束时间",
+ "This parameter defines the end time for the automatic ANALYZE routine."},
+ flag = VariableMgr.GLOBAL)
+ public String fullAutoAnalyzeEndTime = "23:59:59";
+
@VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true)
public boolean enableUniqueKeyPartialUpdate = false;
@@ -1177,12 +1197,6 @@ public class SessionVariable implements Serializable, Writable {
flag = VariableMgr.GLOBAL)
public String fullAutoAnalyzeStartTime = "00:00:00";
- @VariableMgr.VarAttr(name = FULL_AUTO_ANALYZE_END_TIME, needForward = true, checker = "checkAnalyzeTimeFormat",
- description = {"该参数定义自动ANALYZE例程的结束时间",
- "This parameter defines the end time for the automatic ANALYZE routine."},
- flag = VariableMgr.GLOBAL)
- public String fullAutoAnalyzeEndTime = "02:00:00";
-
@VariableMgr.VarAttr(name = FASTER_FLOAT_CONVERT,
description = {"是否启用更快的浮点数转换算法,注意会影响输出格式", "Set true to enable
faster float pointer number convert"})
public boolean fasterFloatConvert = false;
@@ -1192,6 +1206,48 @@ public class SessionVariable implements Serializable, Writable {
"the runtime filter id in IGNORE_RUNTIME_FILTER_IDS list will not be generated"})
public String ignoreRuntimeFilterIds = "";
+
+ @VariableMgr.VarAttr(name = STATS_INSERT_MERGE_ITEM_COUNT, flag = VariableMgr.GLOBAL, description = {
+ "控制统计信息相关INSERT攒批数量", "Controls the batch size for stats INSERT merging."
+ }
+ )
+ public int statsInsertMergeItemCount = 200;
+
+ @VariableMgr.VarAttr(name = HUGE_TABLE_DEFAULT_SAMPLE_ROWS, flag = VariableMgr.GLOBAL, description = {
+ "定义开启开启大表自动sample后,对大表的采样比例",
+ "This defines the number of sample percent for large tables when automatic sampling for"
+ + "large tables is enabled"
+
+ })
+ public long hugeTableDefaultSampleRows = 4194304;
+
+
+ @VariableMgr.VarAttr(name = HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES, flag = VariableMgr.GLOBAL,
+ description = {
+ "大小超过该值的表将会自动通过采样收集统计信息",
+ "This defines the lower size bound for large tables. "
+ + "When enable_auto_sample is enabled, tables"
+ + "larger than this value will automatically collect "
+ + "statistics through sampling"})
+ public long hugeTableLowerBoundSizeInBytes = 5L * 1024 * 1024 * 1024;
+
+ @VariableMgr.VarAttr(name = HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
+ description = {"控制对大表的自动ANALYZE的最小时间间隔,"
+ + "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
+ "This controls the minimum time interval for automatic ANALYZE on large tables."
+ + "Within this interval,"
+ + "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."})
+ public long hugeTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(12);
+
+ @VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL,
+ description = {"取值在0-100之间,当自上次统计信息收集操作之后"
+ + "数据更新量达到 (100 - table_stats_health_threshold)% ,认为该表的统计信息已过时",
+ "The value should be between 0 and 100. When the data update quantity "
+ + "exceeds (100 - table_stats_health_threshold)% since the last "
+ + "statistics collection operation, the statistics for this table are"
+ + "considered outdated."})
+ public int tableStatsHealthThreshold = 60;
+
public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";
public Set<Integer> getIgnoredRuntimeFilterIds() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
new file mode 100644
index 00000000000..904dc21e337
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.qe.AuditLogHelper;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringJoiner;
+
+public class AnalysisJob {
+
+ public static final Logger LOG = LogManager.getLogger(AnalysisJob.class);
+
+ protected Set<BaseAnalysisTask> queryingTask;
+
+ protected Set<BaseAnalysisTask> queryFinished;
+
+ protected List<ColStatsData> buf;
+
+ protected int totalTaskCount;
+
+ protected int queryFinishedTaskCount;
+
+ protected StmtExecutor stmtExecutor;
+
+ protected boolean killed;
+
+ protected long start;
+
+ protected AnalysisInfo jobInfo;
+
+ protected AnalysisManager analysisManager;
+
+ public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> queryingTask) {
+ for (BaseAnalysisTask task : queryingTask) {
+ task.job = this;
+ }
+ this.queryingTask = new HashSet<>(queryingTask);
+ this.queryFinished = new HashSet<>();
+ this.buf = new ArrayList<>();
+ totalTaskCount = queryingTask.size();
+ start = System.currentTimeMillis();
+ this.jobInfo = jobInfo;
+ this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
+ }
+
+ public synchronized void appendBuf(BaseAnalysisTask task, List<ColStatsData> statsData) {
+ queryingTask.remove(task);
+ buf.addAll(statsData);
+ queryFinished.add(task);
+ queryFinishedTaskCount += 1;
+ if (queryFinishedTaskCount == totalTaskCount) {
+ writeBuf();
+ updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ + (System.currentTimeMillis() - start) / 1000);
+ deregisterJob();
+ } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
+ writeBuf();
+ }
+ }
+
+ // CHECKSTYLE OFF
+ // fallthrough here is expected
+ public void updateTaskState(AnalysisState state, String msg) {
+ long time = System.currentTimeMillis();
+ switch (state) {
+ case FAILED:
+ for (BaseAnalysisTask task : queryingTask) {
+ analysisManager.updateTaskStatus(task.info, state, msg, time);
+ task.cancel();
+ }
+ killed = true;
+ case FINISHED:
+ for (BaseAnalysisTask task : queryFinished) {
+ analysisManager.updateTaskStatus(task.info, state, msg, time);
+ }
+ default:
+ // DO NOTHING
+ }
+ }
+
+ protected void writeBuf() {
+ if (killed) {
+ return;
+ }
+ // buf could be empty when nothing need to do, for example user submit an analysis task for table with no data
+ // change
+ if (!buf.isEmpty()) {
+ String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
+ StringJoiner values = new StringJoiner(",");
+ for (ColStatsData data : buf) {
+ values.add(data.toSQL(true));
+ }
+ insertStmt += values.toString();
+ int retryTimes = 0;
+ while (retryTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+ if (killed) {
+ return;
+ }
+ try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
+ stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
+ executeWithExceptionOnFail(stmtExecutor);
+ break;
+ } catch (Exception t) {
+ LOG.warn("Failed to write buf: " + insertStmt, t);
+ retryTimes++;
+ if (retryTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+ updateTaskState(AnalysisState.FAILED, t.getMessage());
+ return;
+ }
+ }
+ }
+ }
+ updateTaskState(AnalysisState.FINISHED, "");
+ syncLoadStats();
+ queryFinished.clear();
+ }
+
+ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+ if (killed) {
+ return;
+ }
+ LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
+ try {
+ stmtExecutor.execute();
+ QueryState queryState = stmtExecutor.getContext().getState();
+ if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+ throw new RuntimeException(
+ "Failed to insert : " +
stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+ + queryState.getErrorMessage());
+ }
+ } finally {
+ AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
+ stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
+ true);
+ }
+ }
+
+ public void taskFailed(BaseAnalysisTask task, String reason) {
+ updateTaskState(AnalysisState.FAILED, reason);
+ cancel();
+ deregisterJob();
+ }
+
+ public void cancel() {
+ for (BaseAnalysisTask task : queryingTask) {
+ task.cancel();
+ }
+ }
+
+ public void deregisterJob() {
+ analysisManager.removeJob(jobInfo.jobId);
+ }
+
+ protected void syncLoadStats() {
+ long tblId = jobInfo.tblId;
+ for (BaseAnalysisTask task : queryFinished) {
+ String colName = task.col.getName();
+ if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
+ analysisManager.removeColStatsStatus(tblId, colName);
+ }
+ }
+ }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index b477a23680e..83441f4b2dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -101,7 +100,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-public class AnalysisManager extends Daemon implements Writable {
+public class AnalysisManager implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
@@ -113,11 +112,11 @@ public class AnalysisManager extends Daemon implements Writable {
private AnalysisTaskExecutor taskExecutor;
// Store task information in metadata.
- private final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
+ protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());
// Store job information in metadata.
- private final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
+ protected final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());
// Tracking system submitted job, keep in mem only
@@ -128,6 +127,8 @@ public class AnalysisManager extends Daemon implements Writable {
private final Map<Long, TableStatsMeta> idToTblStats = new ConcurrentHashMap<>();
+ private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();
+
protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
@@ -237,7 +238,6 @@ public class AnalysisManager extends Daemon implements Writable {
new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
public AnalysisManager() {
- super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
if (!Env.isCheckpointThread()) {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
this.statisticsCache = new StatisticsCache();
@@ -245,44 +245,6 @@ public class AnalysisManager extends Daemon implements Writable {
}
}
- @Override
- protected void runOneCycle() {
- clear();
- }
-
- private void clear() {
- clearExpiredAnalysisInfo(analysisJobInfoMap, (a) ->
- a.scheduleType.equals(ScheduleType.ONCE)
- && System.currentTimeMillis() - a.lastExecTimeInMs
- > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
- (id) -> {
- Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
- return null;
- });
- clearExpiredAnalysisInfo(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs
- > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
- (id) -> {
- Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new AnalyzeDeletionLog(id));
- return null;
- });
- }
-
- private void clearExpiredAnalysisInfo(Map<Long, AnalysisInfo> infoMap, Predicate<AnalysisInfo> isExpired,
- Function<Long, Void> writeLog) {
- synchronized (infoMap) {
- List<Long> expired = new ArrayList<>();
- for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
- if (isExpired.test(entry.getValue())) {
- expired.add(entry.getKey());
- }
- }
- for (Long k : expired) {
- infoMap.remove(k);
- writeLog.apply(k);
- }
- }
- }
-
public StatisticsCache getStatisticsCache() {
return statisticsCache;
}
@@ -371,6 +333,7 @@ public class AnalysisManager extends Daemon implements Writable {
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
+ constructJob(jobInfo, analysisTaskInfos.values());
if (!jobInfo.partitionOnly && stmt.isAllColumns()
&& StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
@@ -446,7 +409,6 @@ public class AnalysisManager extends Daemon implements Writable {
*/
private Map<String, Set<String>> validateAndGetPartitions(TableIf table,
Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType) throws
DdlException {
- long tableId = table.getId();
Map<String, Set<String>> columnToPartitions = columnNames.stream()
.collect(Collectors.toMap(
@@ -467,27 +429,6 @@ public class AnalysisManager extends Daemon implements Writable {
return columnToPartitions;
}
- // Get the partition granularity statistics that have been collected
- Map<String, Set<String>> existColAndPartsForStats =
StatisticsRepository
- .fetchColAndPartsForStats(tableId);
-
- if (existColAndPartsForStats.isEmpty()) {
- // There is no historical statistical information, no need to do
validation
- return columnToPartitions;
- }
-
- Set<String> existPartIdsForStats = new HashSet<>();
-
existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
- Set<String> idToPartition = StatisticsUtil.getPartitionIds(table);
- // Get an invalid set of partitions (those partitions were deleted)
- Set<String> invalidPartIds = existPartIdsForStats.stream()
- .filter(id ->
!idToPartition.contains(id)).collect(Collectors.toSet());
-
- if (!invalidPartIds.isEmpty()) {
- // Delete invalid partition statistics to avoid affecting table
statistics
- StatisticsRepository.dropStatistics(invalidPartIds);
- }
-
if (analysisType == AnalysisType.FUNDAMENTALS) {
Map<String, Set<String>> result =
table.findReAnalyzeNeededPartitions();
result.keySet().retainAll(columnNames);
@@ -720,11 +661,12 @@ public class AnalysisManager extends Daemon implements Writable {
public void syncExecute(Collection<BaseAnalysisTask> tasks) {
SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks);
ConnectContext ctx = ConnectContext.get();
+ ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
try {
ctxToSyncTask.put(ctx, syncTaskCollection);
- ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
syncTaskCollection.execute(syncExecPool);
} finally {
+ syncExecPool.shutdown();
ctxToSyncTask.remove(ctx);
}
}
@@ -737,7 +679,7 @@ public class AnalysisManager extends Daemon implements Writable {
new SynchronousQueue(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d")
.build(), new BlockedPolicy(poolName,
- (int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
+ StatisticsUtil.getAnalyzeTimeout()));
}
public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
@@ -759,6 +701,7 @@ public class AnalysisManager extends Daemon implements Writable {
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
}
+ tableStats.updatedTime = 0;
}
logCreateTableStats(tableStats);
StatisticsRepository.dropStatistics(tblId, cols);
@@ -1128,4 +1071,17 @@ public class AnalysisManager extends Daemon implements Writable {
}
return tableStats.findColumnStatsMeta(colName);
}
+
+ public AnalysisJob findJob(long id) {
+ return idToAnalysisJob.get(id);
+ }
+
+ public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> tasks) {
+ AnalysisJob job = new AnalysisJob(jobInfo, tasks);
+ idToAnalysisJob.put(jobInfo.jobId, job);
+ }
+
+ public void removeJob(long id) {
+ idToAnalysisJob.remove(id);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 4b133ce0ebf..58bae9fe66b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -18,9 +18,9 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
+import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,7 +36,7 @@ public class AnalysisTaskExecutor extends Thread {
private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
- private final ThreadPoolExecutor executors;
+ protected final ThreadPoolExecutor executors;
private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
@@ -72,18 +72,22 @@ public class AnalysisTaskExecutor extends Thread {
private void doCancelExpiredJob() {
for (;;) {
+ tryToCancel();
+ }
+ }
+
+ protected void tryToCancel() {
+ try {
+ AnalysisTaskWrapper taskWrapper = taskQueue.take();
try {
- AnalysisTaskWrapper taskWrapper = taskQueue.take();
- try {
- long timeout = TimeUnit.HOURS.toMillis(Config.analyze_task_timeout_in_hours)
- - (System.currentTimeMillis() - taskWrapper.getStartTime());
- taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- taskWrapper.cancel(e.getMessage());
- }
- } catch (Throwable throwable) {
- LOG.warn(throwable);
+ long timeout = TimeUnit.SECONDS.toMillis(StatisticsUtil.getAnalyzeTimeout())
+ - (System.currentTimeMillis() - taskWrapper.getStartTime());
+ taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ taskWrapper.cancel(e.getMessage());
}
+ } catch (Throwable throwable) {
+ LOG.warn("cancel analysis task failed", throwable);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index 9aa3d85992b..ffdd375ee9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -17,7 +17,6 @@
package org.apache.doris.statistics;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -59,9 +58,8 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.inAnalyzeTime(
LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
// TODO: Do we need a separate AnalysisState here?
- Env.getCurrentEnv().getAnalysisManager()
- .updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task"
- + "doesn't get executed within specified time range", System.currentTimeMillis());
+ task.job.taskFailed(task, "Auto task"
+ + "doesn't get executed within specified time range");
return;
}
executor.putJob(this);
@@ -76,15 +74,7 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
if (!task.killed) {
if (except != null) {
LOG.warn("Analyze {} failed.", task.toString(), except);
- Env.getCurrentEnv().getAnalysisManager()
- .updateTaskStatus(task.info,
- AnalysisState.FAILED, Util.getRootCauseMessage(except), System.currentTimeMillis());
- } else {
- LOG.debug("Analyze {} finished, cost time:{}", task.toString(),
- System.currentTimeMillis() - startTime);
- Env.getCurrentEnv().getAnalysisManager()
- .updateTaskStatus(task.info,
- AnalysisState.FINISHED, "", System.currentTimeMillis());
+ task.job.taskFailed(task, Util.getRootCauseMessage(except));
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 4f7d588de73..3fcebd6c38b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -22,14 +22,12 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.qe.AuditLogHelper;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -38,6 +36,7 @@ import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
public abstract class BaseAnalysisTask {
@@ -52,59 +51,25 @@ public abstract class BaseAnalysisTask {
+ "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
;
- /**
- * Stats stored in the column_statistics table basically has two types, `part_id` is null which means it is
- * aggregate from partition level stats, `part_id` is not null which means it is partition level stats.
- * For latter, it's id field contains part id, for previous doesn't.
- */
- protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
- + " SELECT "
- + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-',
${partId}) AS id, "
- + "${catalogId} AS catalog_id, "
- + "${dbId} AS db_id, "
- + "${tblId} AS tbl_id, "
- + "${idxId} AS idx_id, "
- + "'${colId}' AS col_id, "
- + "${partId} AS part_id, "
- + "COUNT(1) AS row_count, "
- + "NDV(`${colName}`) AS ndv, "
- + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS
null_count, "
- + "MIN(`${colName}`) AS min, "
- + "MAX(`${colName}`) AS max, "
- + "${dataSizeFunction} AS data_size, "
- + "NOW() ";
-
- protected static final String INSERT_COL_STATISTICS = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
- + " SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id,
part_id, row_count, "
- + " ndv, null_count,"
- + " to_base64(CAST(min AS string)), to_base64(CAST(max AS
string)), data_size, update_time\n"
- + " FROM \n"
- + " (SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}')
AS id, "
+ protected static final String COLLECT_COL_STATISTICS =
+ "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ " ${catalogId} AS catalog_id, "
+ " ${dbId} AS db_id, "
+ " ${tblId} AS tbl_id, "
+ " ${idxId} AS idx_id, "
+ " '${colId}' AS col_id, "
+ " NULL AS part_id, "
- + " SUM(count) AS row_count, \n"
- + " SUM(null_count) AS null_count, "
- + " MIN(CAST(from_base64(min) AS ${type})) AS min, "
- + " MAX(CAST(from_base64(max) AS ${type})) AS max, "
- + " SUM(data_size_in_bytes) AS data_size, "
- + " NOW() AS update_time \n"
- + " FROM ${internalDB}.${columnStatTbl}"
- + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND
"
- + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
- + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
- + " ${internalDB}.${columnStatTbl}.idx_id='${idxId}' AND "
- + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
- + " ) t1, \n";
-
- protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
- + " SELECT "
+ + " COUNT(1) AS row_count, "
+ + " NDV(`${colName}`) AS ndv, "
+ + " COUNT(1) - COUNT(${colName}) AS null_count, "
+ + " CAST(MIN(${colName}) AS STRING) AS min, "
+ + " CAST(MAX(${colName}) AS STRING) AS max, "
+ + " ${dataSizeFunction} AS data_size, "
+ + " NOW() AS update_time "
+ + " FROM `${dbName}`.`${tblName}`";
+
+ protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE =
+ " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
@@ -115,8 +80,8 @@ public abstract class BaseAnalysisTask {
+ "${row_count} AS row_count, "
+ "${ndv} AS ndv, "
+ "${null_count} AS null_count, "
- + "to_base64('${min}') AS min, "
- + "to_base64('${max}') AS max, "
+ + "'${min}' AS min, "
+ + "'${max}' AS max, "
+ "${data_size} AS data_size, "
+ "NOW() ";
@@ -136,6 +101,8 @@ public abstract class BaseAnalysisTask {
protected TableSample tableSample = null;
+ protected AnalysisJob job;
+
@VisibleForTesting
public BaseAnalysisTask() {
@@ -192,6 +159,7 @@ public abstract class BaseAnalysisTask {
}
LOG.warn("Failed to execute analysis task, retried times: {}",
retriedTimes++, t);
if (retriedTimes >
StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+ job.taskFailed(this, t.getMessage());
throw new RuntimeException(t);
}
StatisticsUtil.sleep(TimeUnit.SECONDS.toMillis(2 ^
retriedTimes) * 10);
@@ -266,11 +234,10 @@ public abstract class BaseAnalysisTask {
return new TableSample(true, (long) info.samplePercent);
} else if (info.sampleRows > 0) {
return new TableSample(false, info.sampleRows);
- } else if (info.analysisMethod == AnalysisMethod.FULL
- && Config.enable_auto_sample
- && tbl.getDataSize(true) >
Config.huge_table_lower_bound_size_in_bytes) {
+ } else if (info.jobType.equals(JobType.SYSTEM) && info.analysisMethod
== AnalysisMethod.FULL
+ && tbl.getDataSize(true) >
StatisticsUtil.getHugeTableLowerBoundSizeInBytes()) {
// If user doesn't specify sample percent/rows, use auto sample
and update sample rows in analysis info.
- return new TableSample(false, (long)
Config.huge_table_default_sample_rows);
+ return new TableSample(false,
StatisticsUtil.getHugeTableSampleRows());
} else {
return null;
}
@@ -283,23 +250,20 @@ public abstract class BaseAnalysisTask {
col == null ? "TableRowCount" : col.getName());
}
- protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
- if (killed) {
- return;
- }
- LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
- try {
- stmtExecutor.execute();
- QueryState queryState = stmtExecutor.getContext().getState();
- if (queryState.getStateType().equals(MysqlStateType.ERR)) {
- throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
- catalog.getName(), db.getFullName(), info.colName, stmtExecutor.getOriginStmt().toString(),
- queryState.getErrorMessage()));
- }
+ public void setJob(AnalysisJob job) {
+ this.job = job;
+ }
+
+ protected void runQuery(String sql) {
+ long startTime = System.currentTimeMillis();
+ try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
+ stmtExecutor = new StmtExecutor(a.connectContext, sql);
+ stmtExecutor.executeInternalQuery();
+ ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
+ job.appendBuf(this, Collections.singletonList(colStatsData));
} finally {
- AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
- stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
- true);
+ LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000);
}
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
index 6c94326a942..41936232afd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java
@@ -19,6 +19,8 @@ package org.apache.doris.statistics;
import org.apache.doris.statistics.util.StatisticsUtil;
+import com.google.common.annotations.VisibleForTesting;
+
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.StringJoiner;
@@ -54,6 +56,18 @@ public class ColStatsData {
public final String updateTime;
+ @VisibleForTesting
+ public ColStatsData() {
+ statsId = new StatsId();
+ count = 0;
+ ndv = 0;
+ nullCount = 0;
+ minLit = null;
+ maxLit = null;
+ dataSizeInBytes = 0;
+ updateTime = null;
+ }
+
public ColStatsData(ResultRow row) {
this.statsId = new StatsId(row);
this.count = (long) Double.parseDouble(row.get(7));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 4583237f8c6..049e80d52fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -23,26 +23,19 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.external.hive.util.HiveUtil;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.StringJoiner;
import java.util.stream.Collectors;
public class HMSAnalysisTask extends BaseAnalysisTask {
@@ -51,9 +44,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
// While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size)
// if ndv(col)/count(col) is greater than this threshold.
- private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
- + " SELECT "
+ private static final String ANALYZE_TABLE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
@@ -70,28 +61,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
- private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
- + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-',
${partId}) AS id, "
- + "${catalogId} AS catalog_id, "
- + "${dbId} AS db_id, "
- + "${tblId} AS tbl_id, "
- + "${idxId} AS idx_id, "
- + "'${colId}' AS col_id, "
- + "${partId} AS part_id, "
- + "COUNT(1) AS row_count, "
- + "NDV(`${colName}`) AS ndv, "
- + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS
null_count, "
- + "to_base64(MIN(`${colName}`)) AS min, "
- + "to_base64(MAX(`${colName}`)) AS max, "
- + "${dataSizeFunction} AS data_size, "
- + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";
-
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
- // cache stats for each partition, it would be inserted into column_statistics in a batch.
- private final List<List<ColStatsData>> buf = new ArrayList<>();
-
private final boolean isTableLevelTask;
private final boolean isPartitionOnly;
private Set<String> partitionNames;
@@ -131,25 +103,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Get column statistics and insert the result to __internal_schema.column_statistics
*/
private void getTableColumnStats() throws Exception {
- if (isPartitionOnly) {
- getPartitionNames();
- List<String> partitionAnalysisSQLs = new ArrayList<>();
- for (String partId : this.partitionNames) {
- partitionAnalysisSQLs.add(generateSqlForPartition(partId));
- }
- execSQLs(partitionAnalysisSQLs);
- } else {
- if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
- try {
- getPartitionColumnStats();
- } catch (Exception e) {
- LOG.warn("Failed to collect stats for partition col {}
using metadata, "
- + "fallback to normal collection", col.getName(),
e);
- getOrdinaryColumnStats();
- }
- } else {
+ if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
+ try {
+ getPartitionColumnStats();
+ } catch (Exception e) {
+ LOG.warn("Failed to collect stats for partition col {} using
metadata, "
+ + "fallback to normal collection", col.getName(), e);
getOrdinaryColumnStats();
}
+ } else {
+ getOrdinaryColumnStats();
}
}
@@ -182,7 +145,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
params.put("maxFunction", getMaxFunction());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
- executeInsertSql(sql);
+ runQuery(sql);
}
private void getPartitionColumnStats() throws Exception {
@@ -227,7 +190,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
params.put("data_size", String.valueOf(dataSize));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
- executeInsertSql(sql);
+ runQuery(sql);
}
private String updateMinValue(String currentMin, String value) {
@@ -278,7 +241,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
partitionNames = table.getPartitionNames();
} else if (info.partitionCount > 0) {
partitionNames = table.getPartitionNames().stream()
- .limit(info.partitionCount).collect(Collectors.toSet());
+ .limit(info.partitionCount).collect(Collectors.toSet());
}
if (partitionNames == null || partitionNames.isEmpty()) {
throw new RuntimeException("Not a partition table or no
partition specified.");
@@ -286,80 +249,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
}
- private String generateSqlForPartition(String partId) {
- StringBuilder sb = new StringBuilder();
- sb.append(ANALYZE_PARTITION_TEMPLATE);
- String[] splits = partId.split("/");
- for (int i = 0; i < splits.length; i++) {
- String[] kv = splits[i].split("=");
- sb.append(kv[0]);
- sb.append("='");
- sb.append(kv[1]);
- sb.append("'");
- if (i < splits.length - 1) {
- sb.append(" and ");
- }
- }
- Map<String, String> params = buildStatsParams(partId);
- params.put("dataSizeFunction", getDataSizeFunction(col));
- return new StringSubstitutor(params).replace(sb.toString());
- }
-
- public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
- long startTime = System.currentTimeMillis();
- LOG.debug("analyze task {} start at {}", info.toString(), new Date());
- try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
- List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
- for (List<String> group : sqlGroups) {
- if (killed) {
- return;
- }
- StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
- group.forEach(partitionCollectSQL::add);
- stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
- buf.add(stmtExecutor.executeInternalQuery()
- .stream().map(ColStatsData::new).collect(Collectors.toList()));
- QueryState queryState = r.connectContext.getState();
- if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
- throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
- catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
- queryState.getErrorMessage()));
- }
- }
- for (List<ColStatsData> colStatsDataList : buf) {
- StringBuilder batchInsertSQL =
- new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
- + " VALUES ");
- StringJoiner sj = new StringJoiner(",");
- colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
- batchInsertSQL.append(sj);
- stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
- executeWithExceptionOnFail(stmtExecutor);
- }
- } finally {
- LOG.debug("analyze task {} end. cost {}ms", info, System.currentTimeMillis() - startTime);
- }
-
- }
-
- private void executeInsertSql(String sql) throws Exception {
- long startTime = System.currentTimeMillis();
- try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
- r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
- this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
- r.connectContext.setExecutor(stmtExecutor);
- this.stmtExecutor.execute();
- QueryState queryState = r.connectContext.getState();
- if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
- LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
- catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
- throw new RuntimeException(queryState.getErrorMessage());
- }
- LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
- catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
- }
- }
-
private Map<String, String> buildStatsParams(String partId) {
Map<String, String> commonParams = new HashMap<>();
String id = StatisticsUtil.constructId(tbl.getId(), -1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
index 5ae66d292dc..649b075c673 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java
@@ -20,25 +20,17 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.text.StringSubstitutor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JdbcAnalysisTask extends BaseAnalysisTask {
- private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);
- private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
- + "${internalDB}.${columnStatTbl}"
- + " SELECT "
+ private static final String ANALYZE_SQL_TABLE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
@@ -49,8 +41,8 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
+ "COUNT(1) AS row_count, "
+ "NDV(`${colName}`) AS ndv, "
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS
null_count, "
- + "to_base64(MIN(`${colName}`)) AS min, "
- + "to_base64(MAX(`${colName}`)) AS max, "
+ + "MIN(`${colName}`) AS min, "
+ + "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
@@ -117,25 +109,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
params.put("dataSizeFunction", getDataSizeFunction(col));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
- executeInsertSql(sql);
- }
-
- private void executeInsertSql(String sql) throws Exception {
- long startTime = System.currentTimeMillis();
- try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
- r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
- this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
- r.connectContext.setExecutor(stmtExecutor);
- this.stmtExecutor.execute();
- QueryState queryState = r.connectContext.getState();
- if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
- LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
- catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
- throw new RuntimeException(queryState.getErrorMessage());
- }
- LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
- catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
- }
+ runQuery(sql);
}
private Map<String, String> buildTableStatsParams(String partId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
deleted file mode 100644
index 6a43c5092fa..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.analysis.CreateMaterializedViewStmt;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.FunctionCallExpr;
-import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SelectListItem;
-import org.apache.doris.analysis.SelectStmt;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.TableRef;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndexMeta;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import com.google.common.base.Preconditions;
-
-import java.io.StringReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Analysis for the materialized view, only gets constructed when the AnalyzeStmt is not set which
- * columns to be analyzed.
- * TODO: Supports multi-table mv
- */
-public class MVAnalysisTask extends BaseAnalysisTask {
-
- private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS
- + " FROM (${sql}) mv ${sampleExpr}";
-
- private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS
- + " (SELECT NDV(`${colName}`) AS ndv "
- + " FROM (${sql}) mv) t2";
-
- private MaterializedIndexMeta meta;
-
- private SelectStmt selectStmt;
-
- private OlapTable olapTable;
-
- public MVAnalysisTask(AnalysisInfo info) {
- super(info);
- init();
- }
-
- private void init() {
- olapTable = (OlapTable) tbl;
- meta = olapTable.getIndexMetaByIndexId(info.indexId);
- Preconditions.checkState(meta != null);
- String mvDef = meta.getDefineStmt().originStmt;
- SqlScanner input =
- new SqlScanner(new StringReader(mvDef), 0L);
- SqlParser parser = new SqlParser(input);
- CreateMaterializedViewStmt cmv = null;
- try {
- cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- selectStmt = cmv.getSelectStmt();
- selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName());
- }
-
- @Override
- public void doExecute() throws Exception {
- for (Column column : meta.getSchema()) {
- SelectStmt selectOne = (SelectStmt) selectStmt.clone();
- TableRef tableRef = selectOne.getTableRefs().get(0);
- SelectListItem selectItem = selectOne.getSelectList().getItems()
- .stream()
- .filter(i -> isCorrespondingToColumn(i, column))
- .findFirst()
- .get();
- selectItem.setAlias(column.getName());
- Map<String, String> params = new HashMap<>();
- for (String partName : tbl.getPartitionNames()) {
- PartitionNames partitionName = new PartitionNames(false,
- Collections.singletonList(partName));
- tableRef.setPartitionNames(partitionName);
- String sql = selectOne.toSql();
- params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
- params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
- params.put("catalogId", String.valueOf(catalog.getId()));
- params.put("dbId", String.valueOf(db.getId()));
- params.put("tblId", String.valueOf(tbl.getId()));
- params.put("idxId", String.valueOf(meta.getIndexId()));
- String colName = column.getName();
- params.put("colId", colName);
- String partId = olapTable.getPartition(partName) == null ? "NULL" :
- String.valueOf(olapTable.getPartition(partName).getId());
- params.put("partId", partId);
- params.put("dataSizeFunction", getDataSizeFunction(column));
- params.put("dbName", db.getFullName());
- params.put("colName", colName);
- params.put("tblName", tbl.getName());
- params.put("sql", sql);
- StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
- }
- params.remove("partId");
- params.remove("sampleExpr");
- params.put("type", column.getType().toString());
- StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
- Env.getCurrentEnv().getStatisticsCache()
- .refreshColStatsSync(meta.getIndexId(), meta.getIndexId(), column.getName());
- }
- }
-
- // Based on the fact that materialized view create statement's select expr only contains basic SlotRef and
- // AggregateFunction.
- private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
- Expr expr = item.getExpr();
- if (expr instanceof SlotRef) {
- SlotRef slotRef = (SlotRef) expr;
- return slotRef.getColumnName().equalsIgnoreCase(column.getName());
- }
- if (expr instanceof FunctionCallExpr) {
- FunctionCallExpr func = (FunctionCallExpr) expr;
- SlotRef slotRef = (SlotRef) func.getChild(0);
- return slotRef.getColumnName().equalsIgnoreCase(column.getName());
- }
- return false;
- }
-
- @Override
- protected void afterExecution() {
- // DO NOTHING
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 185a582cde4..b0c4b0b6c0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -22,28 +22,21 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
-import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.QueryState;
-import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import org.apache.commons.text.StringSubstitutor;
import java.security.SecureRandom;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.StringJoiner;
import java.util.stream.Collectors;
/**
@@ -51,29 +44,6 @@ import java.util.stream.Collectors;
*/
public class OlapAnalysisTask extends BaseAnalysisTask {
- // TODO Currently, NDV is computed for the full table; in fact,
- // NDV should only be computed for the relevant partition.
- private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS
- + " (SELECT NDV(`${colName}`) AS ndv "
- + " FROM `${dbName}`.`${tblName}`) t2";
-
- private static final String COLLECT_PARTITION_STATS_SQL_TEMPLATE =
- " SELECT "
- + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
- + "${catalogId} AS catalog_id, "
- + "${dbId} AS db_id, "
- + "${tblId} AS tbl_id, "
- + "${idxId} AS idx_id, "
- + "'${colId}' AS col_id, "
- + "${partId} AS part_id, "
- + "COUNT(1) AS row_count, "
- + "NDV(`${colName}`) AS ndv, "
- + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
- + "MIN(`${colName}`) AS min, "
- + "MAX(`${colName}`) AS max, "
- + "${dataSizeFunction} AS data_size, "
- + "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName}";
-
private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
@@ -92,9 +62,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
+ "FROM `${dbName}`.`${tblName}`"
+ "${tablets}";
- // cache stats for each partition, it would be inserted into column_statistics in a batch.
- private final List<List<ColStatsData>> buf = new ArrayList<>();
-
@VisibleForTesting
public OlapAnalysisTask() {
}
@@ -148,45 +115,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE));
// Scalar query only return one row
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
- OlapTable olapTable = (OlapTable) tbl;
- Collection<Partition> partitions = olapTable.getPartitions();
- int partitionCount = partitions.size();
- List<String> values = partitions.stream().map(p -> String.format(
- "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
- StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName(), p.getId())),
- InternalCatalog.INTERNAL_CATALOG_ID,
- db.getId(),
- tbl.getId(),
- -1,
- StatisticsUtil.quote(col.getName()),
- p.getId(),
- colStatsData.count / partitionCount,
- colStatsData.ndv / partitionCount,
- colStatsData.nullCount / partitionCount,
- StatisticsUtil.quote(colStatsData.minLit),
- StatisticsUtil.quote(colStatsData.maxLit),
- colStatsData.dataSizeInBytes / partitionCount)).collect(Collectors.toList());
- values.add(String.format(
- "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
- StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName())),
- InternalCatalog.INTERNAL_CATALOG_ID,
- db.getId(),
- tbl.getId(),
- -1,
- StatisticsUtil.quote(col.getName()),
- "NULL",
- colStatsData.count,
- colStatsData.ndv,
- colStatsData.nullCount,
- StatisticsUtil.quote(colStatsData.minLit),
- StatisticsUtil.quote(colStatsData.maxLit),
- colStatsData.dataSizeInBytes));
- String insertSQL = "INSERT INTO "
- + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
- + " VALUES "
- + String.join(",", values);
- stmtExecutor = new StmtExecutor(r.connectContext, insertSQL);
- executeWithExceptionOnFail(stmtExecutor);
+ job.appendBuf(this, Collections.singletonList(colStatsData));
}
}
@@ -198,6 +127,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
protected void doFull() throws Exception {
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (partitionNames.isEmpty()) {
+ job.appendBuf(this, Collections.emptyList());
return;
}
Map<String, String> params = new HashMap<>();
@@ -212,68 +142,14 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("dbName", db.getFullName());
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(tbl.getName()));
- List<String> partitionAnalysisSQLs = new ArrayList<>();
- try {
- tbl.readLock();
-
- for (String partitionName : partitionNames) {
- Partition part = tbl.getPartition(partitionName);
- if (part == null) {
- continue;
- }
- params.put("partId", String.valueOf(tbl.getPartition(partitionName).getId()));
- // Avoid error when get the default partition
- params.put("partitionName", "`" + partitionName + "`");
- StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
- partitionAnalysisSQLs.add(stringSubstitutor.replace(COLLECT_PARTITION_STATS_SQL_TEMPLATE));
- }
- } finally {
- tbl.readUnlock();
- }
- execSQLs(partitionAnalysisSQLs, params);
+ execSQL(params);
}
@VisibleForTesting
- public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {
- long startTime = System.currentTimeMillis();
- LOG.debug("analyze task {} start at {}", info.toString(), new Date());
- try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
- List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
- for (List<String> group : sqlGroups) {
- if (killed) {
- return;
- }
- StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
- group.forEach(partitionCollectSQL::add);
- stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
- buf.add(stmtExecutor.executeInternalQuery()
- .stream().map(ColStatsData::new).collect(Collectors.toList()));
- QueryState queryState = r.connectContext.getState();
- if (queryState.getStateType().equals(MysqlStateType.ERR)) {
- throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
- catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
- queryState.getErrorMessage()));
- }
- }
- for (List<ColStatsData> colStatsDataList : buf) {
- StringBuilder batchInsertSQL =
- new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
- + " VALUES ");
- StringJoiner sj = new StringJoiner(",");
- colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
- batchInsertSQL.append(sj.toString());
- stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
- executeWithExceptionOnFail(stmtExecutor);
- }
- params.put("type", col.getType().toString());
- StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
- String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
- stmtExecutor = new StmtExecutor(r.connectContext, sql);
- executeWithExceptionOnFail(stmtExecutor);
- } finally {
- LOG.debug("analyze task {} end. cost {}ms", info,
- System.currentTimeMillis() - startTime);
- }
+ public void execSQL(Map<String, String> params) throws Exception {
+ StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+ String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS);
+ runQuery(collectColStats);
}
// Get sample tablets id and scale up scaleFactor
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6b8297d0c0..f008c8fe301 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -78,12 +78,20 @@ public class StatisticConstants {
public static final int LOAD_RETRY_TIMES = 3;
- // union more relation than 512 may cause StackOverFlowException in the future.
- public static final int UNION_ALL_LIMIT = 512;
-
public static final String FULL_AUTO_ANALYZE_START_TIME = "00:00:00";
public static final String FULL_AUTO_ANALYZE_END_TIME = "23:59:59";
+ public static final int INSERT_MERGE_ITEM_COUNT = 200;
+
+ public static final long HUGE_TABLE_DEFAULT_SAMPLE_ROWS = 4194304;
+ public static final long HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = 5L * 1024 * 1024 * 1024;
+
+ public static final long HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(12);
+
+ public static final int TABLE_STATS_HEALTH_THRESHOLD = 60;
+
+ public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
+
static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER +
FeConstants.INTERNAL_DB_NAME);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index 325065d6e26..32cf5cfb24b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -113,7 +113,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
if (!(table instanceof OlapTable || table instanceof ExternalTable)) {
return true;
}
- if (table.getDataSize(true) < Config.huge_table_lower_bound_size_in_bytes) {
+ if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
return false;
}
TableStatsMeta tableStats =
Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
@@ -121,12 +121,13 @@ public class StatisticsAutoCollector extends StatisticsCollector {
if (tableStats == null) {
return false;
}
- return System.currentTimeMillis() - tableStats.updatedTime < Config.huge_table_auto_analyze_interval_in_millis;
+ return System.currentTimeMillis()
+ - tableStats.updatedTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
}
protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
List<AnalysisInfo> analysisInfos, TableIf table) {
- AnalysisMethod analysisMethod = table.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes
+ AnalysisMethod analysisMethod = table.getDataSize(true) > StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
@@ -141,7 +142,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
- .setSampleRows(Config.huge_table_default_sample_rows)
+ .setSampleRows(StatisticsUtil.getHugeTableSampleRows())
.setScheduleType(ScheduleType.AUTOMATIC)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index c2f1db6bc4a..638db553987 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -73,14 +73,15 @@ public abstract class StatisticsCollector extends MasterDaemon {
return;
}
- Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
+ Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager =
Env.getCurrentEnv().getAnalysisManager();
- analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
+ analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
+ Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId,
jobInfo.tblId)) {
- analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false);
+ analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false);
}
- Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos);
- analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
+ Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks);
+ analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
deleted file mode 100644
index f34ad0f1221..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class StatisticsPeriodCollector extends StatisticsCollector {
- private static final Logger LOG = LogManager.getLogger(StatisticsPeriodCollector.class);
-
- public StatisticsPeriodCollector() {
- super("Automatic Analyzer",
- TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
- new AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num));
- }
-
- @Override
- protected void collect() {
- try {
- AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
- List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
- for (AnalysisInfo jobInfo : jobInfos) {
- createSystemAnalysisJob(jobInfo);
- }
- } catch (Exception e) {
- LOG.warn("Failed to periodically analyze the statistics." + e);
- }
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
index 3f9b2641b75..7cd8817a1a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java
@@ -19,6 +19,8 @@ package org.apache.doris.statistics;
import org.apache.doris.statistics.util.StatisticsUtil;
+import com.google.common.annotations.VisibleForTesting;
+
import java.util.StringJoiner;
public class StatsId {
@@ -34,6 +36,17 @@ public class StatsId {
// nullable
public final String partId;
+ @VisibleForTesting
+ public StatsId() {
+ this.id = null;
+ this.catalogId = -1;
+ this.dbId = -1;
+ this.tblId = -1;
+ this.idxId = -1;
+ this.colId = null;
+ this.partId = null;
+ }
+
public StatsId(ResultRow row) {
this.id = row.get(0);
this.catalogId = Long.parseLong(row.get(1));
@@ -52,7 +65,7 @@ public class StatsId {
sj.add(String.valueOf(tblId));
sj.add(String.valueOf(idxId));
sj.add(StatisticsUtil.quote(colId));
- sj.add(StatisticsUtil.quote(partId));
+ sj.add(partId);
return sj.toString();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index cc0fb334a39..931f22d7b02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -177,12 +177,14 @@ public class StatisticsUtil {
sessionVariable.enablePageCache = false;
sessionVariable.parallelExecInstanceNum =
Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.parallelPipelineTaskNum =
Config.statistics_sql_parallel_exec_instance_num;
- sessionVariable.setEnableNereidsPlanner(false);
+ sessionVariable.setEnableNereidsPlanner(true);
+ sessionVariable.setEnablePipelineEngine(false);
sessionVariable.enableProfile = false;
sessionVariable.enableScanRunSerial = limitScan;
- sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
- sessionVariable.insertTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
+ sessionVariable.queryTimeoutS = StatisticsUtil.getAnalyzeTimeout();
+ sessionVariable.insertTimeoutS = StatisticsUtil.getAnalyzeTimeout();
sessionVariable.enableFileCache = false;
+ sessionVariable.forbidUnknownColStats = false;
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
@@ -807,7 +809,7 @@ public class StatisticsUtil {
public static boolean inAnalyzeTime(LocalTime now) {
try {
- Pair<LocalTime, LocalTime> range = findRangeFromGlobalSessionVar();
+ Pair<LocalTime, LocalTime> range = findConfigFromGlobalSessionVar();
if (range == null) {
return false;
}
@@ -824,16 +826,16 @@ public class StatisticsUtil {
}
}
- private static Pair<LocalTime, LocalTime> findRangeFromGlobalSessionVar() {
+ private static Pair<LocalTime, LocalTime> findConfigFromGlobalSessionVar() {
try {
String startTime =
- findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
+ findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
.fullAutoAnalyzeStartTime;
// For compatibility
if (StringUtils.isEmpty(startTime)) {
startTime = StatisticConstants.FULL_AUTO_ANALYZE_START_TIME;
}
- String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
+ String endTime = findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
.fullAutoAnalyzeEndTime;
if (StringUtils.isEmpty(startTime)) {
endTime = StatisticConstants.FULL_AUTO_ANALYZE_END_TIME;
@@ -845,7 +847,7 @@ public class StatisticsUtil {
}
}
- private static SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception {
+ protected static SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
SessionVariable sessionVariable = VariableMgr.newSessionVariable();
VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
@@ -854,10 +856,71 @@ public class StatisticsUtil {
public static boolean enableAutoAnalyze() {
try {
- return findRangeFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze;
+ return findConfigFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze;
} catch (Exception e) {
LOG.warn("Fail to get value of enable auto analyze, return false by default", e);
}
return false;
}
+
+ public static int getInsertMergeCount() {
+ try {
+ return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
+ .statsInsertMergeItemCount;
+ } catch (Exception e) {
+ LOG.warn("Failed to get value of insert_merge_item_count, return default", e);
+ }
+ return StatisticConstants.INSERT_MERGE_ITEM_COUNT;
+ }
+
+ public static long getHugeTableSampleRows() {
+ try {
+ return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_DEFAULT_SAMPLE_ROWS)
+ .hugeTableDefaultSampleRows;
+ } catch (Exception e) {
+ LOG.warn("Failed to get value of huge_table_default_sample_rows, return default", e);
+ }
+ return StatisticConstants.HUGE_TABLE_DEFAULT_SAMPLE_ROWS;
+ }
+
+ public static long getHugeTableLowerBoundSizeInBytes() {
+ try {
+ return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES)
+ .hugeTableLowerBoundSizeInBytes;
+ } catch (Exception e) {
+ LOG.warn("Failed to get value of huge_table_lower_bound_size_in_bytes, return default", e);
+ }
+ return StatisticConstants.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES;
+ }
+
+ public static long getHugeTableAutoAnalyzeIntervalInMillis() {
+ try {
+ return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
+ .hugeTableAutoAnalyzeIntervalInMillis;
+ } catch (Exception e) {
+ LOG.warn("Failed to get value of huge_table_auto_analyze_interval_in_millis, return default", e);
+ }
+ return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
+ }
+
+ public static long getTableStatsHealthThreshold() {
+ try {
+ return findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD)
+ .tableStatsHealthThreshold;
+ } catch (Exception e) {
+ LOG.warn("Failed to get value of table_stats_health_threshold, return default", e);
+ }
+ return StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
+ }
+
+ public static int getAnalyzeTimeout() {
+ try {
+ return findConfigFromGlobalSessionVar(SessionVariable.ANALYZE_TIMEOUT)
+ .analyzeTimeoutS;
+ } catch (Exception e) {
+ LOG.warn("Failed to get value of analyze_timeout, return default", e);
+ }
+ return StatisticConstants.ANALYZE_TIMEOUT_IN_SEC;
+ }
+
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index f01485f642f..d4dedd17123 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -17,25 +17,10 @@
package org.apache.doris.statistics;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.InternalSchemaInitializer;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.catalog.Env;
import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
-import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisInfo.JobType;
-import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.doris.utframe.TestWithFeService;
-import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
@@ -44,136 +29,196 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class AnalysisJobTest extends TestWithFeService {
-
- @Override
- protected void runBeforeAll() throws Exception {
- try {
- InternalSchemaInitializer.createDB();
- createDatabase("analysis_job_test");
- connectContext.setDatabase("default_cluster:analysis_job_test");
- createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
- + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
- + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n"
- + ");");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- FeConstants.runningUnitTest = true;
- }
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AnalysisJobTest {
+ // make sure each task has its corresponding job set
@Test
- public void testCreateAnalysisJob() throws Exception {
+ public void initTest(@Mocked AnalysisInfo jobInfo, @Mocked OlapAnalysisTask task) {
+ AnalysisJob analysisJob = new AnalysisJob(jobInfo, Arrays.asList(task));
+ Assertions.assertSame(task.job, analysisJob);
+ }
- new MockUp<StatisticsUtil>() {
+ @Test
+ public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+ AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+ new MockUp<AnalysisJob>() {
+ @Mock
+ protected void writeBuf() {
+ writeBufInvokeTimes.incrementAndGet();
+ }
@Mock
- public AutoCloseConnectContext buildConnectContext() {
- return new AutoCloseConnectContext(connectContext);
+ public void updateTaskState(AnalysisState state, String msg) {
}
@Mock
- public void execUpdate(String sql) throws Exception {
+ public void deregisterJob() {
}
};
+ AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+ job.queryingTask = new HashSet<>();
+ job.queryingTask.add(olapAnalysisTask);
+ job.queryFinished = new HashSet<>();
+ job.buf = new ArrayList<>();
+ job.totalTaskCount = 20;
+
+ // not all tasks finished nor cache limit exceeded, shouldn't write
+ job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+ Assertions.assertEquals(0, writeBufInvokeTimes.get());
+ }
- new MockUp<StmtExecutor>() {
+ @Test
+ public void testAppendBufTest2(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+ AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+ AtomicInteger deregisterTimes = new AtomicInteger();
+
+ new MockUp<AnalysisJob>() {
@Mock
- public List<ResultRow> executeInternalQuery() {
- return Collections.emptyList();
+ protected void writeBuf() {
+ writeBufInvokeTimes.incrementAndGet();
}
- };
- new MockUp<ConnectContext>() {
+ @Mock
+ public void updateTaskState(AnalysisState state, String msg) {
+ }
@Mock
- public ConnectContext get() {
- return connectContext;
+ public void deregisterJob() {
+ deregisterTimes.getAndIncrement();
}
};
- String sql = "ANALYZE TABLE t1";
- Assertions.assertNotNull(getSqlStmtExecutor(sql));
+ AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+ job.queryingTask = new HashSet<>();
+ job.queryingTask.add(olapAnalysisTask);
+ job.queryFinished = new HashSet<>();
+ job.buf = new ArrayList<>();
+ job.totalTaskCount = 1;
+
+ job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+ // all tasks finished, should write and deregister this job
+ Assertions.assertEquals(1, writeBufInvokeTimes.get());
+ Assertions.assertEquals(1, deregisterTimes.get());
}
@Test
- public void testJobExecution(@Mocked StmtExecutor stmtExecutor, @Mocked InternalCatalog catalog, @Mocked
- Database database,
- @Mocked OlapTable olapTable)
- throws Exception {
- new MockUp<OlapTable>() {
+ public void testAppendBufTest3(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+ AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+ new MockUp<AnalysisJob>() {
@Mock
- public Column getColumn(String name) {
- return new Column("col1", PrimitiveType.INT);
+ protected void writeBuf() {
+ writeBufInvokeTimes.incrementAndGet();
}
- };
-
- new MockUp<StatisticsUtil>() {
@Mock
- public ConnectContext buildConnectContext() {
- return connectContext;
+ public void updateTaskState(AnalysisState state, String msg) {
}
@Mock
- public void execUpdate(String sql) throws Exception {
+ public void deregisterJob() {
}
+ };
+ AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+ job.queryingTask = new HashSet<>();
+ job.queryingTask.add(olapAnalysisTask);
+ job.queryFinished = new HashSet<>();
+ job.buf = new ArrayList<>();
+ ColStatsData colStatsData = new ColStatsData();
+ for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
+ job.buf.add(colStatsData);
+ }
+ job.totalTaskCount = 100;
+
+ job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+ // cache limit exceeded, should write them
+ Assertions.assertEquals(1, writeBufInvokeTimes.get());
+ }
+ @Test
+ public void testUpdateTaskState(
+ @Mocked AnalysisInfo info,
+ @Mocked OlapAnalysisTask task1,
+ @Mocked OlapAnalysisTask task2) {
+ AtomicInteger updateTaskStatusInvokeTimes = new AtomicInteger();
+ new MockUp<AnalysisManager>() {
@Mock
- public DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) {
- return new DBObjects(catalog, database, olapTable);
+ public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
+ updateTaskStatusInvokeTimes.getAndIncrement();
}
};
- new MockUp<StatisticsCache>() {
-
+ AnalysisManager analysisManager = new AnalysisManager();
+ new MockUp<Env>() {
@Mock
- public void syncLoadColStats(long tableId, long idxId, String colName) {
+ public AnalysisManager getAnalysisManager() {
+ return analysisManager;
}
};
- new MockUp<StmtExecutor>() {
+ AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+ job.queryFinished = new HashSet<>();
+ job.queryFinished.add(task2);
+ job.updateTaskState(AnalysisState.FAILED, "");
+ Assertions.assertEquals(2, updateTaskStatusInvokeTimes.get());
+ }
+ @Test
+ public void testWriteBuf1(@Mocked AnalysisInfo info,
+ @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+ AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+ job.queryFinished = new HashSet<>();
+ job.queryFinished.add(task2);
+ new MockUp<AnalysisJob>() {
@Mock
- public void execute() throws Exception {
-
+ public void updateTaskState(AnalysisState state, String msg) {
}
@Mock
- public List<ResultRow> executeInternalQuery() {
- return new ArrayList<>();
- }
- };
+ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
- new MockUp<OlapAnalysisTask>() {
+ }
@Mock
- public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {}
+ protected void syncLoadStats() {
+ }
};
- HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
- colToPartitions.put("col1", Collections.singleton("t1"));
- AnalysisInfo analysisJobInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0)
- .setCatalogId(0)
- .setDBId(0)
- .setTblId(0)
- .setColName("col1").setJobType(JobType.MANUAL)
- .setAnalysisMode(AnalysisMode.FULL)
- .setAnalysisMethod(AnalysisMethod.FULL)
- .setAnalysisType(AnalysisType.FUNDAMENTALS)
- .setColToPartitions(colToPartitions)
- .setState(AnalysisState.RUNNING)
- .build();
- new OlapAnalysisTask(analysisJobInfo).doExecute();
new Expectations() {
{
- stmtExecutor.execute();
+ job.syncLoadStats();
times = 1;
}
};
+ job.writeBuf();
+
+ Assertions.assertEquals(0, job.queryFinished.size());
+ }
+
+ @Test
+ public void testWriteBuf2(@Mocked AnalysisInfo info,
+ @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+ new MockUp<AnalysisJob>() {
+ @Mock
+ public void updateTaskState(AnalysisState state, String msg) {
+ }
+
+ @Mock
+ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+ throw new RuntimeException();
+ }
+
+ @Mock
+ protected void syncLoadStats() {
+ }
+ };
+ AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+ job.buf.add(new ColStatsData());
+ job.queryFinished = new HashSet<>();
+ job.queryFinished.add(task2);
+ job.writeBuf();
+ Assertions.assertEquals(1, job.queryFinished.size());
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index c995710da44..6372ce97d6e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
@@ -340,7 +341,7 @@ public class AnalysisManagerTest {
};
OlapTable olapTable = new OlapTable();
TableStatsMeta stats1 = new TableStatsMeta(0, 50, new
AnalysisInfoBuilder().setColName("col1").build());
- stats1.updatedRows.addAndGet(30);
+ stats1.updatedRows.addAndGet(50);
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
TableStatsMeta stats2 = new TableStatsMeta(0, 190, new
AnalysisInfoBuilder().setColName("col1").build());
@@ -349,4 +350,38 @@ public class AnalysisManagerTest {
}
+ @Test
+ public void testRecordLimit1() {
+ Config.analyze_record_limit = 2;
+ AnalysisManager analysisManager = new AnalysisManager();
+ analysisManager.replayCreateAnalysisJob(new AnalysisInfoBuilder().setJobId(1).build());
+ analysisManager.replayCreateAnalysisJob(new AnalysisInfoBuilder().setJobId(2).build());
+ analysisManager.replayCreateAnalysisJob(new AnalysisInfoBuilder().setJobId(3).build());
+ Assertions.assertEquals(2, analysisManager.analysisJobInfoMap.size());
+ Assertions.assertTrue(analysisManager.analysisJobInfoMap.containsKey(2L));
+ Assertions.assertTrue(analysisManager.analysisJobInfoMap.containsKey(3L));
+ }
+
+ @Test
+ public void testRecordLimit2() {
+ Config.analyze_record_limit = 2;
+ AnalysisManager analysisManager = new AnalysisManager();
+ analysisManager.replayCreateAnalysisTask(new AnalysisInfoBuilder().setTaskId(1).build());
+ analysisManager.replayCreateAnalysisTask(new AnalysisInfoBuilder().setTaskId(2).build());
+ analysisManager.replayCreateAnalysisTask(new AnalysisInfoBuilder().setTaskId(3).build());
+ Assertions.assertEquals(2, analysisManager.analysisTaskInfoMap.size());
+ Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(2L));
+ Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(3L));
+ }
+
+ @Test
+ public void testRecordLimit3() {
+ Config.analyze_record_limit = 2;
+ AnalysisManager analysisManager = new AnalysisManager();
+ analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(1).build());
+ analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(2).build());
+ analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(3).build());
+ Assertions.assertEquals(2, analysisManager.autoJobs.size());
+ }
+
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 19d7798041a..8cfcfeabd28 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Maps;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -45,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
public class AnalysisTaskExecutorTest extends TestWithFeService {
@@ -82,6 +84,15 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
return new Column("col1", PrimitiveType.INT);
}
};
+ final AtomicBoolean cancelled = new AtomicBoolean();
+ new MockUp<AnalysisTaskWrapper>() {
+
+ @Mock
+ public boolean cancel(String msg) {
+ cancelled.set(true);
+ return true;
+ }
+ };
AnalysisInfo analysisJobInfo = new
AnalysisInfoBuilder().setJobId(0).setTaskId(0)
.setCatalogId(0)
.setDBId(0)
@@ -98,7 +109,10 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
AnalysisTaskWrapper analysisTaskWrapper = new
AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
b.put(analysisTaskWrapper);
- analysisTaskExecutor.start();
+ analysisTaskExecutor.tryToCancel();
+ Assertions.assertTrue(cancelled.get());
+ Assertions.assertTrue(b.isEmpty());
+
}
@Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
similarity index 97%
copy from fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
copy to fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
index f01485f642f..268540885da 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
@@ -50,7 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class AnalysisJobTest extends TestWithFeService {
+public class AnalyzeTest extends TestWithFeService {
@Override
protected void runBeforeAll() throws Exception {
@@ -154,6 +154,12 @@ public class AnalysisJobTest extends TestWithFeService {
@Mock
public void execSQLs(List<String> partitionAnalysisSQLs,
Map<String, String> params) throws Exception {}
};
+
+ new MockUp<BaseAnalysisTask>() {
+
+ @Mock
+ protected void runQuery(String sql) {}
+ };
HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
colToPartitions.put("col1", Collections.singleton("t1"));
AnalysisInfo analysisJobInfo = new
AnalysisInfoBuilder().setJobId(0).setTaskId(0)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
index f5b98a47ce0..95ed5023e36 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
@@ -31,6 +32,9 @@ import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.utframe.TestWithFeService;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -40,9 +44,11 @@ import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -50,6 +56,7 @@ import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
public class CacheTest extends TestWithFeService {
@@ -350,4 +357,29 @@ public class CacheTest extends TestWithFeService {
}
};
}
+
+ @Test
+ public void testEvict() {
+ ThreadPoolExecutor threadPool
+ = ThreadPoolManager.newDaemonFixedThreadPool(
+ 1, Integer.MAX_VALUE, "STATS_FETCH", true);
+ AsyncLoadingCache<Integer, Integer> columnStatisticsCache =
+ Caffeine.newBuilder()
+ .maximumSize(1)
+ .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+ .executor(threadPool)
+ .buildAsync(new AsyncCacheLoader<Integer, Integer>() {
+ @Override
+ public @NonNull CompletableFuture<Integer> asyncLoad(@NonNull Integer integer,
+ @NonNull Executor executor) {
+ return CompletableFuture.supplyAsync(() -> {
+ return integer;
+ }, threadPool);
+ }
+ });
+ columnStatisticsCache.get(1);
+ columnStatisticsCache.get(2);
+ Assertions.assertTrue(columnStatisticsCache.synchronous().asMap().containsKey(2));
+ Assertions.assertEquals(1, columnStatisticsCache.synchronous().asMap().size());
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
index d618a5fa538..f2b9f84f0d0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java
@@ -19,47 +19,36 @@ package org.apache.doris.statistics;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;
-import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class OlapAnalysisTaskTest {
+ // test manual
@Test
- public void testAutoSample(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked TableIf tableIf) {
- new Expectations() {
- {
- tableIf.getDataSize(true);
- result = 60_0000_0000L;
- }
- };
+ public void testSample1(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked TableIf tableIf) {
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
.setAnalysisMethod(AnalysisMethod.FULL);
+ analysisInfoBuilder.setJobType(JobType.MANUAL);
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
olapAnalysisTask.info = analysisInfoBuilder.build();
olapAnalysisTask.tbl = tableIf;
- Config.enable_auto_sample = true;
TableSample tableSample = olapAnalysisTask.getTableSample();
- Assertions.assertEquals(4194304, tableSample.getSampleValue());
- Assertions.assertFalse(tableSample.isPercent());
-
- new Expectations() {
- {
- tableIf.getDataSize(true);
- result = 1_0000_0000L;
- }
- };
- tableSample = olapAnalysisTask.getTableSample();
Assertions.assertNull(tableSample);
analysisInfoBuilder.setSampleRows(10);
+ analysisInfoBuilder.setJobType(JobType.MANUAL);
analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.SAMPLE);
olapAnalysisTask.info = analysisInfoBuilder.build();
tableSample = olapAnalysisTask.getTableSample();
@@ -67,4 +56,49 @@ public class OlapAnalysisTaskTest {
Assertions.assertFalse(tableSample.isPercent());
}
+ // test auto big table
+ @Test
+ public void testSample2(@Mocked OlapTable tbl) {
+ new MockUp<OlapTable>() {
+
+ @Mock
+ public long getDataSize(boolean singleReplica) {
+ return 1000_0000_0000L;
+ }
+ };
+
+ AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
+ .setAnalysisMethod(AnalysisMethod.FULL);
+ analysisInfoBuilder.setJobType(JobType.SYSTEM);
+ OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+ olapAnalysisTask.info = analysisInfoBuilder.build();
+ olapAnalysisTask.tbl = tbl;
+ TableSample tableSample = olapAnalysisTask.getTableSample();
+ Assertions.assertNotNull(tableSample);
+ Assertions.assertEquals(StatisticsUtil.getHugeTableSampleRows(), tableSample.getSampleValue());
+
+ }
+
+ // test auto small table
+ @Test
+ public void testSample3(@Mocked OlapTable tbl) {
+ new MockUp<OlapTable>() {
+
+ @Mock
+ public long getDataSize(boolean singleReplica) {
+ return 1000;
+ }
+ };
+
+ AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
+ .setAnalysisMethod(AnalysisMethod.FULL);
+ analysisInfoBuilder.setJobType(JobType.SYSTEM);
+ OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+ olapAnalysisTask.info = analysisInfoBuilder.build();
+ olapAnalysisTask.tbl = tbl;
+ TableSample tableSample = olapAnalysisTask.getTableSample();
+ Assertions.assertNull(tableSample);
+
+ }
+
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
new file mode 100644
index 00000000000..d441ce5b09d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.View;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
+import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
+import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.system.SystemInfoService;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.hadoop.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StatisticsAutoCollectorTest {
+
+ @Test
+ public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) {
+ new MockUp<CatalogIf>() {
+ @Mock
+ public Collection<DatabaseIf> getAllDbs() {
+ Database db1 = new Database(1, SystemInfoService.DEFAULT_CLUSTER
+ + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
+ Database db2 = new Database(2, "anyDB");
+ List<DatabaseIf> databaseIfs = new ArrayList<>();
+ databaseIfs.add(db1);
+ databaseIfs.add(db2);
+ return databaseIfs;
+ }
+ };
+ new MockUp<StatisticsAutoCollector>() {
+ @Mock
+ public List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<TableIf> db) {
+ return Arrays.asList(analysisInfo, analysisInfo);
+ }
+
+ int count = 0;
+
+ @Mock
+ public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
+ return count++ == 0 ? null : jobInfo;
+ }
+
+ @Mock
+ public void createSystemAnalysisJob(AnalysisInfo jobInfo)
+ throws DdlException {
+
+ }
+ };
+
+ StatisticsAutoCollector saa = new StatisticsAutoCollector();
+ saa.runAfterCatalogReady();
+ new Expectations() {
+ {
+ try {
+ saa.createSystemAnalysisJob((AnalysisInfo) any);
+ times = 1;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ @Test
+ public void testConstructAnalysisInfo(
+ @Injectable OlapTable o2, @Injectable View v) {
+ new MockUp<Database>() {
+ @Mock
+ public List<Table> getTables() {
+ List<Table> tableIfs = new ArrayList<>();
+ tableIfs.add(o2);
+ tableIfs.add(v);
+ return tableIfs;
+ }
+
+ @Mock
+ public String getFullName() {
+ return "anyDb";
+ }
+ };
+
+ new MockUp<OlapTable>() {
+ @Mock
+ public String getName() {
+ return "anytable";
+ }
+
+ @Mock
+ public List<Column> getBaseSchema() {
+ List<Column> columns = new ArrayList<>();
+ columns.add(new Column("c1", PrimitiveType.INT));
+ columns.add(new Column("c2", PrimitiveType.HLL));
+ return columns;
+ }
+ };
+ StatisticsAutoCollector saa = new StatisticsAutoCollector();
+ List<AnalysisInfo> analysisInfos =
+ saa.constructAnalysisInfo(new Database(1, "anydb"));
+ Assertions.assertEquals(1, analysisInfos.size());
+ Assertions.assertEquals("c1", analysisInfos.get(0).colName.split(",")[0]);
+ }
+
+ @Test
+ public void testGetReAnalyzeRequiredPart0() {
+
+ TableIf tableIf = new OlapTable();
+
+ new MockUp<OlapTable>() {
+ @Mock
+ protected Map<String, Set<String>> findReAnalyzeNeededPartitions() {
+ Set<String> partitionNames = new HashSet<>();
+ partitionNames.add("p1");
+ partitionNames.add("p2");
+ Map<String, Set<String>> map = new HashMap<>();
+ map.put("col1", partitionNames);
+ return map;
+ }
+
+ @Mock
+ public long getRowCount() {
+ return 100;
+ }
+
+ @Mock
+ public List<Column> getBaseSchema() {
+ return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT));
+ }
+ };
+
+ new MockUp<StatisticsUtil>() {
+ @Mock
+ public TableIf findTable(long catalogName, long dbName, long tblName) {
+ return tableIf;
+ }
+ };
+ AnalysisInfo analysisInfo = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
+ AnalysisType.FUNDAMENTALS).setColName("col1").setJobType(JobType.SYSTEM).build();
+ new MockUp<AnalysisManager>() {
+
+ int count = 0;
+
+ TableStatsMeta[] tableStatsArr =
+ new TableStatsMeta[] {new TableStatsMeta(0, 0, analysisInfo),
+ new TableStatsMeta(0, 0, analysisInfo), null};
+
+ {
+ tableStatsArr[0].updatedRows.addAndGet(100);
+ tableStatsArr[1].updatedRows.addAndGet(0);
+ }
+
+ @Mock
+ public TableStatsMeta findTableStatsStatus(long tblId) {
+ return tableStatsArr[count++];
+ }
+ };
+
+ new MockUp<StatisticsAutoCollector>() {
+ @Mock
+ public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
+ Set<String> needRunPartitions) {
+ return new AnalysisInfoBuilder().build();
+ }
+ };
+ StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector();
+ AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder()
+ .setCatalogId(0)
+ .setDBId(0)
+ .setTblId(0).build();
+ Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+ // uncomment it when updatedRows gets ready
+ // Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+ Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
+ }
+
+ @Test
+ public void testLoop() {
+ AtomicBoolean timeChecked = new AtomicBoolean();
+ AtomicBoolean switchChecked = new AtomicBoolean();
+ new MockUp<StatisticsUtil>() {
+
+ @Mock
+ public boolean inAnalyzeTime(LocalTime now) {
+ timeChecked.set(true);
+ return true;
+ }
+
+ @Mock
+ public boolean enableAutoAnalyze() {
+ switchChecked.set(true);
+ return true;
+ }
+ };
+ StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+ autoCollector.collect();
+ Assertions.assertTrue(timeChecked.get() && switchChecked.get());
+
+ }
+
+ @Test
+ public void checkAvailableThread() {
+ StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+ Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num,
+ autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
+ }
+
+ @Test
+ public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) {
+ new MockUp<OlapTable>() {
+
+ @Mock
+ public long getDataSize(boolean singleReplica) {
+ return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000;
+ }
+ };
+
+ new MockUp<AnalysisManager>() {
+
+ @Mock
+ public TableStatsMeta findTableStatsStatus(long tblId) {
+ return stats;
+ }
+ };
+ // A very huge table has been updated recently, so we should skip it this time
+ stats.updatedTime = System.currentTimeMillis() - 1000;
+ StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+ Assertions.assertTrue(autoCollector.skip(olapTable));
+ // The update of this huge table was a long time ago, so we shouldn't skip it this time
+ stats.updatedTime = System.currentTimeMillis()
+ - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000;
+ Assertions.assertFalse(autoCollector.skip(olapTable));
+ new MockUp<AnalysisManager>() {
+
+ @Mock
+ public TableStatsMeta findTableStatsStatus(long tblId) {
+ return null;
+ }
+ };
+ // can't find table stats meta, which means this table never got analyzed, so we shouldn't skip it this time
+ Assertions.assertFalse(autoCollector.skip(olapTable));
+ // this is not olap table nor external table, so we should skip it this time
+ Assertions.assertTrue(autoCollector.skip(anyOtherTable));
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
index c0d4a656d75..c0c790c9c25 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
@@ -19,9 +19,15 @@ package org.apache.doris.statistics.util;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.qe.SessionVariable;
-import org.junit.Test;
+import mockit.Mock;
+import mockit.MockUp;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
public class StatisticsUtilTest {
@Test
@@ -67,4 +73,42 @@ public class StatisticsUtilTest {
Assertions.fail();
}
}
+
+ @Test
+ public void testInAnalyzeTime1() {
+ new MockUp<StatisticsUtil>() {
+
+ @Mock
+ protected SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
+ SessionVariable sessionVariable = new SessionVariable();
+ sessionVariable.fullAutoAnalyzeStartTime = "00:00:00";
+ sessionVariable.fullAutoAnalyzeEndTime = "02:00:00";
+ return sessionVariable;
+ }
+ };
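+ // With the analyze window set to 00:00:00-02:00:00, 01:00:00 falls inside and 13:00:00 falls outside.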
+ DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
+ String now = "01:00:00";
+ Assertions.assertTrue(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+ now = "13:00:00";
+ Assertions.assertFalse(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+ }
+
+ @Test
+ public void testInAnalyzeTime2() {
+ new MockUp<StatisticsUtil>() {
+
+ @Mock
+ protected SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
+ SessionVariable sessionVariable = new SessionVariable();
+ sessionVariable.fullAutoAnalyzeStartTime = "00:00:00";
+ sessionVariable.fullAutoAnalyzeEndTime = "23:00:00";
+ return sessionVariable;
+ }
+ };
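+ // With the analyze window set to 00:00:00-23:00:00, 15:00:00 falls inside and 23:30:00 falls outside.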
+ DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
+ String now = "15:00:00";
+ Assertions.assertTrue(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+ now = "23:30:00";
+ Assertions.assertFalse(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+ }
}
diff --git a/regression-test/suites/statistics/analyze_stats.groovy
b/regression-test/suites/statistics/analyze_stats.groovy
index 4e4c4a08425..3a33b672bfe 100644
--- a/regression-test/suites/statistics/analyze_stats.groovy
+++ b/regression-test/suites/statistics/analyze_stats.groovy
@@ -57,7 +57,11 @@ suite("test_analyze") {
`analyzetestlimitedk8` double null comment "",
`analyzetestlimitedk9` float null comment "",
`analyzetestlimitedk12` string null comment "",
- `analyzetestlimitedk13` largeint(40) null comment ""
+ `analyzetestlimitedk13` largeint(40) null comment "",
+ `analyzetestlimitedk14` ARRAY<int(11)> NULL COMMENT "",
+ `analyzetestlimitedk15` Map<STRING, INT> NULL COMMENT "",
+ `analyzetestlimitedk16` STRUCT<s_id:int(11), s_name:string, s_address:string> NULL,
+ `analyzetestlimitedk17` JSON NULL
) engine=olap
DUPLICATE KEY(`analyzetestlimitedk3`)
DISTRIBUTED BY HASH(`analyzetestlimitedk3`) BUCKETS 5
properties("replication_num" = "1")
@@ -67,26 +71,39 @@ suite("test_analyze") {
INSERT INTO `${tbl}` VALUES
(-2103297891,1,101,15248,4761818404925265645,939926.283,
'UTmCFKMbprf0zSVOIlBJRNOl3JcNBdOsnCDt','2022-09-28','2022-10-28
01:56:56','tVvGDSrN6kyn',
-954349107.187117,-40.46286,'g1ZP9nqVgaGKya3kPERdBofTWJQ4TIJEz972Xvw4hfPpTpWwlmondiLVTCyld7rSBlSWrE7NJRB0pvPGEFQKOx1s3',
- '-1559301292834325905'),
+ '-1559301292834325905', NULL, NULL, NULL, NULL),
(-2094982029,0,-81,-14746,-2618177187906633064,121889.100,NULL,'2023-05-01','2022-11-25
00:24:12',
'36jVI0phYfhFucAOEASbh4OdvUYcI7QZFgQSveNyfGcRRUtQG9HGN1UcCmUH',-82250254.174239,NULL,
-
'bTUHnMC4v7dI8U3TK0z4wZHdytjfHQfF1xKdYAVwPVNMT4fT4F92hj8ENQXmCkWtfp','6971810221218612372'),
+
'bTUHnMC4v7dI8U3TK0z4wZHdytjfHQfF1xKdYAVwPVNMT4fT4F92hj8ENQXmCkWtfp','6971810221218612372',
NULL, NULL, NULL, NULL),
(-1840301109,1,NULL,NULL,7805768460922079440,546556.220,'wC7Pif9SJrg9b0wicGfPz2ezEmEKotmN6AMI',NULL,
'2023-05-20 18:13:14','NM5SLu62SGeuD',-1555800813.9748349,-11122.953,
-
'NH97wIjXk7dspvvfUUKe41ZetUnDmqLxGg8UYXwOwK3Jlu7dxO2GE9UJjyKW0NBxqUk1DY','-5004534044262380098'),
+
'NH97wIjXk7dspvvfUUKe41ZetUnDmqLxGg8UYXwOwK3Jlu7dxO2GE9UJjyKW0NBxqUk1DY','-5004534044262380098',
NULL, NULL, NULL, NULL),
(-1819679967,0,10,NULL,-5772413527188525359,-532045.626,'kqMe4VYEZAmajLthCLRkl8StDQHKrDWz91AQ','2022-06-30',
'2023-02-22
15:30:38','wAbeF3p84j5pFJTInQuKZOezFbsy8HIjmuUF',-1766437367.4377379,1791.4128,
-
'6OWmBD04UeKt1xI2XnR8t1kPG7qEYrf4J8RkA8UMs4HF33Yl','-8433424551792664598'),
+
'6OWmBD04UeKt1xI2XnR8t1kPG7qEYrf4J8RkA8UMs4HF33Yl','-8433424551792664598',
NULL, NULL, NULL, NULL),
(-1490846276,0,NULL,7744,6074522476276146996,594200.976,NULL,'2022-11-27','2023-03-11
21:28:44',
'yr8AuJLr2ud7DIwlt06cC7711UOsKslcDyySuqqfQE5X7Vjic6azHOrM6W',-715849856.288922,3762.217,
-
'4UpWZJ0Twrefw0Tm0AxFS38V5','7406302706201801560'),(-1465848366,1,72,29170,-5585523608136628843,-34210.874,
+ '4UpWZJ0Twrefw0Tm0AxFS38V5','7406302706201801560', NULL, NULL, NULL,
NULL),(-1465848366,1,72,29170,-5585523608136628843,-34210.874,
'rMGygAWU91Wa3b5A7l1wheo6EF0o6zhw4YeE','2022-09-20','2023-06-11
18:17:16','B6m9S9O2amsa4SXrEKK0ivJ2x9m1u8av',
-
862085772.298349,-22304.209,'1','-3399178642401166400'),(-394034614,1,65,5393,-200651968801088119,NULL,
+ 862085772.298349,-22304.209,'1','-3399178642401166400', NULL, NULL,
NULL, NULL),(-394034614,1,65,5393,-200651968801088119,NULL,
'9MapWX9pn8zes9Gey1lhRsH3ATyQPIysjQYi','2023-05-11','2022-07-02
02:56:53','z5VWbuKr6HiK7yC7MRIoQGrb98VUS',
-
1877828963.091433,-1204.1926,'fSDQqT38rkrJEi6fwc90rivgQcRPaW5V1aEmZpdSvUm','8882970420609470903'),
+
1877828963.091433,-1204.1926,'fSDQqT38rkrJEi6fwc90rivgQcRPaW5V1aEmZpdSvUm','8882970420609470903',
NULL, NULL, NULL, NULL),
(-287465855,0,-10,-32484,-5161845307234178602,748718.592,'n64TXbG25DQL5aw5oo9o9cowSjHCXry9HkId','2023-01-02',
'2022-11-17
14:58:52','d523m4PwLdHZtPTqSoOBo5IGivCKe4A1Sc8SKCILFxgzYLe0',NULL,27979.855,
-
'ps7qwcZjBjkGfcXYMw5HQMwnElzoHqinwk8vhQCbVoGBgfotc4oSkpD3tP34h4h0tTogDMwFu60iJm1bofUzyUQofTeRwZk8','4692206687866847780')
+
'ps7qwcZjBjkGfcXYMw5HQMwnElzoHqinwk8vhQCbVoGBgfotc4oSkpD3tP34h4h0tTogDMwFu60iJm1bofUzyUQofTeRwZk8','4692206687866847780',
NULL, NULL, NULL, NULL)
+ """
+
+ sql """
+ SET enable_nereids_planner=true;
+
+ """
+
+ sql """
+ SET forbid_unknown_col_stats=false;
+ """
+
+ sql """
+ SELECT * FROM ${tbl}
"""
sql """
@@ -97,10 +114,6 @@ suite("test_analyze") {
ANALYZE DATABASE ${db} WITH SYNC
"""
- sql """
- SET enable_nereids_planner=true;
-
- """
sql """
SET enable_fallback_to_original_planner=false;
"""
@@ -110,9 +123,9 @@ suite("test_analyze") {
Thread.sleep(1000 * 60)
- sql """
- SELECT * FROM ${tbl};
- """
+// sql """
+// SELECT * FROM ${tbl};
+// """
sql """
DROP STATS ${tbl}(analyzetestlimitedk3)
@@ -120,51 +133,51 @@ suite("test_analyze") {
def exception = null
- try {
- sql """
- SELECT * FROM ${tbl};
- """
- } catch (Exception e) {
- exception = e
- }
-
- assert exception != null
-
- exception = null
+// try {
+// sql """
+// SELECT * FROM ${tbl};
+// """
+// } catch (Exception e) {
+// exception = e
+// }
+//
+// assert exception != null
+//
+// exception = null
sql """
ANALYZE TABLE ${tbl} WITH SYNC
"""
- sql """
- SELECT * FROM ${tbl};
- """
+// sql """
+// SELECT * FROM ${tbl};
+// """
sql """
DROP STATS ${tbl}
"""
- try {
- sql """
- SELECT * FROM ${tbl};
- """
- } catch (Exception e) {
- exception = e
- }
+// try {
+// sql """
+// SELECT * FROM ${tbl};
+// """
+// } catch (Exception e) {
+// exception = e
+// }
- a_result_1 = sql """
+ def a_result_1 = sql """
ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 10
"""
- a_result_2 = sql """
+ def a_result_2 = sql """
ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 5
"""
- a_result_3 = sql """
+ def a_result_3 = sql """
ANALYZE DATABASE ${db} WITH SAMPLE PERCENT 5
"""
- show_result = sql """
+ def show_result = sql """
SHOW ANALYZE
"""
@@ -891,8 +904,24 @@ PARTITION `p599` VALUES IN (599)
}
assert expected_col_stats(id_col_stats, 600, 1)
- assert expected_col_stats(id_col_stats, 599, 7)
+ assert (int) Double.parseDouble(id_col_stats[0][2]) < 700
+ && (int) Double.parseDouble(id_col_stats[0][2]) > 500
+ assert expected_col_stats(id_col_stats, 0, 3)
+ assert expected_col_stats(id_col_stats, 2400, 4)
+ assert expected_col_stats(id_col_stats, 4, 5)
assert expected_col_stats(id_col_stats, 0, 6)
+ assert expected_col_stats(id_col_stats, 599, 7)
+
+ def update_time = id_col_stats[0][8]
+
+ sql """ANALYZE TABLE test_600_partition_table_analyze WITH SYNC"""
+
+ // Data has not changed, so the update time shouldn't change since this table doesn't need to be analyzed again.
+ id_col_stats_2 = sql """
+ SHOW COLUMN CACHED STATS test_600_partition_table_analyze(id);
+ """
+
+ assert update_time == id_col_stats_2[0][8]
sql """DROP TABLE IF EXISTS increment_analyze_test"""
sql """
@@ -1151,4 +1180,39 @@ PARTITION `p599` VALUES IN (599)
return (r[0][7]).equals(expected_value)
}
expected_max(max, "测试")
+
+ show_result = sql """
+ SHOW ANALYZE ${tbl}
+ """
+
+ def tbl_name_as_expected = { r, name ->
+ for (int i = 0; i < r.size; i++) {
+ if (r[i][3] != name) {
+ return false
+ }
+ }
+ return true
+ }
+
+ assert show_result[0][9] == "FINISHED"
+ assert tbl_name_as_expected(show_result, "${tbl}")
+
+ show_result = sql """
+ SHOW ANALYZE ${tbl} WHERE STATE = "FINISHED"
+ """
+
+ assert show_result.size() > 0
+
+ def all_finished = { r ->
+ for (int i = 0; i < r.size; i++) {
+ if (r[i][9] != "FINISHED") {
+ return false
+ }
+ }
+ return true
+ }
+
+ assert all_finished(show_result)
+
+
}
diff --git a/regression-test/suites/statistics/test_agg_complex_type.groovy
b/regression-test/suites/statistics/test_agg_complex_type.groovy
new file mode 100644
index 00000000000..55af87f35bd
--- /dev/null
+++ b/regression-test/suites/statistics/test_agg_complex_type.groovy
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_analyze_with_agg_complex_type") {
+ sql """drop table if exists test_agg_complex_type;"""
+
+ sql """create table test_agg_complex_type (
+ datekey int,
+ device_id bitmap BITMAP_UNION NULL,
+ hll_test hll hll_union,
+ qs QUANTILE_STATE QUANTILE_UNION
+ )
+ aggregate key (datekey)
+ distributed by hash(datekey) buckets 1
+ properties(
+ "replication_num" = "1"
+ );"""
+
+ sql """insert into test_agg_complex_type values (1,to_bitmap(1),
hll_hash("11"), TO_QUANTILE_STATE("11", 1.0));"""
+
+ sql """insert into test_agg_complex_type values (2, to_bitmap(1),
hll_hash("12"), TO_QUANTILE_STATE("11", 1.0));"""
+
+ sql """ANALYZE TABLE test_agg_complex_type WITH SYNC"""
+
+ def show_result = sql """SHOW COLUMN CACHED STATS test_agg_complex_type"""
+
+ assert show_result.size() == 1
+
+ def expected_col_stats = { r, expected_value, idx ->
+ return (int) Double.parseDouble(r[0][idx]) == expected_value
+ }
+
+ assert expected_col_stats(show_result, 2, 1)
+ assert expected_col_stats(show_result, 0, 3)
+ assert expected_col_stats(show_result, 8, 4)
+ assert expected_col_stats(show_result, 4, 5)
+ assert expected_col_stats(show_result, 1, 6)
+ assert expected_col_stats(show_result, 2, 7)
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]