This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new cd339658bd KYLIN-5381 Avoid timeout when cleaning query history by 
limiting the number of data deleted each time (#29355)
cd339658bd is described below

commit cd339658bd37822b720ced6526c3356d3938b068
Author: Wang Hui <wanda1...@users.noreply.github.com>
AuthorDate: Mon Oct 31 12:03:03 2022 +0800

    KYLIN-5381 Avoid timeout when cleaning query history by limiting the number 
of data deleted each time (#29355)
    
    Co-authored-by: hui.wang <hui.w...@kyligence.io>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../metadata/query/JdbcQueryHistoryStore.java      | 101 +++++++++++++++------
 .../kylin/metadata/query/QueryHistoryDAO.java      |   8 +-
 .../kylin/metadata/query/QueryHistoryMapper.java   |   6 ++
 .../metadata/query/QueryHistoryProjectInfo.java    |  20 ++++
 .../kylin/metadata/query/RDBMSQueryHistoryDAO.java |  78 +++++++++++-----
 .../metadata/query/util/QueryHisStoreUtil.java     |  39 +++++---
 .../metadata/query/RDBMSQueryHistoryDaoTest.java   |  72 +++++++++++++--
 8 files changed, 253 insertions(+), 75 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9c4601705f..c917c96e7c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2801,6 +2801,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Integer.parseInt(getOptional("kylin.query.queryhistory.project-max-size", 
"1000000"));
     }
 
+    public int getQueryHistorySingleDeletionSize() {
+        return 
Integer.parseInt(getOptional("kylin.query.queryhistory.single-deletion-size", 
"2000"));
+    }
+
     public long getQueryHistorySurvivalThreshold() {
         return 
TimeUtil.timeStringAs(getOptional("kylin.query.queryhistory.survival-time-threshold",
 "30d"),
                 TimeUnit.MILLISECONDS);
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
index a4b3194f34..250c704991 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
@@ -25,6 +25,7 @@ import static 
org.mybatis.dynamic.sql.SqlBuilder.isGreaterThan;
 import static org.mybatis.dynamic.sql.SqlBuilder.isGreaterThanOrEqualTo;
 import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
 import static org.mybatis.dynamic.sql.SqlBuilder.isLessThan;
+import static org.mybatis.dynamic.sql.SqlBuilder.isLessThanOrEqualTo;
 import static org.mybatis.dynamic.sql.SqlBuilder.isLike;
 import static org.mybatis.dynamic.sql.SqlBuilder.isLikeCaseInsensitive;
 import static org.mybatis.dynamic.sql.SqlBuilder.isNotEqualTo;
@@ -40,8 +41,10 @@ import java.io.PrintWriter;
 import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
@@ -67,6 +70,7 @@ import org.mybatis.dynamic.sql.render.RenderingStrategies;
 import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
 import org.mybatis.dynamic.sql.select.SelectModel;
 import org.mybatis.dynamic.sql.select.join.EqualTo;
+import org.mybatis.dynamic.sql.select.aggregate.Count;
 import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
 import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
 
@@ -198,24 +202,28 @@ public class JdbcQueryHistoryStore {
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
             SelectStatementProvider statementProvider = 
selectDistinct(queryHistoryRealizationTable.queryId)
-                    
.from(queryHistoryRealizationTable).where(queryHistoryRealizationTable.model, 
isIn(modelIds))
+                    .from(queryHistoryRealizationTable) //
+                    .where(queryHistoryRealizationTable.model, isIn(modelIds)) 
//
                     .build().render(RenderingStrategies.MYBATIS3);
-            return 
mapper.selectMany(statementProvider).stream().map(QueryHistory::getQueryId).collect(Collectors.toList());
+            return 
mapper.selectMany(statementProvider).stream().map(QueryHistory::getQueryId)
+                    .collect(Collectors.toList());
         }
     }
 
     public List<QueryStatistics> 
queryQueryHistoriesModelIds(QueryHistoryRequest request, int size) {
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryStatisticsMapper mapper = 
session.getMapper(QueryStatisticsMapper.class);
-            SelectStatementProvider statementProvider1 = 
selectDistinct(queryHistoryTable.engineType).from(queryHistoryTable)
-                    .where(queryHistoryTable.engineType, 
isNotEqualTo("NATIVE"))
-                    .and(queryHistoryTable.projectName, 
isEqualTo(request.getProject()))
+            SelectStatementProvider statementProvider1 = 
selectDistinct(queryHistoryTable.engineType)
+                    .from(queryHistoryTable) //
+                    .where(queryHistoryTable.engineType, 
isNotEqualTo("NATIVE")) //
+                    .and(queryHistoryTable.projectName, 
isEqualTo(request.getProject())) //
                     .build().render(RenderingStrategies.MYBATIS3);
             List<QueryStatistics> engineTypes = 
mapper.selectMany(statementProvider1);
 
-            SelectStatementProvider statementProvider2 = 
selectDistinct(queryHistoryRealizationTable.model).from(queryHistoryRealizationTable)
-                    .where(queryHistoryRealizationTable.projectName, 
isEqualTo(request.getProject()))
-                    .limit(size)
+            SelectStatementProvider statementProvider2 = 
selectDistinct(queryHistoryRealizationTable.model)
+                    .from(queryHistoryRealizationTable) //
+                    .where(queryHistoryRealizationTable.projectName, 
isEqualTo(request.getProject())) //
+                    .limit(size) //
                     .build().render(RenderingStrategies.MYBATIS3);
             List<QueryStatistics> modelIds = 
mapper.selectMany(statementProvider2);
             engineTypes.addAll(modelIds);
@@ -223,33 +231,70 @@ public class JdbcQueryHistoryStore {
         }
     }
 
-    public QueryHistory queryOldestQueryHistory(long maxSize) {
+    public QueryHistory getOldestQueryHistory(long index) {
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
             SelectStatementProvider statementProvider = 
select(getSelectFields(queryHistoryTable))
                     .from(queryHistoryTable) //
-                    .orderBy(queryHistoryTable.id.descending()) //
+                    .orderBy(queryHistoryTable.id) //
                     .limit(1) //
-                    .offset(maxSize - 1) //
+                    .offset(index - 1L) //
                     .build().render(RenderingStrategies.MYBATIS3);
             return mapper.selectOne(statementProvider);
         }
     }
 
-    public QueryHistory queryOldestQueryHistory(long maxSize, String project) {
+    public QueryHistory getOldestQueryHistory(String project, long index) {
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
-            SelectStatementProvider statementProvider = 
select(getSelectFields(queryHistoryTable)) //
+            SelectStatementProvider statementProvider = 
select(getSelectFields(queryHistoryTable))
                     .from(queryHistoryTable) //
                     .where(queryHistoryTable.projectName, isEqualTo(project)) 
//
-                    .orderBy(queryHistoryTable.id.descending()) //
+                    .orderBy(queryHistoryTable.id) //
                     .limit(1) //
-                    .offset(maxSize - 1) //
+                    .offset(index - 1L) //
                     .build().render(RenderingStrategies.MYBATIS3);
             return mapper.selectOne(statementProvider);
         }
     }
 
+    public Long getCountOnQueryHistory() {
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
+            SelectStatementProvider statementProvider = 
select(Count.of(queryHistoryTable.id)) //
+                    .from(queryHistoryTable) //
+                    .build().render(RenderingStrategies.MYBATIS3);
+            return mapper.selectAsLong(statementProvider);
+        }
+    }
+
+    public Long getCountOnQueryHistory(long retainTime) {
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
+            SelectStatementProvider statementProvider = 
select(Count.of(queryHistoryTable.id).as(COUNT)) //
+                    .from(queryHistoryTable) //
+                    .where(queryHistoryTable.queryTime, 
isLessThan(retainTime)) //
+                    .build().render(RenderingStrategies.MYBATIS3);
+            return mapper.selectAsLong(statementProvider);
+        }
+    }
+
+    public Map<String, Long> getCountGroupByProject() {
+        Map<String, Long> projectCounts = new HashMap<>();
+        List<QueryHistoryProjectInfo> projectInfos;
+        try (SqlSession session = sqlSessionFactory.openSession()) {
+            QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
+            SelectStatementProvider statementProvider = 
select(queryHistoryTable.projectName,
+                    count(queryHistoryTable.id).as(COUNT)) //
+                    .from(queryHistoryTable) //
+                    .groupBy(queryHistoryTable.projectName) //
+                    .build().render(RenderingStrategies.MYBATIS3);
+            projectInfos = mapper.selectByProject(statementProvider);
+        }
+        projectInfos.forEach(projectInfo -> 
projectCounts.put(projectInfo.getProjectName(), projectInfo.getCount()));
+        return projectCounts;
+    }
+
     public QueryHistory queryByQueryId(String queryId) {
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
@@ -398,27 +443,28 @@ public class JdbcQueryHistoryStore {
         }
     }
 
