This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1c566253a81 [Pick][Improment]Query queued by be memory (#37559)
(#39733)
1c566253a81 is described below
commit 1c566253a819ffa6b91eb8fb2925b792520bff26
Author: wangbo <[email protected]>
AuthorDate: Thu Aug 22 15:14:47 2024 +0800
[Pick][Improment]Query queued by be memory (#37559) (#39733)
pick #37559
---
be/src/service/backend_service.cpp | 11 ++
be/src/service/backend_service.h | 3 +
.../main/java/org/apache/doris/common/Config.java | 11 ++
.../main/java/org/apache/doris/catalog/Env.java | 9 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 43 ++++--
.../java/org/apache/doris/qe/QeProcessorImpl.java | 7 +-
.../apache/doris/resource/AdmissionControl.java | 156 +++++++++++++++++++++
.../doris/resource/workloadgroup/QueryQueue.java | 101 +++++++------
.../doris/resource/workloadgroup/QueueToken.java | 152 +++++++-------------
.../resource/workloadgroup/WorkloadGroup.java | 6 +-
.../doris/tablefunction/MetadataGenerator.java | 11 +-
.../org/apache/doris/common/GenericPoolTest.java | 7 +
.../apache/doris/utframe/MockedBackendFactory.java | 7 +
gensrc/thrift/BackendService.thrift | 15 ++
.../workload_manager_p0/test_curd_wlg.groovy | 4 +-
15 files changed, 367 insertions(+), 176 deletions(-)
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index e26264b1a22..c30b936769a 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -1110,4 +1110,15 @@ void
BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
break;
}
}
+
+void BackendService::get_be_resource(TGetBeResourceResult& result,
+                                     const TGetBeResourceRequest& request) {
+    // Report RSS and the process mem limit; the FE queues queries when their ratio is too high.
+    const int64_t mem_usage = PerfCounters::get_vm_rss();
+    TGlobalResourceUsage usage;
+    usage.__set_mem_limit(MemInfo::mem_limit());
+    usage.__set_mem_usage(mem_usage);
+    result.__set_global_resource_usage(usage);
+}
+
} // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 9d53ec4bc45..bbcf103167f 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -140,6 +140,9 @@ public:
void query_ingest_binlog(TQueryIngestBinlogResult& result,
const TQueryIngestBinlogRequest& request)
override;
+ void get_be_resource(TGetBeResourceResult& result,
+ const TGetBeResourceRequest& request) override;
+
private:
Status start_plan_fragment_execution(const TExecPlanFragmentParams&
exec_params);
ExecEnv* _exec_env = nullptr;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6a0394a0be7..5e0cb99b638 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1782,6 +1782,17 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_cpu_hard_limit = false;
+ @ConfField(mutable = true, description = {
+ "当BE内存用量大于该值时,查询会进入排队逻辑,默认值为-1,代表该值不生效。取值范围0~1的小数",
+ "When be memory usage bigger than this value, query could queue, "
+ + "default value is -1, means this value not work. Decimal
value range from 0 to 1"})
+ public static double query_queue_by_be_used_memory = -1;
+
+ @ConfField(mutable = true, description = {"基于内存反压场景FE定时拉取BE内存用量的时间间隔",
+ "In the scenario of memory backpressure, "
+ + "the time interval for obtaining BE memory usage at
regular intervals"})
+ public static long get_be_resource_usage_interval_ms = 10000;
+
@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 785b7add3a0..d6af35c5bf1 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -242,6 +242,7 @@ import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.QueryCancelWorker;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
@@ -515,6 +516,8 @@ public class Env {
private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
+ private AdmissionControl admissionControl;
+
private QueryStats queryStats;
private StatisticsCleaner statisticsCleaner;
@@ -772,6 +775,7 @@ public class Env {
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
+ this.admissionControl = new AdmissionControl(systemInfo);
this.queryStats = new QueryStats();
this.loadManagerAdapter = new LoadManagerAdapter();
this.hiveTransactionMgr = new HiveTransactionMgr();
@@ -883,6 +887,10 @@ public class Env {
return workloadRuntimeStatusMgr;
}
+ public AdmissionControl getAdmissionControl() {
+ return admissionControl;
+ }
+
public ExternalMetaIdMgr getExternalMetaIdMgr() {
return externalMetaIdMgr;
}
@@ -1747,6 +1755,7 @@ public class Env {
workloadGroupMgr.start();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();
+ admissionControl.start();
splitSourceManager.start();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 19ca1050469..d676ec5b908 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -62,6 +62,7 @@ import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.UnionNode;
@@ -645,6 +646,22 @@ public class Coordinator implements CoordInterface {
return fragmentParams;
}
+    /**
+     * Decides whether this query must wait in its workload group's query queue.
+     *
+     * @return false if queueing is disabled, bypassed by the session variable or the query is
+     *         already cancelled; otherwise true iff some scan node is not a SchemaScanNode
+     */
+    private boolean shouldQueue() {
+        if (!Config.enable_query_queue || context.getSessionVariable().getBypassWorkloadGroup()
+                || isQueryCancelled()) {
+            return false;
+        }
+        // Queries whose scans are all SchemaScanNodes, or that have no scan node, skip the queue.
+        return scanNodes.stream().anyMatch(scanNode -> !(scanNode instanceof SchemaScanNode));
+    }
+
+
// Initiate asynchronous execution of query. Returns as soon as all plan
fragments
// have started executing at their respective backends.
// 'Request' must contain at least a coordinator plan fragment (ie, can't
@@ -656,8 +673,7 @@ public class Coordinator implements CoordInterface {
if (context != null) {
if (Config.enable_workload_group) {
this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
- boolean shouldQueue = Config.enable_query_queue &&
!context.getSessionVariable()
- .getBypassWorkloadGroup() && !isQueryCancelled();
+ boolean shouldQueue = this.shouldQueue();
if (shouldQueue) {
queryQueue =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
if (queryQueue == null) {
@@ -666,11 +682,8 @@ public class Coordinator implements CoordInterface {
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
- if
(!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
- LOG.error("query (id=" + DebugUtil.printId(queryId) +
") " + queueToken.getOfferResultDetail());
- queryQueue.returnToken(queueToken);
- throw new
UserException(queueToken.getOfferResultDetail());
- }
+ queueToken.get(DebugUtil.printId(queryId),
+ this.queryOptions.getExecutionTimeout() * 1000);
}
} else {
context.setWorkloadGroupName("");
@@ -681,16 +694,22 @@ public class Coordinator implements CoordInterface {
@Override
public void close() {
- for (ScanNode scanNode : scanNodes) {
- scanNode.stop();
- }
+ // NOTE: close() must never throw; each cleanup step below catches and logs its own errors.
if (queryQueue != null && queueToken != null) {
try {
- queryQueue.returnToken(queueToken);
+ queryQueue.releaseAndNotify(queueToken);
} catch (Throwable t) {
LOG.error("error happens when coordinator close ", t);
}
}
+
+ try {
+ for (ScanNode scanNode : scanNodes) {
+ scanNode.stop();
+ }
+ } catch (Throwable t) {
+ LOG.error("error happens when scannode stop ", t);
+ }
}
private void execInternal() throws Exception {
@@ -1516,7 +1535,7 @@ public class Coordinator implements CoordInterface {
public void cancel() {
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
if (queueToken != null) {
- queueToken.signalForCancel();
+ queueToken.cancel();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 26bb3d95db3..1ec23257749 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -25,7 +25,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
@@ -333,11 +332,11 @@ public final class QeProcessorImpl implements QeProcessor
{
return -1;
}
- public TokenState getQueueStatus() {
+ public String getQueueStatus() {
if (coord.getQueueToken() != null) {
- return coord.getQueueToken().getTokenState();
+ return coord.getQueueToken().getQueueMsg();
}
- return null;
+ return "";
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
new file mode 100644
index 00000000000..480afcde5b6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/AdmissionControl.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
+import org.apache.doris.thrift.TGlobalResourceUsage;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class AdmissionControl extends MasterDaemon {
+
+    public static final Logger LOG = LogManager.getLogger(AdmissionControl.class);
+
+    // Result of the latest BE probe: written by the daemon thread, read by query threads.
+    private volatile boolean isAllBeMemoryEnough = true;
+
+    // Threshold (BE used/limit ratio) in effect during the latest probe.
+    private double currentMemoryLimit = 0;
+
+    private final SystemInfoService clusterInfoService;
+
+    // Tokens waiting in some workload group queue; notifyWaitQuery() wakes their queues up.
+    private final ConcurrentLinkedQueue<QueueToken> queryWaitQueue = new ConcurrentLinkedQueue<>();
+
+    public AdmissionControl(SystemInfoService clusterInfoService) {
+        super("get-be-resource-usage-thread", Config.get_be_resource_usage_interval_ms);
+        this.clusterInfoService = clusterInfoService;
+    }
+
+    public void addQueueToken(QueueToken queueToken) {
+        queryWaitQueue.offer(queueToken);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        getBeMemoryUsage();
+        notifyWaitQuery();
+    }
+
+    /**
+     * Probes every alive BE and refreshes {@link #isAllBeMemoryEnough()}. A negative
+     * {@code Config.query_queue_by_be_used_memory} disables the check. BEs whose RPC fails or
+     * whose reported memory limit is not positive are ignored in this round.
+     */
+    public void getBeMemoryUsage() {
+        if (Config.query_queue_by_be_used_memory < 0) {
+            this.isAllBeMemoryEnough = true;
+            return;
+        }
+        Collection<Backend> backends = clusterInfoService.getIdToBackend().values();
+        this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
+        boolean tmpIsAllBeMemoryEnough = true;
+        for (Backend be : backends) {
+            if (!be.isAlive()) {
+                continue;
+            }
+            TGetBeResourceResult result = fetchBeResource(be);
+            if (result == null || !result.isSetGlobalResourceUsage()) {
+                continue;
+            }
+            TGlobalResourceUsage usage = result.getGlobalResourceUsage();
+            if (usage == null || !usage.isSetMemLimit() || !usage.isSetMemUsage()) {
+                continue;
+            }
+            long memUsageL = usage.getMemUsage();
+            long memLimitL = usage.getMemLimit();
+            if (memLimitL <= 0) {
+                // usage / limit would be Infinity or NaN, which must not decide admission.
+                LOG.warn("be {} reports non-positive mem limit {}, ignore it", be.getHost(), memLimitL);
+                continue;
+            }
+            double memUsagePercent = (double) memUsageL / memLimitL;
+            if (memUsagePercent > this.currentMemoryLimit) {
+                tmpIsAllBeMemoryEnough = false;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("be ip:{}, mem limit:{}, mem usage:{}, mem usage percent:{}, "
+                        + "query queue mem:{}, query wait size:{}",
+                        be.getHost(), memLimitL, memUsageL, memUsagePercent, this.currentMemoryLimit,
+                        this.queryWaitQueue.size());
+            }
+        }
+        this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
+    }
+
+    // Calls getBeResource on one BE; returns null (after logging a warning) if the RPC fails.
+    private TGetBeResourceResult fetchBeResource(Backend be) {
+        TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBePort());
+        BackendService.Client client = null;
+        boolean rpcOk = true;
+        try {
+            client = ClientPool.backendPool.borrowObject(address, 5000);
+            return client.getBeResource(new TGetBeResourceRequest());
+        } catch (Throwable t) {
+            rpcOk = false;
+            LOG.warn("get be {} resource failed, ", be.getHost(), t);
+            return null;
+        } finally {
+            // Nothing was borrowed if borrowObject itself failed, so there is nothing to return.
+            if (client != null) {
+                try {
+                    if (rpcOk) {
+                        ClientPool.backendPool.returnObject(address, client);
+                    } else {
+                        ClientPool.backendPool.invalidateObject(address, client);
+                    }
+                } catch (Throwable e) {
+                    LOG.warn("return rpc client failed. related backend[{}]", be.getHost(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * If every BE has enough memory, asks the queue of each waiting token to re-check whether
+     * queued queries can start. At most as many tokens as were waiting at entry are visited.
+     */
+    public void notifyWaitQuery() {
+        if (!isAllBeMemoryEnough()) {
+            return;
+        }
+        int waitQueryCountSnapshot = queryWaitQueue.size();
+        Iterator<QueueToken> queueTokenIterator = queryWaitQueue.iterator();
+        while (waitQueryCountSnapshot > 0 && queueTokenIterator.hasNext()) {
+            QueueToken queueToken = queueTokenIterator.next();
+            queueToken.notifyWaitQuery();
+            waitQueryCountSnapshot--;
+        }
+    }
+
+    public void removeQueueToken(QueueToken queueToken) {
+        queryWaitQueue.remove(queueToken);
+    }
+
+    public boolean isAllBeMemoryEnough() {
+        return isAllBeMemoryEnough;
+    }
+
+    // TODO(wb): add more resource type
+    /**
+     * Returns true if BE resources currently allow the token to run; otherwise sets the token's
+     * queue message to WAIT_BE_MEMORY and returns false.
+     */
+    public boolean checkResourceAvailable(QueueToken queueToken) {
+        if (isAllBeMemoryEnough()) {
+            return true;
+        }
+        queueToken.setQueueMsg("WAIT_BE_MEMORY");
+        return false;
+    }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
index 5953edbf66e..ba2d2526f2e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java
@@ -17,14 +17,18 @@
package org.apache.doris.resource.workloadgroup;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
-import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.LinkedList;
import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
// note(wb) refer java BlockingQueue, but support altering capacity
@@ -38,8 +42,6 @@ public class QueryQueue {
private int maxConcurrency;
private int maxQueueSize;
private int queueTimeout; // ms
- // running property
- private volatile int currentRunningQueryNum;
public static final String RUNNING_QUERY_NUM = "running_query_num";
public static final String WAITING_QUERY_NUM = "waiting_query_num";
@@ -48,16 +50,13 @@ public class QueryQueue {
private long propVersion;
- private PriorityQueue<QueueToken> priorityTokenQueue;
+ private PriorityQueue<QueueToken> waitingQueryQueue;
+ private Queue<QueueToken> runningQueryQueue;
- int getCurrentRunningQueryNum() {
- return currentRunningQueryNum;
- }
-
- int getCurrentWaitingQueryNum() {
+ Pair<Integer, Integer> getQueryQueueDetail() {
try {
queueLock.lock();
- return priorityTokenQueue.size();
+ return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size());
} finally {
queueLock.unlock();
}
@@ -89,36 +88,47 @@ public class QueryQueue {
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queueTimeout;
this.propVersion = propVersion;
- this.priorityTokenQueue = new PriorityQueue<QueueToken>();
+ this.waitingQueryQueue = new PriorityQueue<QueueToken>();
+ this.runningQueryQueue = new LinkedList<QueueToken>();
}
public String debugString() {
return "wgId= " + wgId + ", version=" + this.propVersion +
",maxConcurrency=" + maxConcurrency
- + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" +
queueTimeout
- + ", currentRunningQueryNum=" + currentRunningQueryNum
- + ", currentWaitingQueryNum=" + priorityTokenQueue.size();
+ + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" +
queueTimeout + ", currentRunningQueryNum="
+ + runningQueryQueue.size() + ", currentWaitingQueryNum=" +
waitingQueryQueue.size();
}
public QueueToken getToken() throws UserException {
+ AdmissionControl admissionControl =
Env.getCurrentEnv().getAdmissionControl();
queueLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
}
- if (currentRunningQueryNum < maxConcurrency) {
- currentRunningQueryNum++;
- QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN,
queueTimeout, "offer success");
- retToken.setQueueTimeWhenOfferSuccess();
- return retToken;
- }
- if (priorityTokenQueue.size() >= maxQueueSize) {
+ QueueToken queueToken = new QueueToken(queueTimeout, this);
+
+ boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency;
+ boolean isResourceAvailable =
admissionControl.checkResourceAvailable(queueToken);
+ if (!isReachMaxCon && isResourceAvailable) {
+ runningQueryQueue.offer(queueToken);
+ queueToken.complete();
+ return queueToken;
+ } else if (waitingQueryQueue.size() >= maxQueueSize) {
throw new UserException("query waiting queue is full, queue
length=" + maxQueueSize);
+ } else {
+ if (isReachMaxCon) {
+ queueToken.setQueueMsg("WAIT_IN_QUEUE");
+ }
+ queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS);
+ this.waitingQueryQueue.offer(queueToken);
+ // if a query is added to wg's queue but not in
AdmissionControl's
+ // queue may be blocked by be memory later,
+ // then we should put query to AdmissionControl in
releaseAndNotify, it's too complicated.
+ // To simplify the code logic, put all waiting query to
AdmissionControl,
+ // waiting query can be notified when query finish or memory
is enough.
+ admissionControl.addQueueToken(queueToken);
}
- QueueToken newQueryToken = new
QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
- "query wait timeout " + queueTimeout + " ms");
- newQueryToken.setQueueTimeWhenQueueSuccess();
- this.priorityTokenQueue.offer(newQueryToken);
- return newQueryToken;
+ return queueToken;
} finally {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
@@ -127,35 +137,36 @@ public class QueryQueue {
}
}
- // If the token is acquired and do work success, then call this method to
release it.
- public void returnToken(QueueToken token) {
+ public void notifyWaitQuery() {
+ releaseAndNotify(null);
+ }
+
+ public void releaseAndNotify(QueueToken releaseToken) {
+ AdmissionControl admissionControl =
Env.getCurrentEnv().getAdmissionControl();
queueLock.lock();
try {
- // If current token is not in ready to run state, then it is still
in the queue
- // it is not running, just remove it.
- if (!token.isReadyToRun()) {
- this.priorityTokenQueue.remove(token);
- return;
- }
- currentRunningQueryNum--;
- Preconditions.checkArgument(currentRunningQueryNum >= 0);
- // If return token and find user changed concurrency num, then
maybe need signal
- // more tokens.
- while (currentRunningQueryNum < maxConcurrency) {
- QueueToken nextToken = this.priorityTokenQueue.poll();
- if (nextToken != null) {
- if (nextToken.signal()) {
- ++currentRunningQueryNum;
- }
+ runningQueryQueue.remove(releaseToken);
+ waitingQueryQueue.remove(releaseToken);
+ admissionControl.removeQueueToken(releaseToken);
+ while (runningQueryQueue.size() < maxConcurrency) {
+ QueueToken queueToken = waitingQueryQueue.peek();
+ if (queueToken == null) {
+ break;
+ }
+ if (admissionControl.checkResourceAvailable(queueToken)) {
+ queueToken.complete();
+ runningQueryQueue.offer(queueToken);
+ waitingQueryQueue.remove();
+ admissionControl.removeQueueToken(queueToken);
} else {
break;
}
}
} finally {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
}
queueLock.unlock();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
index 0a982b81236..748c0c21bda 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java
@@ -17,13 +17,16 @@
package org.apache.doris.resource.workloadgroup;
+import org.apache.doris.common.UserException;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
// used to mark QueryQueue offer result
// if offer failed, then need to cancel query
@@ -38,134 +41,79 @@ public class QueueToken implements Comparable<QueueToken> {
public enum TokenState {
ENQUEUE_SUCCESS,
- READY_TO_RUN,
- CANCELLED
+ READY_TO_RUN
}
static AtomicLong tokenIdGenerator = new AtomicLong(0);
private long tokenId = 0;
- private TokenState tokenState;
+ private volatile TokenState tokenState;
private long queueWaitTimeout = 0;
- private String offerResultDetail;
+ private long queueStartTime = -1;
+ private long queueEndTime = -1;
- private boolean isTimeout = false;
+ private volatile String queueMsg = "";
- private final ReentrantLock tokenLock = new ReentrantLock();
- private final Condition tokenCond = tokenLock.newCondition();
+ QueryQueue queryQueue = null;
- private long queueStartTime = -1;
- private long queueEndTime = -1;
+ // Object is just a placeholder, it's meaningless now
+ private CompletableFuture<Object> future;
- public QueueToken(TokenState tokenState, long queueWaitTimeout,
- String offerResultDetail) {
+ public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) {
this.tokenId = tokenIdGenerator.addAndGet(1);
- this.tokenState = tokenState;
this.queueWaitTimeout = queueWaitTimeout;
- this.offerResultDetail = offerResultDetail;
+ this.queueStartTime = System.currentTimeMillis();
+ this.queryQueue = queryQueue;
+ this.future = new CompletableFuture<>();
}
- public boolean waitSignal(long queryTimeoutMillis) throws
InterruptedException {
- this.tokenLock.lock();
- try {
- if (isTimeout) {
- return false;
- }
- if (tokenState == TokenState.READY_TO_RUN) {
- return true;
- }
- // If query timeout is less than queue wait timeout, then should
use
- // query timeout as wait timeout
- long waitTimeout = queryTimeoutMillis > queueWaitTimeout ?
queueWaitTimeout : queryTimeoutMillis;
- tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
- if (tokenState == TokenState.CANCELLED) {
- this.offerResultDetail = "query is cancelled in queue";
- return false;
- }
- // If wait timeout and is steal not ready to run, then return false
- if (tokenState != TokenState.READY_TO_RUN) {
- LOG.warn("wait in queue timeout, timeout = {}", waitTimeout);
- isTimeout = true;
- return false;
- } else {
- return true;
- }
- } catch (Throwable t) {
- LOG.warn("meet execption when wait for signal", t);
- // If any exception happens, set isTimeout to true and return false
- // Then the caller will call returnToken to queue normally.
- offerResultDetail = "meet exeption when wait for signal";
- isTimeout = true;
- return false;
- } finally {
- this.tokenLock.unlock();
- this.setQueueTimeWhenQueueEnd();
- }
+ public void setQueueMsg(String msg) {
+ this.queueMsg = msg;
}
- public void signalForCancel() {
- this.tokenLock.lock();
- try {
- if (this.tokenState == TokenState.ENQUEUE_SUCCESS) {
- tokenCond.signal();
- this.tokenState = TokenState.CANCELLED;
- }
- } catch (Throwable t) {
- LOG.warn("error happens when signal for cancel", t);
- } finally {
- this.tokenLock.unlock();
- }
+ public void setTokenState(TokenState tokenState) {
+ this.tokenState = tokenState;
}
- public boolean signal() {
- this.tokenLock.lock();
+ public String getQueueMsg() {
+ return queueMsg;
+ }
+
+ public void get(String queryId, int queryTimeout) throws UserException {
+ if (isReadyToRun()) {
+ return;
+ }
+ long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout,
queryTimeout) : queryTimeout;
+ waitTimeout = waitTimeout <= 0 ? 4096 : waitTimeout;
try {
- // If current token is not ENQUEUE_SUCCESS, then it maybe has error
- // not run it any more.
- if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) {
- return false;
- }
- this.tokenState = TokenState.READY_TO_RUN;
- tokenCond.signal();
- return true;
+ future.get(waitTimeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ throw new UserException("query queue timeout, timeout: " +
waitTimeout + " ms ");
+ } catch (CancellationException e) {
+ throw new UserException("query is cancelled");
} catch (Throwable t) {
- isTimeout = true;
- offerResultDetail = "meet exception when signal";
- LOG.warn("failed to signal token", t);
- return false;
- } finally {
- this.tokenLock.unlock();
+ String errMsg = String.format("error happens when query %s queue", queryId);
+ LOG.error(errMsg, t);
+ throw new RuntimeException(errMsg, t);
}
}
- public String getOfferResultDetail() {
- return offerResultDetail;
- }
-
- public boolean isReadyToRun() {
- return this.tokenState == TokenState.READY_TO_RUN;
- }
-
- public boolean isCancelled() {
- return this.tokenState == TokenState.CANCELLED;
- }
-
- public void setQueueTimeWhenOfferSuccess() {
- long currentTime = System.currentTimeMillis();
- this.queueStartTime = currentTime;
- this.queueEndTime = currentTime;
+ public void complete() {
+ this.queueEndTime = System.currentTimeMillis();
+ this.tokenState = TokenState.READY_TO_RUN;
+ this.setQueueMsg("RUNNING");
+ future.complete(null);
}
- public void setQueueTimeWhenQueueSuccess() {
- long currentTime = System.currentTimeMillis();
- this.queueStartTime = currentTime;
+ public void notifyWaitQuery() {
+ this.queryQueue.notifyWaitQuery();
}
- public void setQueueTimeWhenQueueEnd() {
- this.queueEndTime = System.currentTimeMillis();
+ public void cancel() {
+ future.cancel(true);
}
public long getQueueStartTime() {
@@ -176,8 +124,8 @@ public class QueueToken implements Comparable<QueueToken> {
return queueEndTime;
}
- public TokenState getTokenState() {
- return tokenState;
+ public boolean isReadyToRun() {
+ return tokenState == TokenState.READY_TO_RUN;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 8588e10cf34..2c200e425b3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
@@ -431,6 +432,7 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
List<String> row = new ArrayList<>();
row.add(String.valueOf(id));
row.add(name);
+ Pair<Integer, Integer> queryQueueDetail = qq != null ?
qq.getQueryQueueDetail() : null;
// skip id,name,running query,waiting query
for (int i = 2; i <
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
String key =
WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
@@ -472,9 +474,9 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
row.add(val + "%");
}
} else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
- row.add(qq == null ? "0" :
String.valueOf(qq.getCurrentRunningQueryNum()));
+ row.add(queryQueueDetail == null ? "0" :
String.valueOf(queryQueueDetail.first));
} else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
- row.add(qq == null ? "0" :
String.valueOf(qq.getCurrentWaitingQueryNum()));
+ row.add(queryQueueDetail == null ? "0" :
String.valueOf(queryQueueDetail.second));
} else if (TAG.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 7d0f348d18b..9fb37242d84 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -51,7 +51,6 @@ import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
-import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -612,14 +611,8 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell());
}
- TokenState tokenState = queryInfo.getQueueStatus();
- if (tokenState == null) {
- trow.addToColumnValue(new TCell());
- } else if (tokenState == TokenState.READY_TO_RUN) {
- trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
- } else {
- trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
- }
+ String queueMsg = queryInfo.getQueueStatus();
+ trow.addToColumnValue(new TCell().setStringVal(queueMsg));
trow.addToColumnValue(new
TCell().setStringVal(queryInfo.getSql()));
dataBatch.add(trow);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index d03d3595682..af921cd1821 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -29,6 +29,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
@@ -148,6 +150,11 @@ public class GenericPoolTest {
return null;
}
+ @Override
+ public TGetBeResourceResult getBeResource(TGetBeResourceRequest
request) throws TException {
+ return null;
+ }
+
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest)
throws TException {
// TODO Auto-generated method stub
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 12273331634..de73404b886 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -47,6 +47,8 @@ import org.apache.doris.thrift.TExportState;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TGetBeResourceRequest;
+import org.apache.doris.thrift.TGetBeResourceResult;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
@@ -354,6 +356,11 @@ public class MockedBackendFactory {
return new TPublishTopicResult(new TStatus(TStatusCode.OK));
}
+ @Override
+ public TGetBeResourceResult getBeResource(TGetBeResourceRequest
request) throws TException {
+ return null;
+ }
+
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws
TException {
return new TStatus(TStatusCode.OK);
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index f80c66dd827..0255d0d61a3 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -242,6 +242,19 @@ struct TPublishTopicResult {
1: required Status.TStatus status
}
+
+struct TGetBeResourceRequest {
+}
+
+struct TGlobalResourceUsage {
+ 1: optional i64 mem_limit
+ 2: optional i64 mem_usage
+}
+
+struct TGetBeResourceResult {
+ 1: optional TGlobalResourceUsage global_resource_usage
+}
+
service BackendService {
// Called by coord to start asynchronous execution of plan fragment in backend.
// Returns as soon as all incoming data streams have been set up.
@@ -298,4 +311,6 @@ service BackendService {
TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest
query_ingest_binlog_request);
TPublishTopicResult publish_topic_info(1:TPublishTopicRequest
topic_request);
+
+ TGetBeResourceResult get_be_resource(1: TGetBeResourceRequest request);
}
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 70d4a550890..ec34b489e0b 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -325,7 +325,7 @@ suite("test_crud_wlg") {
test {
sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ * from
${table_name};"
- exception "query wait timeout"
+ exception "query queue timeout"
}
// test query queue running query/waiting num
@@ -520,7 +520,7 @@ suite("test_crud_wlg") {
sql "create workload group if not exists bypass_group properties (
'max_concurrency'='0','max_queue_size'='0','queue_timeout'='0');"
sql "set workload_group=bypass_group;"
test {
- sql "select count(1) from information_schema.active_queries;"
+ sql "select count(1) from ${table_name};"
exception "query waiting queue is full"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]