This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ec0cedab51 [opt](stats) Use single connect context for each olap
analyze task
ec0cedab51 is described below
commit ec0cedab5174bf6bb3424e2796595f7d7619b160
Author: AKIRA <[email protected]>
AuthorDate: Thu Aug 10 15:04:28 2023 +0800
[opt](stats) Use single connect context for each olap analyze task
1. Add some comments
2. Fix potential NPE caused by deleting a running analyze job
3. Use single connect context for each olap analyze task
---
.../apache/doris/statistics/AnalysisManager.java | 14 +++++--
.../apache/doris/statistics/OlapAnalysisTask.java | 44 ++++++++++------------
.../doris/statistics/AnalysisTaskExecutorTest.java | 2 +-
3 files changed, 31 insertions(+), 29 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index cb4d9eb034..b5c6ebf602 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -96,17 +96,23 @@ public class AnalysisManager extends Daemon implements
Writable {
private static final Logger LOG =
LogManager.getLogger(AnalysisManager.class);
- private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>>
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
+ // Tracking running manually submitted async tasks, keep in mem only
+ private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>>
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
private StatisticsCache statisticsCache;
private AnalysisTaskExecutor taskExecutor;
+ // Store task information in metadata.
private final Map<Long, AnalysisInfo> analysisTaskInfoMap =
Collections.synchronizedMap(new TreeMap<>());
+
+ // Store job information in metadata
private final Map<Long, AnalysisInfo> analysisJobInfoMap =
Collections.synchronizedMap(new TreeMap<>());
+ // Tracking system submitted job, keep in mem only
private final Map<Long, AnalysisInfo> systemJobInfoMap = new
ConcurrentHashMap<>();
+ // Tracking and control sync analyze tasks, keep in mem only
private final ConcurrentMap<ConnectContext, SyncTaskCollection>
ctxToSyncTask = new ConcurrentHashMap<>();
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w
-> {
@@ -127,6 +133,10 @@ public class AnalysisManager extends Daemon implements
Writable {
}
info.lastExecTimeInMs = time;
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
+ // Job may get deleted during execution.
+ if (job == null) {
+ return null;
+ }
// Synchronize the job state change in job level.
synchronized (job) {
job.lastExecTimeInMs = time;
@@ -333,8 +343,6 @@ public class AnalysisManager extends Daemon implements
Writable {
if (!isSync) {
persistAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
- }
- if (!isSync) {
try {
updateTableStats(jobInfo);
} catch (Throwable e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 257708de54..71b1191565 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -67,7 +67,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
params.put("sampleExpr", getSampleExpression());
- List<String> partitionAnalysisSQLs = new ArrayList<>();
+ List<String> sqls = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = info.colToPartitions.get(info.colName);
@@ -80,46 +80,40 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
// Avoid error when get the default partition
params.put("partName", "`" + partName + "`");
StringSubstitutor stringSubstitutor = new
StringSubstitutor(params);
-
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+
sqls.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
}
- execSQLs(partitionAnalysisSQLs);
params.remove("partId");
params.put("type", col.getType().toString());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
- execSQL(sql);
+ sqls.add(sql);
+ execSQLs(sqls);
}
@VisibleForTesting
- public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
- for (String sql : partitionAnalysisSQLs) {
- execSQL(sql);
- }
- }
-
- @VisibleForTesting
- public void execSQL(String sql) throws Exception {
- if (killed) {
- return;
- }
+ public void execSQLs(List<String> sqls) throws Exception {
long startTime = System.currentTimeMillis();
- LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
- stmtExecutor = new StmtExecutor(r.connectContext, sql);
- r.connectContext.setExecutor(stmtExecutor);
- stmtExecutor.execute();
- QueryState queryState = r.connectContext.getState();
- if (queryState.getStateType().equals(MysqlStateType.ERR)) {
- throw new RuntimeException(String.format("Failed to analyze
%s.%s.%s, error: %s sql: %s",
- info.catalogName, info.dbName, info.colName, sql,
queryState.getErrorMessage()));
+ for (String sql : sqls) {
+ if (killed) {
+ return;
+ }
+ LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
+ stmtExecutor = new StmtExecutor(r.connectContext, sql);
+ r.connectContext.setExecutor(stmtExecutor);
+ stmtExecutor.execute();
+ QueryState queryState = r.connectContext.getState();
+ if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
+                            info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
+ }
}
} finally {
-            LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
+            LOG.debug("Analyze SQL: " + sqls + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
}
}
-
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 7bbaf9b902..574c96c73d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -89,7 +89,7 @@ public class AnalysisTaskExecutorTest extends
TestWithFeService {
new MockUp<OlapAnalysisTask>() {
@Mock
- public void execSQL(String sql) throws Exception {
+ public void execSQLs(List<String> sqls) throws Exception {
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]