-    public void deleteQueryHistory(long queryTime) {
+    public int deleteQueryHistory(long id) {
         long startTime = System.currentTimeMillis();
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
             DeleteStatementProvider deleteStatement = 
SqlBuilder.deleteFrom(queryHistoryTable) //
-                    .where(queryHistoryTable.queryTime, isLessThan(queryTime)) 
//
+                    .where(queryHistoryTable.id, isLessThanOrEqualTo(id)) //
                     .build().render(RenderingStrategies.MYBATIS3);
             int deleteRows = mapper.delete(deleteStatement);
             session.commit();
             if (deleteRows > 0) {
                 log.info("Delete {} row query history takes {} ms", 
deleteRows, System.currentTimeMillis() - startTime);
             }
+            return deleteRows;
         }
     }
 
-    public void deleteQueryHistory(long queryTime, String project) {
+    public int deleteQueryHistory(String project, long id) {
         long startTime = System.currentTimeMillis();
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
             DeleteStatementProvider deleteStatement = 
SqlBuilder.deleteFrom(queryHistoryTable) //
-                    .where(queryHistoryTable.queryTime, isLessThan(queryTime)) 
//
+                    .where(queryHistoryTable.id, isLessThanOrEqualTo(id)) //
                     .and(queryHistoryTable.projectName, isEqualTo(project)) //
                     .build().render(RenderingStrategies.MYBATIS3);
             int deleteRows = mapper.delete(deleteStatement);
@@ -427,6 +473,7 @@ public class JdbcQueryHistoryStore {
                 log.info("Delete {} row query history for project [{}] takes 
{} ms", deleteRows, project,
                         System.currentTimeMillis() - startTime);
             }
+            return deleteRows;
         }
     }
 
