This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7a0e89bc049 [fix](statistics)Add synchronize for modify
analysisTaskInfoMap and analysisJobInfoMap. #31940 (#31967)
7a0e89bc049 is described below
commit 7a0e89bc0491374c7b9488b367f0af5491fe49b3
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Mar 8 00:19:06 2024 +0800
[fix](statistics)Add synchronize for modify analysisTaskInfoMap and
analysisJobInfoMap. #31940 (#31967)
---
.../apache/doris/statistics/AnalysisManager.java | 48 ++++++++++++++--------
1 file changed, 30 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 804de7facf8..4fbb751034a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -594,14 +594,16 @@ public class AnalysisManager implements Writable {
tbl = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(),
tblName.getTbl());
}
long tblId = tbl == null ? -1 : tbl.getId();
- return analysisInfos.stream()
+ synchronized (analysisInfos) {
+ return analysisInfos.stream()
.filter(a -> stmt.getJobId() == 0 || a.jobId ==
stmt.getJobId())
.filter(a -> state == null ||
a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.tblId == tblId)
.filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM)
- || !stmt.isAuto() &&
a.jobType.equals(JobType.MANUAL))
+ || !stmt.isAuto() && a.jobType.equals(JobType.MANUAL))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
+ }
}
public String getJobProgress(long jobId) {
@@ -798,31 +800,39 @@ public class AnalysisManager implements Writable {
}
public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
- while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
-
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
- }
- if (jobInfo.message != null && jobInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
- jobInfo.message = jobInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ synchronized (analysisJobInfoMap) {
+ while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
+
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
+ }
+ if (jobInfo.message != null && jobInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
+ jobInfo.message = jobInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ }
+ this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
- this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
- while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
-
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
- }
- if (taskInfo.message != null && taskInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
- taskInfo.message = taskInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ synchronized (analysisTaskInfoMap) {
+ while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
+
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
+ }
+ if (taskInfo.message != null && taskInfo.message.length() >=
StatisticConstants.MSG_LEN_UPPER_BOUND) {
+ taskInfo.message = taskInfo.message.substring(0,
StatisticConstants.MSG_LEN_UPPER_BOUND);
+ }
+ this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
- this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) {
- this.analysisJobInfoMap.remove(log.id);
+ synchronized (analysisJobInfoMap) {
+ this.analysisJobInfoMap.remove(log.id);
+ }
}
public void replayDeleteAnalysisTask(AnalyzeDeletionLog log) {
- this.analysisTaskInfoMap.remove(log.id);
+ synchronized (analysisTaskInfoMap) {
+ this.analysisTaskInfoMap.remove(log.id);
+ }
}
private static class SyncTaskCollection {
@@ -894,8 +904,10 @@ public class AnalysisManager implements Writable {
}
public void removeAll(List<AnalysisInfo> analysisInfos) {
- for (AnalysisInfo analysisInfo : analysisInfos) {
- analysisTaskInfoMap.remove(analysisInfo.taskId);
+ synchronized (analysisTaskInfoMap) {
+ for (AnalysisInfo analysisInfo : analysisInfos) {
+ analysisTaskInfoMap.remove(analysisInfo.taskId);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]