This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5e1a385b8cf [fix](statistics)Add synchronize for modify
analysisTaskInfoMap and analysisJobInfoMap. #31940
5e1a385b8cf is described below
commit 5e1a385b8cf479ff353c1223879954c5f85cfc14
Author: Jibing-Li <[email protected]>
AuthorDate: Thu Mar 7 21:25:51 2024 +0800
[fix](statistics)Add synchronize for modify analysisTaskInfoMap and
analysisJobInfoMap. #31940
---
.../apache/doris/statistics/AnalysisManager.java | 48 ++++++++++++++--------
1 file changed, 30 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index b265b88f702..6bdf6fdb771 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -591,14 +591,16 @@ public class AnalysisManager implements Writable {
tbl = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(),
tblName.getTbl());
}
long tblId = tbl == null ? -1 : tbl.getId();
- return analysisInfos.stream()
+ synchronized (analysisInfos) {
+ return analysisInfos.stream()
.filter(a -> stmt.getJobId() == 0 || a.jobId ==
stmt.getJobId())
.filter(a -> state == null ||
a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.tblId == tblId)
.filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM)
- || !stmt.isAuto() &&
a.jobType.equals(JobType.MANUAL))
+ || !stmt.isAuto() && a.jobType.equals(JobType.MANUAL))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
+ }
}
public String getJobProgress(long jobId) {
@@ -796,31 +798,39 @@ public class AnalysisManager implements Writable {
}
public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
- while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
-
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
- }
- if (jobInfo.message != null && jobInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
- jobInfo.message = jobInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ synchronized (analysisJobInfoMap) {
+ while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
+
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
+ }
+ if (jobInfo.message != null && jobInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
+ jobInfo.message = jobInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ }
+ this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
- this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
- while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
-
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
- }
- if (taskInfo.message != null && taskInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
- taskInfo.message = taskInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ synchronized (analysisTaskInfoMap) {
+ while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
+
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
+ }
+ if (taskInfo.message != null && taskInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
+ taskInfo.message = taskInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ }
+ this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
- this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) {
- this.analysisJobInfoMap.remove(log.id);
+ synchronized (analysisJobInfoMap) {
+ this.analysisJobInfoMap.remove(log.id);
+ }
}
public void replayDeleteAnalysisTask(AnalyzeDeletionLog log) {
- this.analysisTaskInfoMap.remove(log.id);
+ synchronized (analysisTaskInfoMap) {
+ this.analysisTaskInfoMap.remove(log.id);
+ }
}
private static class SyncTaskCollection {
@@ -892,8 +902,10 @@ public class AnalysisManager implements Writable {
}
public void removeAll(List<AnalysisInfo> analysisInfos) {
- for (AnalysisInfo analysisInfo : analysisInfos) {
- analysisTaskInfoMap.remove(analysisInfo.taskId);
+ synchronized (analysisTaskInfoMap) {
+ for (AnalysisInfo analysisInfo : analysisInfos) {
+ analysisTaskInfoMap.remove(analysisInfo.taskId);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]