@@ -463,7 +510,7 @@ public class JdbcQueryHistoryStore {
         }
     }
 
-    public void deleteQueryHistoryRealization(long queryTime, String project) {
+    public void deleteQueryHistoryRealization(String project, long queryTime) {
         long startTime = System.currentTimeMillis();
         try (SqlSession session = sqlSessionFactory.openSession()) {
             QueryHistoryMapper mapper = 
session.getMapper(QueryHistoryMapper.class);
@@ -631,7 +678,8 @@ public class JdbcQueryHistoryStore {
             if (request.isSubmitterExactlyMatch()) {
                 filterSql = filterSql.and(queryHistoryTable.querySubmitter, 
isIn(request.getFilterSubmitter()));
             } else if (request.getFilterSubmitter().size() == 1) {
-                filterSql = filterSql.and(queryHistoryTable.querySubmitter, 
isLikeCaseInsensitive("%" + request.getFilterSubmitter().get(0) + "%"));
+                filterSql = filterSql.and(queryHistoryTable.querySubmitter,
+                        isLikeCaseInsensitive("%" + 
request.getFilterSubmitter().get(0) + "%"));
             }
         }
 
@@ -655,12 +703,14 @@ public class JdbcQueryHistoryStore {
             }
         } else if (selectAllModels) {
             // Process CONSTANTS, HIVE, RDBMS and all model
-            filterSql = filterSql.and(queryHistoryTable.engineType, 
isIn(realizations), or(queryHistoryTable.indexHit, isEqualTo(true)));
+            filterSql = filterSql.and(queryHistoryTable.engineType, 
isIn(realizations),
+                    or(queryHistoryTable.indexHit, isEqualTo(true)));
         } else if (request.getFilterModelIds() != null && 
!request.getFilterModelIds().isEmpty()) {
             // Process CONSTANTS, HIVE, RDBMS and model1, model2, model3...
-            filterSql = filterSql.and(queryHistoryTable.engineType, 
isIn(realizations), or(queryHistoryTable.queryId,
-                    
isIn(selectDistinct(queryHistoryRealizationTable.queryId).from(queryHistoryRealizationTable)
-                            .where(queryHistoryRealizationTable.model, 
isIn(request.getFilterModelIds())))));
+            filterSql = filterSql.and(queryHistoryTable.engineType, 
isIn(realizations),
+                    or(queryHistoryTable.queryId,
+                            
isIn(selectDistinct(queryHistoryRealizationTable.queryId).from(queryHistoryRealizationTable)
+                                    .where(queryHistoryRealizationTable.model, 
isIn(request.getFilterModelIds())))));
         } else {
             // Process CONSTANTS, HIVE, RDBMS
             filterSql = filterSql.and(queryHistoryTable.engineType, 
isIn(realizations));
@@ -762,5 +812,4 @@ public class JdbcQueryHistoryStore {
                 queryHistoryTable.queryTime, queryHistoryTable.resultRowCount, 
queryHistoryTable.sql,
                 queryHistoryTable.sqlPattern, 
queryHistoryTable.totalScanBytes, queryHistoryTable.totalScanCount);
     }
-
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
index cd91f42287..d954368f35 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.metadata.query;
 
 import java.util.List;
+import java.util.Map;
 
 public interface QueryHistoryDAO {
 
@@ -44,10 +45,10 @@ public interface QueryHistoryDAO {
 
     void deleteQueryHistoriesIfMaxSizeReached();
 
-    void deleteQueryHistoriesIfProjectMaxSizeReached(String project);
-
     void deleteQueryHistoriesIfRetainTimeReached();
 
+    void deleteOldestQueryHistoriesByProject(String project, int deleteCount);
+
     long getQueryHistoriesSize(QueryHistoryRequest request, String project);
 
     QueryHistory getByQueryId(String queryId);
@@ -59,4 +60,7 @@ public interface QueryHistoryDAO {
     String getRealizationMetricMeasurement();
 
     List<QueryDailyStatistic> getQueryDailyStatistic(long startTime, long 
endTime);
+
+    Map<String, Long> getQueryCountByProject();
+
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
index ed39f0a9c6..e5a24f4678 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
@@ -69,6 +69,12 @@ public interface QueryHistoryMapper {
             @Result(column = "reserved_field_3", property = 
"queryHistoryInfo", jdbcType = JdbcType.BLOB, typeHandler = 
QueryHistoryTable.QueryHistoryInfoHandler.class) })
     List<QueryHistory> selectMany(SelectStatementProvider selectStatement);
 
+    @SelectProvider(type = SqlProviderAdapter.class, method = "select")
+    @Results(id = "QueryHistoryProjectInfoResult", value = {
+            @Result(column = "project_name", property = "projectName", 
jdbcType = JdbcType.VARCHAR),
+            @Result(column = "count", property = "count", jdbcType = 
JdbcType.BIGINT) })
+    List<QueryHistoryProjectInfo> selectByProject(SelectStatementProvider 
selectStatement);
+
     @SelectProvider(type = SqlProviderAdapter.class, method = "select")
     @ResultMap("QueryHistoryResult")
     QueryHistory selectOne(SelectStatementProvider selectStatement);
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java
new file mode 100644
index 0000000000..2324979f71
--- /dev/null
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java
@@ -0,0 +1,20 @@
+package org.apache.kylin.metadata.query;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Getter
+@Setter
+@Slf4j
+public class QueryHistoryProjectInfo {
+
+    public static final String PROJECT_NAME = "project_name";
+
+    @JsonProperty(PROJECT_NAME)
+    private String projectName;
+
+    private long count;
+
+}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
index 73124496de..14fdf0c4a1 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
@@ -23,18 +23,19 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Date;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.Singletons;
 import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +46,8 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO {
     private static final Logger logger = 
LoggerFactory.getLogger(RDBMSQueryHistoryDAO.class);
     @Setter
     private String queryMetricMeasurement;
-    private String realizationMetricMeasurement;
-    private JdbcQueryHistoryStore jdbcQueryHisStore;
+    private final String realizationMetricMeasurement;
+    private final JdbcQueryHistoryStore jdbcQueryHisStore;
 
     public static final String WEEK = "week";
     public static final String DAY = "day";
@@ -104,34 +105,43 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
         jdbcQueryHisStore.deleteQueryHistoryRealization(project);
     }
 
-    public void deleteQueryHistoriesIfMaxSizeReached() {
-        QueryHistory queryHistory = jdbcQueryHisStore
-                
.queryOldestQueryHistory(KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize());
-        if (Objects.nonNull(queryHistory)) {
-            long time = queryHistory.getQueryTime();
-            jdbcQueryHisStore.deleteQueryHistory(time);
-            jdbcQueryHisStore.deleteQueryHistoryRealization(time);
-        }
-    }
-
     public QueryHistory getByQueryId(String queryId) {
         return jdbcQueryHisStore.queryByQueryId(queryId);
     }
 
-    public void deleteQueryHistoriesIfProjectMaxSizeReached(String project) {
-        QueryHistory queryHistory = jdbcQueryHisStore
-                
.queryOldestQueryHistory(KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize(),
 project);
-        if (Objects.nonNull(queryHistory)) {
-            long time = queryHistory.getQueryTime();
-            jdbcQueryHisStore.deleteQueryHistory(time, project);
-            jdbcQueryHisStore.deleteQueryHistoryRealization(time, project);
+    public void deleteQueryHistoriesIfMaxSizeReached() {
+        long maxSize = 
KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize();
+        long totalCount = jdbcQueryHisStore.getCountOnQueryHistory();
+        if (totalCount > maxSize) {
+            deleteQueryHistoryAndRealization((int) (totalCount - maxSize));
         }
     }
 
     public void deleteQueryHistoriesIfRetainTimeReached() {
-        long retainTime = getRetainTime();
-        jdbcQueryHisStore.deleteQueryHistory(retainTime);
-        jdbcQueryHisStore.deleteQueryHistoryRealization(retainTime);
+        long rangeOutCount = 
jdbcQueryHisStore.getCountOnQueryHistory(getRetainTime());
+        if (rangeOutCount > 0) {
+            deleteQueryHistoryAndRealization((int) rangeOutCount);
+        }
+    }
+
+    public void deleteQueryHistoryAndRealization(int deleteCount) {
+        int singleLimit = 
KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
+        largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
+            QueryHistory queryHistory = 
jdbcQueryHisStore.getOldestQueryHistory(currentCount);
+            int deletedRows = 
jdbcQueryHisStore.deleteQueryHistory(queryHistory.getId());
+            
jdbcQueryHisStore.deleteQueryHistoryRealization(queryHistory.getQueryTime());
+            return deletedRows;
+        }, "Cleanup all query history");
+    }
+
+    public void deleteOldestQueryHistoriesByProject(String project, int 
deleteCount) {
+        int singleLimit = 
KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
+        largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
+            QueryHistory queryHistory = 
jdbcQueryHisStore.getOldestQueryHistory(project, currentCount);
+            int deletedRows = jdbcQueryHisStore.deleteQueryHistory(project, 
queryHistory.getId());
+            jdbcQueryHisStore.deleteQueryHistoryRealization(project, 
queryHistory.getQueryTime());
+            return deletedRows;
+        }, "Cleanup project<" + project + "> query history");
     }
 
     public void batchUpdateQueryHistoriesInfo(List<Pair<Long, 
QueryHistoryInfo>> idToQHInfoList) {
@@ -215,6 +225,11 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
         return jdbcQueryHisStore.queryAvgDurationByTime(startTime, endTime, 
timeDimension, project);
     }
 
+    @Override
+    public Map<String, Long> getQueryCountByProject() {
+        return jdbcQueryHisStore.getCountGroupByProject();
+    }
+
     public static void fillZeroForQueryStatistics(List<QueryStatistics> 
queryStatistics, long startTime, long endTime,
             String dimension) {
         if (!dimension.equalsIgnoreCase(DAY) && 
!dimension.equalsIgnoreCase(WEEK)) {
@@ -245,4 +260,19 @@ public class RDBMSQueryHistoryDAO implements 
QueryHistoryDAO {
             }
         }
     }
+
+    public static void largeSplitToSmallTask(int totalCount, int singleSize, 
IntFunction<Integer> function,
+            String description) {
+        int retainCount = totalCount;
+        while (retainCount > 0) {
+            int currentCount = Math.min(retainCount, singleSize);
+            int actualCount = function.apply(currentCount);
+            if (currentCount != actualCount && logger.isWarnEnabled()) {
+                logger.warn("The task {} was not performed as expected, 
expect:{}, actual:{}", description,
+                        currentCount, actualCount);
+            }
+            retainCount -= currentCount;
+        }
+    }
+
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
index 1c0b8c779e..f6589ed5d0 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
@@ -27,11 +27,13 @@ import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.sql.DataSource;
 
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.ibatis.jdbc.ScriptRunner;
 import org.apache.ibatis.mapping.Environment;
 import org.apache.ibatis.session.Configuration;
@@ -45,7 +47,6 @@ import org.apache.kylin.common.Singletons;
 import org.apache.kylin.common.logging.LogOutputStream;
 import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
 import org.apache.kylin.common.util.SetThreadName;
-import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.query.QueryHistoryDAO;
@@ -165,28 +166,40 @@ public class QueryHisStoreUtil {
         try (SetThreadName ignored = new 
SetThreadName("QueryHistoryCleanWorker")) {
             val config = KylinConfig.getInstanceFromEnv();
             val projectManager = NProjectManager.getInstance(config);
+
             getQueryHistoryDao().deleteQueryHistoriesIfMaxSizeReached();
             getQueryHistoryDao().deleteQueryHistoriesIfRetainTimeReached();
+
+            Map<String, Long> projectCounts = 
getQueryHistoryDao().getQueryCountByProject();
             for (ProjectInstance project : projectManager.listAllProjects()) {
                 if (Thread.currentThread().isInterrupted()) {
                     throw new InterruptedException("Thread is interrupted: " + 
Thread.currentThread().getName());
                 }
-                if 
(!EpochManager.getInstance().checkEpochOwner(project.getName()))
-                    continue;
-                try {
-                    long startTime = System.currentTimeMillis();
-                    log.info("Start to delete query histories that are beyond 
max size for project<{}>",
-                            project.getName());
-                    
getQueryHistoryDao().deleteQueryHistoriesIfProjectMaxSizeReached(project.getName());
-                    log.info("Query histories cleanup for project<{}> 
finished, it took {}ms", project.getName(),
-                            System.currentTimeMillis() - startTime);
-                } catch (Exception e) {
-                    log.error("clean query histories<" + project.getName() + 
"> failed", e);
-                }
+                long projectCount = 
projectCounts.getOrDefault(project.getName(), 0L);
+                cleanQueryHistory(project.getName(), projectCount);
             }
         }
     }
 
+    public static void cleanQueryHistory(String projectName, long 
historyCount) {
+        long projectMaxSize = 
KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize();
+        if (historyCount <= projectMaxSize) {
+            log.info("Query histories of project<{}> is less than the maximum 
limit, so skip it.", projectName);
+            return;
+        }
+        try {
+            StopWatch watch = StopWatch.createStarted();
+            log.info("Start to delete query histories that are beyond max size 
for project<{}>, records:{}",
+                    projectName, historyCount);
+            
getQueryHistoryDao().deleteOldestQueryHistoriesByProject(projectName,
+                    (int) (historyCount - projectMaxSize));
+            watch.stop();
+            log.info("Query histories cleanup for project<{}> finished, it 
took {}ms", projectName, watch.getTime());
+        } catch (Exception e) {
+            log.error("Clean query histories for project<{}> failed", 
projectName, e);
+        }
+    }
+
     private static QueryHistoryDAO getQueryHistoryDao() {
         return RDBMSQueryHistoryDAO.getInstance();
     }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
index 18443017dc..e6ea6c176c 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
@@ -19,13 +19,16 @@
 package org.apache.kylin.metadata.query;
 
 import static 
org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO.fillZeroForQueryStatistics;
+import static 
org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO.largeSplitToSmallTask;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.junit.TimeZoneTestRunner;
+import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -71,7 +74,7 @@ public class RDBMSQueryHistoryDaoTest extends 
NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetQueryHistoriesfilterByIsIndexHit() throws Exception {
+    public void testGetQueryHistoriesFilterByIsIndexHit() throws Exception {
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true, 
PROJECT, true));
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, false, 
PROJECT, true));
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, false, 
PROJECT, true));
@@ -102,7 +105,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetQueryHistoriesfilterByQueryTime() throws Exception {
+    public void testGetQueryHistoriesFilterByQueryTime() throws Exception {
         // 2020-01-29 23:25:12
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true, PROJECT, true));
         // 2020-01-30 23:25:12
