This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3ebd6e1649 [feat](stats) Support delete expired auto analysis tasks (#19922)
3ebd6e1649 is described below
commit 3ebd6e1649eedc9214b53e374ee08dcc1a166679
Author: AKIRA <[email protected]>
AuthorDate: Thu May 25 13:25:11 2023 +0900
[feat](stats) Support delete expired auto analysis tasks (#19922)
---
.../apache/doris/statistics/StatisticsCleaner.java | 30 ++++++++++++++--------
.../doris/statistics/StatisticsRepository.java | 22 +++++++++++++---
2 files changed, 39 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
index de740be92e..b23310b240 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
@@ -101,13 +102,18 @@ public class StatisticsCleaner extends MasterDaemon {
}
+ /**
+ * Purges expired rows from the analysis job table in two passes: first only the rows with
+ * task_id != -1 belonging to jobs returned by fetchExpiredAutoJob (rows with task_id = -1
+ * are left in place), then every row of the jobs returned by fetchExpiredOnceJobs.
+ */
private void clearJobTbl() {
+ clearJobTbl(StatisticsRepository::fetchExpiredAutoJob, true);
+ clearJobTbl(StatisticsRepository::fetchExpiredOnceJobs, false);
+ }
+
+ /**
+ * Collects expired job ids page by page through {@code fetchFunc} and deletes them from the
+ * analysis job table, repeating until a pass collects no ids.
+ *
+ * NOTE(review): the offset keeps advancing between passes while earlier rows are being
+ * deleted, so some expired rows may be skipped until the next cleaner run -- confirm this
+ * is acceptable.
+ *
+ * @param fetchFunc called as (limit, offset); must return rows carrying a {@code job_id} column
+ * @param taskOnly if true, the DELETE is additionally restricted to {@code task_id != -1}
+ */
+ private void clearJobTbl(BiFunction<Integer, Long, List<ResultRow>>
fetchFunc, boolean taskOnly) {
List<String> jobIds = null;
long offset = 0;
do {
jobIds = new ArrayList<>();
- offset = findExpiredJobs(jobIds, offset);
+ offset = findExpiredJobs(jobIds, offset, fetchFunc);
doDelete("job_id", jobIds, FeConstants.INTERNAL_DB_NAME + "."
- + StatisticConstants.ANALYSIS_JOB_TABLE);
+ + StatisticConstants.ANALYSIS_JOB_TABLE, taskOnly);
} while (!jobIds.isEmpty());
}
@@ -165,22 +171,22 @@ public class StatisticsCleaner extends MasterDaemon {
+ /**
+ * Deletes rows of {@code tblName} (qualified with FeConstants.INTERNAL_DB_NAME) whose
+ * catalog_id, db_id, tbl_id, idx_id or id appears in the corresponding collection of
+ * {@code expiredStats}. Empty collections are skipped by doDelete, and none of these
+ * deletes is restricted by task_id.
+ */
private void deleteExpiredStats(ExpiredStats expiredStats, String tblName)
{
doDelete("catalog_id", expiredStats.expiredCatalog.stream()
.map(String::valueOf).collect(Collectors.toList()),
- FeConstants.INTERNAL_DB_NAME + "." + tblName);
+ FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
doDelete("db_id", expiredStats.expiredDatabase.stream()
.map(String::valueOf).collect(Collectors.toList()),
- FeConstants.INTERNAL_DB_NAME + "." + tblName);
+ FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
doDelete("tbl_id", expiredStats.expiredTable.stream()
.map(String::valueOf).collect(Collectors.toList()),
- FeConstants.INTERNAL_DB_NAME + "." + tblName);
+ FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
doDelete("idx_id", expiredStats.expiredIdxId.stream()
.map(String::valueOf).collect(Collectors.toList()),
- FeConstants.INTERNAL_DB_NAME + "." + tblName);
+ FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
doDelete("id", expiredStats.ids.stream()
.map(String::valueOf).collect(Collectors.toList()),
- FeConstants.INTERNAL_DB_NAME + "." + tblName);
+ FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
}
- private void doDelete(String/*col name*/ colName, List<String> pred,
String tblName) {
+ private void doDelete(String/*col name*/ colName, List<String> pred,
String tblName, boolean taskOnly) {
String deleteTemplate = "DELETE FROM " + tblName + " WHERE ${left} IN
(${right})";
if (CollectionUtils.isEmpty(pred)) {
return;
@@ -190,6 +196,9 @@ public class StatisticsCleaner extends MasterDaemon {
params.put("left", colName);
params.put("right", right);
String sql = new StringSubstitutor(params).replace(deleteTemplate);
+ if (taskOnly) {
+ sql += " AND task_id != -1";
+ }
try {
StatisticsUtil.execUpdate(sql);
} catch (Exception e) {
@@ -255,10 +264,11 @@ public class StatisticsCleaner extends MasterDaemon {
return pos;
}
- private long findExpiredJobs(List<String> jobIds, long offset) {
+ private long findExpiredJobs(List<String> jobIds, long offset,
BiFunction<Integer, Long, List<ResultRow>>
+ fetchFunc) {
long pos = offset;
while (pos < jobTbl.getRowCount() && jobIds.size() <
Config.max_allowed_in_element_num_of_delete) {
- List<ResultRow> rows =
StatisticsRepository.fetchExpiredJobs(StatisticConstants.FETCH_LIMIT, pos);
+ List<ResultRow> rows =
fetchFunc.apply(StatisticConstants.FETCH_LIMIT, pos);
for (ResultRow r : rows) {
try {
jobIds.add(r.getColumnValue("job_id"));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index 8b7789c5a0..ae16445942 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -93,7 +93,7 @@ public class StatisticsRepository {
private static final String DROP_TABLE_STATISTICS_TEMPLATE = "DELETE FROM
" + FeConstants.INTERNAL_DB_NAME
+ "." + "${tblName}" + " WHERE ${condition}";
- private static final String FIND_EXPIRED_JOBS = "SELECT job_id FROM "
+ private static final String FIND_EXPIRED_ONCE_JOBS = "SELECT job_id FROM "
+ FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
+ " WHERE task_id = -1 AND ${now} - last_exec_time_in_ms > "
+
TimeUnit.HOURS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS)
@@ -101,6 +101,14 @@ public class StatisticsRepository {
+ " ORDER BY last_exec_time_in_ms"
+ " LIMIT ${limit} OFFSET ${offset}";
+ // Distinct ids of jobs owning expired task rows (task_id != -1) whose schedule is PERIOD or
+ // AUTOMATIC. ORDER BY / LIMIT / OFFSET page over task rows in the inner query; DISTINCT then
+ // collapses tasks of the same job, so one page can yield fewer than ${limit} ids.
+ // The schedule_type disjunction must stay parenthesized: AND binds tighter than OR, so without
+ // the parentheses every AUTOMATIC row would match regardless of task_id or age.
+ // NOTE(review): the expiration constant is named *_IN_DAYS but converted with TimeUnit.HOURS
+ // (as in FIND_EXPIRED_ONCE_JOBS) -- confirm the intended unit.
+ private static final String FIND_EXPIRED_AUTO_JOBS = "SELECT DISTINCT(job_id) FROM (SELECT job_id FROM "
+ + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
+ + " WHERE task_id != -1 AND ${now} - last_exec_time_in_ms > "
+ + TimeUnit.HOURS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS)
+ + " AND (schedule_type = 'PERIOD' OR schedule_type = 'AUTOMATIC')"
+ + " ORDER BY last_exec_time_in_ms"
+ + " LIMIT ${limit} OFFSET ${offset}) t";
+
private static final String FETCH_RECENT_STATS_UPDATED_COL =
"SELECT * FROM "
+ FeConstants.INTERNAL_DB_NAME + "." +
StatisticConstants.STATISTIC_TBL_NAME
@@ -382,12 +390,20 @@ public class StatisticsRepository {
return StatisticsUtil.execStatisticQuery(new
StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME));
}
- public static List<ResultRow> fetchExpiredJobs(long limit, long offset) {
+ /**
+ * Returns one page of job ids selected by FIND_EXPIRED_ONCE_JOBS, which filters (among its
+ * conditions) on task_id = -1 and last_exec_time_in_ms being older than the expiration
+ * window, ordered by last_exec_time_in_ms.
+ *
+ * @param limit maximum number of rows to return
+ * @param offset number of matching rows to skip
+ * @return rows carrying a job_id column
+ */
+ public static List<ResultRow> fetchExpiredOnceJobs(long limit, long
offset) {
+ Map<String, String> params = new HashMap<>();
+ params.put("limit", String.valueOf(limit));
+ params.put("offset", String.valueOf(offset));
+ params.put("now", String.valueOf(System.currentTimeMillis()));
+ return StatisticsUtil.execStatisticQuery(new
StringSubstitutor(params).replace(FIND_EXPIRED_ONCE_JOBS));
+ }
+
+ /**
+ * Returns one page of distinct job ids selected by FIND_EXPIRED_AUTO_JOBS. Paging is applied
+ * to task rows inside that query before DISTINCT, so fewer than {@code limit} ids may be
+ * returned.
+ *
+ * @param limit maximum number of task rows taken by the inner query
+ * @param offset number of matching task rows to skip
+ * @return rows carrying a job_id column
+ */
+ public static List<ResultRow> fetchExpiredAutoJob(long limit, long offset)
{
Map<String, String> params = new HashMap<>();
params.put("limit", String.valueOf(limit));
params.put("offset", String.valueOf(offset));
params.put("now", String.valueOf(System.currentTimeMillis()));
- return StatisticsUtil.execStatisticQuery(new
StringSubstitutor(params).replace(FIND_EXPIRED_JOBS));
+ return StatisticsUtil.execStatisticQuery(new
StringSubstitutor(params).replace(FIND_EXPIRED_AUTO_JOBS));
}
public static Map<String, Set<Long>> fetchColAndPartsForStats(long tblId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]