This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f8a2c66174 [refactor](planner) refactor automatically set instance_num (#21640)
f8a2c66174 is described below
commit f8a2c66174609f86247b4e61ac83c1bfd0e21255
Author: Mryange <[email protected]>
AuthorDate: Sat Jul 8 21:59:17 2023 +0800
[refactor](planner) refactor automatically set instance_num (#21640)
refactor automatically set instance_num
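A summary pieced together from the diff below: each BE now reports both its CPU core count and config::pipeline_executor_size in the disk report; the FE stores them on the corresponding Backend (writing an edit log entry when they change) and drops the per-FE BeInfoCollector singleton; and when the pipeline engine is enabled with parallelPipelineTaskNum set to 0, the planner derives the instance number from the smallest executor size reported by any backend. A minimal sketch of that derivation follows; the class and method names are illustrative only and do not appear in the patch:

    import java.util.List;

    class InstanceNumSketch {
        // Mirrors SystemInfoService.getMinPipelineExecutorSize() plus the
        // (size + 1) / 2 step in SessionVariable.getParallelExecInstanceNum().
        static int defaultInstanceNum(List<Integer> reportedExecutorSizes) {
            if (reportedExecutorSizes.isEmpty()) {
                return 1;                           // no backend registered yet
            }
            int min = Integer.MAX_VALUE;
            for (int size : reportedExecutorSizes) {
                if (size > 0) {                     // skip backends without a reported size
                    min = Math.min(min, size);
                }
            }
            if (min == Integer.MAX_VALUE) {
                return 1;                           // defensive: no positive size seen
            }
            return (min + 1) / 2;                   // half the smallest executor pool, rounded up
        }
    }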
---
be/src/agent/task_worker_pool.cpp | 7 +-
.../org/apache/doris/master/ReportHandler.java | 41 ++++++---
.../java/org/apache/doris/qe/SessionVariable.java | 6 +-
.../main/java/org/apache/doris/system/Backend.java | 98 +++++++++-------------
.../org/apache/doris/system/SystemInfoService.java | 22 +++--
gensrc/thrift/MasterService.thrift | 1 +
6 files changed, 92 insertions(+), 83 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 030f50b364..0ddb9c3531 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -613,9 +613,10 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
disk.__set_used(root_path_info.is_used);
request.disks[root_path_info.path] = disk;
}
- int num_cores = config::pipeline_executor_size > 0 ? config::pipeline_executor_size
-                                                     : CpuInfo::num_cores();
- request.__set_num_cores(num_cores);
+ request.__set_num_cores(CpuInfo::num_cores());
+ request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
+                                              ? config::pipeline_executor_size
+                                              : CpuInfo::num_cores());
_handle_report(request, ReportType::DISK);
}
StorageEngine::instance()->deregister_report_listener(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d506eb2432..0dd73b3c72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -181,7 +181,8 @@ public class ReportHandler extends Daemon {
}
ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion,
- request.getStoragePolicy(), request.getResource());
+                    request.getStoragePolicy(), request.getResource(), request.getNumCores(),
+ request.getPipelineExecutorSize());
try {
putToQueue(reportTask);
} catch (Exception e) {
@@ -192,14 +193,8 @@ public class ReportHandler extends Daemon {
tStatus.setErrorMsgs(errorMsgs);
return result;
}
-
LOG.info("receive report from be {}. type: {}, current queue size: {}",
backend.getId(), reportType, reportQueue.size());
- if (reportType == ReportType.DISK) {
-            Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
- int numCores = request.getNumCores();
- beinfoCollector.addBeInfo(beId, numCores);
- }
return result;
}
@@ -236,11 +231,14 @@ public class ReportHandler extends Daemon {
private List<TStoragePolicy> storagePolicies;
private List<TStorageResource> storageResources;
+ private int cpuCores;
+ private int pipelineExecutorSize;
public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
- Map<String, TDisk> disks,
- Map<Long, TTablet> tablets, long reportVersion,
-                          List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources) {
+ Map<String, TDisk> disks,
+ Map<Long, TTablet> tablets, long reportVersion,
+                          List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores,
+ int pipelineExecutorSize) {
this.beId = beId;
this.tasks = tasks;
this.disks = disks;
@@ -248,6 +246,8 @@ public class ReportHandler extends Daemon {
this.reportVersion = reportVersion;
this.storagePolicies = storagePolicies;
this.storageResources = storageResources;
+ this.cpuCores = cpuCores;
+ this.pipelineExecutorSize = pipelineExecutorSize;
}
@Override
@@ -257,6 +257,7 @@ public class ReportHandler extends Daemon {
}
if (disks != null) {
ReportHandler.diskReport(beId, disks);
+ ReportHandler.cpuReport(beId, cpuCores, pipelineExecutorSize);
}
if (Config.enable_storage_policy && storagePolicies != null && storageResources != null) {
storagePolicyReport(beId, storagePolicies, storageResources);
@@ -557,12 +558,30 @@ public class ReportHandler extends Daemon {
LOG.warn("backend doesn't exist. id: " + backendId);
return;
}
-
backend.updateDisks(backendDisks);
LOG.info("finished to handle disk report from backend {}, cost: {} ms",
backendId, (System.currentTimeMillis() - start));
}
+    private static void cpuReport(long backendId, int cpuCores, int pipelineExecutorSize) {
+ LOG.info("begin to handle cpu report from backend {}", backendId);
+ long start = System.currentTimeMillis();
+ Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ if (backend == null) {
+ LOG.warn("backend doesn't exist. id: " + backendId);
+ return;
+ }
+ if (backend.updateCpuInfo(cpuCores, pipelineExecutorSize)) {
+ // cpu info is changed
+ LOG.info("new cpu info. backendId: {}, cpucores: {},
pipelineExecutorSize: {}", backendId, cpuCores,
+ pipelineExecutorSize);
+ // log change
+ Env.getCurrentEnv().getEditLog().logBackendStateChange(backend);
+ }
+ LOG.info("finished to handle cpu report from backend {}, cost: {} ms",
+ backendId, (System.currentTimeMillis() - start));
+ }
+
private static void sync(Map<Long, TTablet> backendTablets,
ListMultimap<Long, Long> tabletSyncMap,
long backendId, long backendReportVersion) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index fde5d0c8af..fb36376c59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExperimentalUtil.ExperimentalType;
@@ -28,7 +29,6 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.nereids.metrics.Event;
import org.apache.doris.nereids.metrics.EventSwitchParser;
import org.apache.doris.qe.VariableMgr.VarAttr;
-import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -1433,8 +1433,8 @@ public class SessionVariable implements Serializable, Writable {
public int getParallelExecInstanceNum() {
if (enablePipelineEngine && parallelPipelineTaskNum == 0) {
-            Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
- return beinfoCollector.getParallelExecInstanceNum();
+ int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+ return (size + 1) / 2;
} else if (enablePipelineEngine) {
return parallelPipelineTaskNum;
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index c6a06d1b13..ba10130d49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -48,7 +48,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -119,6 +118,14 @@ public class Backend implements Writable {
@SerializedName("tagMap")
private Map<String, String> tagMap = Maps.newHashMap();
+ // cpu cores
+ @SerializedName("cpuCores")
+ private int cpuCores = 1;
+
+ // from config::pipeline_executor_size , default equal cpuCores
+ @SerializedName("pipelineExecutorSize")
+ private int pipelineExecutorSize = 1;
+
// Counter of heartbeat failure.
// Once a heartbeat failed, increase this counter by one.
// And if it reaches Config.max_backend_heartbeat_failure_tolerance_count, this backend
@@ -278,6 +285,14 @@ public class Backend implements Writable {
this.brpcPort = brpcPort;
}
+ public void setCpuCores(int cpuCores) {
+ this.cpuCores = cpuCores;
+ }
+
+ public void setPipelineExecutorSize(int pipelineExecutorSize) {
+ this.pipelineExecutorSize = pipelineExecutorSize;
+ }
+
public long getLastUpdateMs() {
return this.lastUpdateMs;
}
@@ -294,6 +309,14 @@ public class Backend implements Writable {
this.lastStartTime = currentTime;
}
+ public int getCputCores() {
+ return cpuCores;
+ }
+
+ public int getPipelineExecutorSize() {
+ return pipelineExecutorSize;
+ }
+
public long getLastMissingHeartbeatTime() {
return lastMissingHeartbeatTime;
}
@@ -519,6 +542,20 @@ public class Backend implements Writable {
}
}
+ public boolean updateCpuInfo(int cpuCores, int pipelineExecutorSize) {
+ boolean isChanged = false;
+
+ if (this.cpuCores != cpuCores) {
+ this.cpuCores = cpuCores;
+ isChanged = true;
+ }
+ if (this.pipelineExecutorSize != pipelineExecutorSize) {
+ this.pipelineExecutorSize = pipelineExecutorSize;
+ isChanged = true;
+ }
+ return isChanged;
+ }
+
/**
* In old version, there is only one tag for a Backend, and it is a "location" type tag.
* But in new version, a Backend can have multi tag, so we need to put locationTag to
@@ -729,63 +766,4 @@ public class Backend implements Writable {
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() +
"}";
}
- public static BeInfoCollector getBeInfoCollector() {
- return BeInfoCollector.get();
- }
-
- public static class BeInfoCollector {
- private int numCores = 1;
- private static volatile BeInfoCollector instance = null;
-        private static final Map<Long, BeInfoCollector> Info = new ConcurrentHashMap<>();
-
- private BeInfoCollector(int numCores) {
- this.numCores = numCores;
- }
-
- public static BeInfoCollector get() {
- if (instance == null) {
- synchronized (BeInfoCollector.class) {
- if (instance == null) {
- instance = new BeInfoCollector(Integer.MAX_VALUE);
- }
- }
- }
- return instance;
- }
-
- public int getNumCores() {
- return numCores;
- }
-
- public void clear() {
- Info.clear();
- }
-
- public void addBeInfo(long beId, int numCores) {
- Info.put(beId, new BeInfoCollector(numCores));
- }
-
- public void dropBeInfo(long beId) {
- Info.remove(beId);
- }
-
- public int getMinNumCores() {
- int minNumCores = Integer.MAX_VALUE;
- for (BeInfoCollector beinfo : Info.values()) {
- minNumCores = Math.min(minNumCores, beinfo.getNumCores());
- }
- return Math.max(1, minNumCores);
- }
-
- public int getParallelExecInstanceNum() {
- if (getMinNumCores() == Integer.MAX_VALUE) {
- return 1;
- }
- return (getMinNumCores() + 1) / 2;
- }
-
- public BeInfoCollector getBeInfoCollectorById(long beId) {
- return Info.get(beId);
- }
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index f93f906cc3..12483ff2f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -247,9 +247,6 @@ public class SystemInfoService {
throw new DdlException("Backend[" + backendId + "] does not
exist");
}
dropBackend(backend.getHost(), backend.getHeartbeatPort());
- // update BeInfoCollector
- Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
- beinfoCollector.dropBeInfo(backendId);
}
// final entry of dropping backend
@@ -770,9 +767,6 @@ public class SystemInfoService {
ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions);
idToReportVersionRef = newIdToReportVersion;
- // update BeInfoCollector
- Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
- beinfoCollector.dropBeInfo(backend.getId());
}
public void updateBackendState(Backend be) {
@@ -791,6 +785,8 @@ public class SystemInfoService {
memoryBe.setLastUpdateMs(be.getLastUpdateMs());
memoryBe.setLastStartTime(be.getLastStartTime());
memoryBe.setDisks(be.getDisks());
+ memoryBe.setCpuCores(be.getCputCores());
+ memoryBe.setPipelineExecutorSize(be.getPipelineExecutorSize());
}
}
@@ -963,4 +959,18 @@ public class SystemInfoService {
List<Backend> bes = getMixBackends();
return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList());
}
+
+ public int getMinPipelineExecutorSize() {
+ if (idToBackendRef.size() == 0) {
+ return 1;
+ }
+ int minPipelineExecutorSize = Integer.MAX_VALUE;
+ for (Backend be : idToBackendRef.values()) {
+ int size = be.getPipelineExecutorSize();
+ if (size > 0) {
+                minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size);
+ }
+ }
+ return minPipelineExecutorSize;
+ }
}
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index 18d95b3854..3643df178b 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -102,6 +102,7 @@ struct TReportRequest {
9: optional list<AgentService.TStoragePolicy> storage_policy // only id and version
10: optional list<AgentService.TStorageResource> resource // only id and version
11: i32 num_cores
+ 12: i32 pipeline_executor_size
}
struct TMasterResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]