@@ -123,7 +126,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetQueryHistoriesfilterByDuration() throws Exception {
+    public void testGetQueryHistoriesFilterByDuration() throws Exception {
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1000L, true, PROJECT, true));
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 2000L, false, PROJECT, true));
         queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 3000L, false, PROJECT, true));
@@ -145,7 +148,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testGetQueryHistoriesfilterBySql() throws Exception {
+    public void testGetQueryHistoriesFilterBySql() throws Exception {
         QueryMetrics queryMetrics1 = createQueryMetrics(1580311512000L, 1L, true, PROJECT, true);
         queryMetrics1.setSql("select 2 LIMIT 500\n");
         queryHistoryDAO.insert(queryMetrics1);
@@ -333,6 +336,38 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(2, monthQueryStatistics.get(0).getMeanDuration(), 0.1);
     }
 
+    @Test
+    public void testDeleteQueryHistories() throws Exception {
+        overwriteSystemProp("kylin.query.queryhistory.max-size", "2");
+        overwriteSystemProp("kylin.query.queryhistory.project-max-size", "5");
+
+        String PROJECT_V1 = PROJECT + "_v1";
+
+        // 2020-01-29 23:25:12
+        queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true, PROJECT, true));
+        // 2020-01-30 23:25:12
+        queryHistoryDAO.insert(createQueryMetrics(1580397912000L, 2L, false, PROJECT, true));
+        // 2030-01-28 23:25:12
+        queryHistoryDAO.insert(createQueryMetrics(1895844312000L, 3L, false, PROJECT_V1, true));
+        // 2030-01-29 23:25:12
+        queryHistoryDAO.insert(createQueryMetrics(1895930712000L, 1L, false, PROJECT, true));
+
+        // before delete
+        List<QueryHistory> queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT);
+        Assert.assertEquals(3, queryHistoryList.size());
+
+        // after delete
+        QueryHisStoreUtil.cleanQueryHistory();
+
+        queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT_V1);
+        Assert.assertEquals(1, queryHistoryList.size());
+        Assert.assertEquals(1895844312000L, queryHistoryList.get(0).getQueryTime());
+
+        queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT);
+        Assert.assertEquals(1, queryHistoryList.size());
+        Assert.assertEquals(1895930712000L, queryHistoryList.get(0).getQueryTime());
+    }
+
     @Test
     public void testDeleteQueryHistoriesIfRetainTimeReached() throws Exception {
         // 2020-01-29 23:25:12
@@ -401,12 +436,12 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(4, queryHistoryList.size());
 
         // after delete
-        queryHistoryDAO.deleteQueryHistoriesIfProjectMaxSizeReached(PROJECT);
+        QueryHisStoreUtil.cleanQueryHistory(PROJECT, 4);
         queryHistoryList = queryHistoryDAO.getAllQueryHistories();
         Assert.assertEquals(2, queryHistoryList.size());
 
         // test delete empty
-        queryHistoryDAO.deleteQueryHistoriesIfProjectMaxSizeReached(PROJECT);
+        QueryHisStoreUtil.cleanQueryHistory(PROJECT, 2);
         queryHistoryList = queryHistoryDAO.getAllQueryHistories();
         Assert.assertEquals(2, queryHistoryList.size());
     }
