This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ebd6e1649 [feat](stats) Support delete expired auto analysis tasks (#19922)
3ebd6e1649 is described below

commit 3ebd6e1649eedc9214b53e374ee08dcc1a166679
Author: AKIRA <[email protected]>
AuthorDate: Thu May 25 13:25:11 2023 +0900

    [feat](stats) Support delete expired auto analysis tasks (#19922)
---
 .../apache/doris/statistics/StatisticsCleaner.java | 30 ++++++++++++++--------
 .../doris/statistics/StatisticsRepository.java     | 22 +++++++++++++---
 2 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
index de740be92e..b23310b240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 /**
@@ -101,13 +102,18 @@ public class StatisticsCleaner extends MasterDaemon {
     }
 
     private void clearJobTbl() {
+        clearJobTbl(StatisticsRepository::fetchExpiredAutoJob, true);
+        clearJobTbl(StatisticsRepository::fetchExpiredOnceJobs, false);
+    }
+
+    private void clearJobTbl(BiFunction<Integer, Long, List<ResultRow>> 
fetchFunc, boolean taskOnly) {
         List<String> jobIds = null;
         long offset = 0;
         do {
             jobIds = new ArrayList<>();
-            offset = findExpiredJobs(jobIds, offset);
+            offset = findExpiredJobs(jobIds, offset, fetchFunc);
             doDelete("job_id", jobIds, FeConstants.INTERNAL_DB_NAME + "."
-                    + StatisticConstants.ANALYSIS_JOB_TABLE);
+                    + StatisticConstants.ANALYSIS_JOB_TABLE, taskOnly);
         } while (!jobIds.isEmpty());
     }
 
@@ -165,22 +171,22 @@ public class StatisticsCleaner extends MasterDaemon {
     private void deleteExpiredStats(ExpiredStats expiredStats, String tblName) 
{
         doDelete("catalog_id", expiredStats.expiredCatalog.stream()
                         .map(String::valueOf).collect(Collectors.toList()),
-                FeConstants.INTERNAL_DB_NAME + "." + tblName);
+                FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
         doDelete("db_id", expiredStats.expiredDatabase.stream()
                         .map(String::valueOf).collect(Collectors.toList()),
-                FeConstants.INTERNAL_DB_NAME + "." + tblName);
+                FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
         doDelete("tbl_id", expiredStats.expiredTable.stream()
                         .map(String::valueOf).collect(Collectors.toList()),
-                FeConstants.INTERNAL_DB_NAME + "." + tblName);
+                FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
         doDelete("idx_id", expiredStats.expiredIdxId.stream()
                         .map(String::valueOf).collect(Collectors.toList()),
-                FeConstants.INTERNAL_DB_NAME + "." + tblName);
+                FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
         doDelete("id", expiredStats.ids.stream()
                         .map(String::valueOf).collect(Collectors.toList()),
-                FeConstants.INTERNAL_DB_NAME + "." + tblName);
+                FeConstants.INTERNAL_DB_NAME + "." + tblName, false);
     }
 
-    private void doDelete(String/*col name*/ colName, List<String> pred, 
String tblName) {
+    private void doDelete(String/*col name*/ colName, List<String> pred, 
String tblName, boolean taskOnly) {
         String deleteTemplate = "DELETE FROM " + tblName + " WHERE ${left} IN 
(${right})";
         if (CollectionUtils.isEmpty(pred)) {
             return;
@@ -190,6 +196,9 @@ public class StatisticsCleaner extends MasterDaemon {
         params.put("left", colName);
         params.put("right", right);
         String sql = new StringSubstitutor(params).replace(deleteTemplate);
+        if (taskOnly) {
+            sql += " AND task_id != -1";
+        }
         try {
             StatisticsUtil.execUpdate(sql);
         } catch (Exception e) {
@@ -255,10 +264,11 @@ public class StatisticsCleaner extends MasterDaemon {
         return pos;
     }
 
-    private long findExpiredJobs(List<String> jobIds, long offset) {
+    private long findExpiredJobs(List<String> jobIds, long offset, 
BiFunction<Integer, Long, List<ResultRow>>
+            fetchFunc) {
         long pos = offset;
         while (pos < jobTbl.getRowCount() && jobIds.size() < 
Config.max_allowed_in_element_num_of_delete) {
-            List<ResultRow> rows = 
StatisticsRepository.fetchExpiredJobs(StatisticConstants.FETCH_LIMIT, pos);
+            List<ResultRow> rows = 
fetchFunc.apply(StatisticConstants.FETCH_LIMIT, pos);
             for (ResultRow r : rows) {
                 try {
                     jobIds.add(r.getColumnValue("job_id"));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
index 8b7789c5a0..ae16445942 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java
@@ -93,7 +93,7 @@ public class StatisticsRepository {
     private static final String DROP_TABLE_STATISTICS_TEMPLATE = "DELETE FROM 
" + FeConstants.INTERNAL_DB_NAME
             + "." + "${tblName}" + " WHERE ${condition}";
 
-    private static final String FIND_EXPIRED_JOBS = "SELECT job_id FROM "
+    private static final String FIND_EXPIRED_ONCE_JOBS = "SELECT job_id FROM "
             + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
             + " WHERE task_id = -1 AND ${now} - last_exec_time_in_ms  > "
             + 
TimeUnit.HOURS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS)
@@ -101,6 +101,14 @@ public class StatisticsRepository {
             + " ORDER BY last_exec_time_in_ms"
             + " LIMIT ${limit} OFFSET ${offset}";
 
+    private static final String FIND_EXPIRED_AUTO_JOBS = "SELECT 
DISTINCT(job_id) FROM (SELECT job_id FROM "
+            + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
+            + "WHERE task_id != -1 AND ${now} - last_exec_time_in_ms  > "
+            + 
TimeUnit.HOURS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS)
+            + " AND schedule_type = 'PERIOD' OR schedule_type = 'AUTOMATIC'"
+            + " ORDER BY last_exec_time_in_ms"
+            + " LIMIT ${limit} OFFSET ${offset}) t";
+
     private static final String FETCH_RECENT_STATS_UPDATED_COL =
             "SELECT * FROM "
                     + FeConstants.INTERNAL_DB_NAME + "." + 
StatisticConstants.STATISTIC_TBL_NAME
@@ -382,12 +390,20 @@ public class StatisticsRepository {
         return StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME));
     }
 
-    public static List<ResultRow> fetchExpiredJobs(long limit, long offset) {
+    public static List<ResultRow> fetchExpiredOnceJobs(long limit, long 
offset) {
+        Map<String, String> params = new HashMap<>();
+        params.put("limit", String.valueOf(limit));
+        params.put("offset", String.valueOf(offset));
+        params.put("now", String.valueOf(System.currentTimeMillis()));
+        return StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params).replace(FIND_EXPIRED_ONCE_JOBS));
+    }
+
+    public static List<ResultRow> fetchExpiredAutoJob(long limit, long offset) 
{
         Map<String, String> params = new HashMap<>();
         params.put("limit", String.valueOf(limit));
         params.put("offset", String.valueOf(offset));
         params.put("now", String.valueOf(System.currentTimeMillis()));
-        return StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params).replace(FIND_EXPIRED_JOBS));
+        return StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params).replace(FIND_EXPIRED_AUTO_JOBS));
     }
 
     public static Map<String, Set<Long>> fetchColAndPartsForStats(long tblId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to