This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 607887673eb [improvement](report) report handler discard old report tasks #39469 (#39605)
607887673eb is described below
commit 607887673eb638f765c474971859b2b124226028
Author: yujun <[email protected]>
AuthorDate: Tue Aug 20 17:40:49 2024 +0800
[improvement](report) report handler discard old report tasks #39469 (#39605)
cherry pick from #39469
---
.../org/apache/doris/master/ReportHandler.java | 90 ++++++++++++++++++----
1 file changed, 76 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 29887658f7a..af62171007b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -104,6 +104,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
@@ -111,10 +112,11 @@ import java.util.stream.Collectors;
public class ReportHandler extends Daemon {
private static final Logger LOG =
LogManager.getLogger(ReportHandler.class);
- private BlockingQueue<ReportTask> reportQueue =
Queues.newLinkedBlockingQueue();
+ private BlockingQueue<BackendReportType> reportQueue =
Queues.newLinkedBlockingQueue();
+
+ private Map<BackendReportType, ReportTask> reportTasks = Maps.newHashMap();
private enum ReportType {
- UNKNOWN,
TASK,
DISK,
TABLET
@@ -158,7 +160,7 @@ public class ReportHandler extends Daemon {
Map<Long, Long> partitionsVersion = null;
long reportVersion = -1;
- ReportType reportType = ReportType.UNKNOWN;
+ ReportType reportType = null;
if (request.isSetTasks()) {
tasks = request.getTasks();
@@ -189,8 +191,16 @@ public class ReportHandler extends Daemon {
backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore());
}
- ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets,
partitionsVersion, reportVersion,
- request.getStoragePolicy(), request.getResource(),
request.getNumCores(),
+ if (reportType == null) {
+ tStatus.setStatusCode(TStatusCode.INTERNAL_ERROR);
+ tStatus.setErrorMsgs(Lists.newArrayList("unknown report type"));
+ LOG.error("receive unknown report type from be {}. current queue
size: {}",
+ backend.getId(), reportQueue.size());
+ return result;
+ }
+
+ ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks,
tablets, partitionsVersion,
+ reportVersion, request.getStoragePolicy(),
request.getResource(), request.getNumCores(),
request.getPipelineExecutorSize());
try {
putToQueue(reportTask);
@@ -202,8 +212,8 @@ public class ReportHandler extends Daemon {
tStatus.setErrorMsgs(errorMsgs);
return result;
}
- LOG.info("receive report from be {}. type: {}, current queue size: {}",
- backend.getId(), reportType, reportQueue.size());
+ LOG.info("receive report from be {}. type: {}, report version {},
current queue size: {}",
+ backend.getId(), reportType, reportVersion,
reportQueue.size());
return result;
}
@@ -215,7 +225,14 @@ public class ReportHandler extends Daemon {
"the report queue size exceeds the limit: "
+ Config.report_queue_size + ". current: " +
currentSize);
}
- reportQueue.put(reportTask);
+
+ BackendReportType backendReportType = new
BackendReportType(reportTask.beId, reportTask.reportType);
+
+ synchronized (reportTasks) {
+ reportTasks.put(backendReportType, reportTask);
+ }
+
+ reportQueue.put(backendReportType);
}
private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
@@ -230,9 +247,38 @@ public class ReportHandler extends Daemon {
return tabletMap;
}
+ private class BackendReportType {
+ private long beId;
+ private ReportType reportType;
+
+ public BackendReportType(long beId, ReportType reportType) {
+ this.beId = beId;
+ this.reportType = reportType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(beId, reportType);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof BackendReportType)) {
+ return false;
+ }
+ BackendReportType otherBeReport = (BackendReportType) other;
+ return this.beId == otherBeReport.beId
+ && this.reportType == otherBeReport.reportType;
+ }
+ }
+
private class ReportTask extends MasterTask {
private long beId;
+ private ReportType reportType;
private Map<TTaskType, Set<Long>> tasks;
private Map<String, TDisk> disks;
private Map<Long, TTablet> tablets;
@@ -244,12 +290,13 @@ public class ReportHandler extends Daemon {
private int cpuCores;
private int pipelineExecutorSize;
- public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
+ public ReportTask(long beId, ReportType reportType, Map<TTaskType,
Set<Long>> tasks,
Map<String, TDisk> disks, Map<Long, TTablet> tablets,
Map<Long, Long> partitionsVersion, long reportVersion,
List<TStoragePolicy> storagePolicies, List<TStorageResource>
storageResources, int cpuCores,
int pipelineExecutorSize) {
this.beId = beId;
+ this.reportType = reportType;
this.tasks = tasks;
this.disks = disks;
this.tablets = tablets;
@@ -1383,13 +1430,28 @@ public class ReportHandler extends Daemon {
@Override
protected void runOneCycle() {
while (true) {
- ReportTask task = null;
- try {
- task = reportQueue.take();
+ ReportTask task = takeReportTask();
+ if (task != null) {
task.exec();
- } catch (InterruptedException e) {
- LOG.warn("got interupted exception when executing report", e);
}
}
}
+
+ private ReportTask takeReportTask() {
+ BackendReportType backendReportType;
+ try {
+ backendReportType = reportQueue.take();
+ } catch (InterruptedException e) {
+ LOG.warn("got interupted exception when executing report", e);
+ return null;
+ }
+
+ ReportTask task = null;
+ synchronized (reportTasks) {
+ task = reportTasks.get(backendReportType);
+ reportTasks.remove(backendReportType);
+ }
+
+ return task;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]