@@ -670,12 +705,12 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
 
         Assert.assertEquals(2, queryHistoryList.size());
 
-        Assert.assertEquals(false, queryHistoryList.get(0).getQueryHistoryInfo().isExactlyMatch());
+        Assert.assertFalse(queryHistoryList.get(0).getQueryHistoryInfo().isExactlyMatch());
         Assert.assertEquals(5, queryHistoryList.get(0).getQueryHistoryInfo().getScanSegmentNum());
         Assert.assertEquals("PENDING", queryHistoryList.get(0).getQueryHistoryInfo().getState().toString());
-        Assert.assertEquals(false, queryHistoryList.get(0).getQueryHistoryInfo().isExecutionError());
+        Assert.assertFalse(queryHistoryList.get(0).getQueryHistoryInfo().isExecutionError());
 
-        Assert.assertEquals(true, queryHistoryList.get(1).getQueryHistoryInfo().isExactlyMatch());
+        Assert.assertTrue(queryHistoryList.get(1).getQueryHistoryInfo().isExactlyMatch());
         Assert.assertEquals(3, queryHistoryList.get(1).getQueryHistoryInfo().getScanSegmentNum());
         Assert.assertEquals("PENDING", queryHistoryList.get(1).getQueryHistoryInfo().getState().toString());
         Assert.assertTrue(queryHistoryList.get(1).getQueryHistoryInfo().isExecutionError());
