This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4483e3a6e1 [Improvement](scan) add a config for scan queue memory limit (#19439)
4483e3a6e1 is described below
commit 4483e3a6e13a09952154e0ea936c7b37b6c791f3
Author: Gabriel <[email protected]>
AuthorDate: Wed May 10 13:14:23 2023 +0800
[Improvement](scan) add a config for scan queue memory limit (#19439)
---
be/src/runtime/runtime_state.h | 4 ++++
be/src/vec/exec/scan/vscan_node.cpp | 8 ++++----
.../src/main/java/org/apache/doris/analysis/SetVar.java | 4 ++++
.../src/main/java/org/apache/doris/qe/SessionVariable.java | 14 ++++++++++++++
gensrc/thrift/PaloInternalService.thrift | 2 ++
5 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 6def2cc4e5..50e672498b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -86,6 +86,10 @@ public:
Status init_mem_trackers(const TUniqueId& query_id = TUniqueId());
const TQueryOptions& query_options() const { return _query_options; }
+ int64_t scan_queue_mem_limit() const {
+ return _query_options.__isset.scan_queue_mem_limit ?
_query_options.scan_queue_mem_limit
+ :
_query_options.mem_limit / 20;
+ }
ObjectPool* obj_pool() const { return _obj_pool.get(); }
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index e082507e88..532cebefd4 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -302,11 +302,11 @@ Status VScanNode::_start_scanners(const
std::list<VScannerSPtr>& scanners) {
if (_is_pipeline_scan) {
_scanner_ctx = pipeline::PipScannerContext::create_shared(
_state, this, _input_tuple_desc, _output_tuple_desc, scanners,
limit(),
- _state->query_options().mem_limit / 20, _col_distribute_ids);
+ _state->scan_queue_mem_limit(), _col_distribute_ids);
} else {
- _scanner_ctx = ScannerContext::create_shared(_state, this,
_input_tuple_desc,
- _output_tuple_desc,
scanners, limit(),
-
_state->query_options().mem_limit / 20);
+ _scanner_ctx =
+ ScannerContext::create_shared(_state, this, _input_tuple_desc,
_output_tuple_desc,
+ scanners, limit(),
_state->scan_queue_mem_limit());
}
RETURN_IF_ERROR(_scanner_ctx->init());
return Status::OK();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index b853615ae7..37eac5ac30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -160,6 +160,10 @@ public class SetVar {
this.value = new
StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
this.result = (LiteralExpr) this.value;
}
+ if
(getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
+ this.value = new
StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
+ this.result = (LiteralExpr) this.value;
+ }
if (getVariable().equalsIgnoreCase("is_report_success")) {
variable = SessionVariable.ENABLE_PROFILE;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9e55696c85..ad2143b3da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -62,6 +62,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final Logger LOG =
LogManager.getLogger(SessionVariable.class);
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
+ public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
public static final String QUERY_TIMEOUT = "query_timeout";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
@@ -336,6 +337,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
public long maxExecMemByte = 2147483648L;
+ @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
+ public long maxScanQueueMemByte = 2147483648L / 20;
+
@VariableMgr.VarAttr(name = ENABLE_SPILLING)
public boolean enableSpilling = false;
@@ -1012,6 +1016,10 @@ public class SessionVariable implements Serializable,
Writable {
return maxExecMemByte;
}
+ public long getMaxScanQueueExecMemByte() {
+ return maxScanQueueMemByte;
+ }
+
public int getQueryTimeoutS() {
return queryTimeoutS;
}
@@ -1159,6 +1167,10 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public void setMaxScanQueueMemByte(long scanQueueMemByte) {
+ this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte /
20);
+ }
+
public boolean isSqlQuoteShowCreate() {
return sqlQuoteShowCreate;
}
@@ -1743,6 +1755,7 @@ public class SessionVariable implements Serializable,
Writable {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
+ tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte,
maxExecMemByte / 20));
// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);
@@ -1996,6 +2009,7 @@ public class SessionVariable implements Serializable,
Writable {
public TQueryOptions getQueryOptionVariables() {
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setMemLimit(maxExecMemByte);
+ queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte,
maxExecMemByte / 20));
queryOptions.setQueryTimeout(queryTimeoutS);
queryOptions.setInsertTimeout(insertTimeoutS);
return queryOptions;
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 64ddd524bf..3b77b325b9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -219,6 +219,8 @@ struct TQueryOptions {
71: optional bool enable_parquet_lazy_mat = true
72: optional bool enable_orc_lazy_mat = true
+
+ 73: optional i64 scan_queue_mem_limit
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]