@@ -721,6 +756,23 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(2L, queryDailyStatistic.get(0).getLt3sNum());
     }
 
+    @Test
+    public void testLargeSplitToSmallTask() {
+        AtomicInteger executions = new AtomicInteger(0);
+        AtomicInteger actualSize = new AtomicInteger(0);
+        largeSplitToSmallTask(105, 10, currentCount -> {
+            executions.incrementAndGet();
+            actualSize.addAndGet(currentCount);
+            if (currentCount < 10) {
+                return currentCount - 1;
+            } else {
+                return currentCount;
+            }
+        }, "Test LargeSplitToSmall Task");
+        Assert.assertEquals(105, actualSize.get());
+        Assert.assertEquals(11, executions.get());
+    }
+
     public static QueryMetrics createQueryMetrics(long queryTime, long duration, boolean indexHit, String project,
             boolean hitModel) {
         QueryMetrics queryMetrics = new QueryMetrics("6a9a151f-f992-4d52-a8ec-8ff3fd3de6b1", "192.168.1.6:7070");
@@ -753,7 +805,7 @@ public class RDBMSQueryHistoryDaoTest extends NLocalFileMetadataTestCase {
             realizationMetrics.setModelId("82fa7671-a935-45f5-8779-85703601f49a.json");
 
             realizationMetrics.setSnapshots(
-                    Lists.newArrayList(new String[] { "DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY" }));
+                    Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY"));
 
             List<QueryMetrics.RealizationMetrics> realizationMetricsList = Lists.newArrayList();
             realizationMetricsList.add(realizationMetrics);


Reply via email to