This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 30d1e6036c7 [feature](runtime filter) New session variable
runtime_filter_wait_infinitely (#26888)
30d1e6036c7 is described below
commit 30d1e6036c769eff57caabb9d972520f7f44dbc3
Author: zhiqiang <[email protected]>
AuthorDate: Tue Nov 14 07:05:59 2023 -0600
[feature](runtime filter) New session variable
runtime_filter_wait_infinitely (#26888)
New session variable: runtime_filter_wait_infinitely. If
runtime_filter_wait_infinitely is set to true, the consumer of a runtime filter
will keep waiting to receive it until the query times out.
---
be/src/exprs/runtime_filter.cpp | 18 ++++++++--------
be/src/exprs/runtime_filter.h | 25 +++++++++++++++++-----
be/src/exprs/runtime_filter_rpc.cpp | 2 +-
be/src/runtime/query_context.h | 5 +++++
be/src/runtime/runtime_state.h | 5 +++++
.../join-optimization/runtime-filter.md | 4 +++-
.../join-optimization/runtime-filter.md | 1 +
.../java/org/apache/doris/qe/SessionVariable.java | 6 ++++++
gensrc/thrift/PaloInternalService.thrift | 2 ++
9 files changed, 52 insertions(+), 16 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 10c2856112b..1c7cb4b5c13 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1185,17 +1185,17 @@ bool IRuntimeFilter::await() {
return true;
}
+// NOTE: Waiting infinitely does not make the scan task wait forever,
+// because BlockTaskSchedule will make it run once the query times out.
+bool IRuntimeFilter::wait_infinitely() const {
+ // bitmap filter is precise filter and only filter once, so it must be
applied.
+ return _wait_infinitely ||
+ (_wrapper != nullptr && _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER);
+}
+
bool IRuntimeFilter::is_ready_or_timeout() {
DCHECK(is_consumer());
auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
- auto execution_timeout = _state == nullptr ?
_query_ctx->execution_timeout() * 1000
- : _state->execution_timeout() *
1000;
- auto runtime_filter_wait_time_ms = _state == nullptr ?
_query_ctx->runtime_filter_wait_time_ms()
- :
_state->runtime_filter_wait_time_ms();
- // bitmap filter is precise filter and only filter once, so it must be
applied.
- int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
- ? execution_timeout
- : runtime_filter_wait_time_ms;
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
if (!_enable_pipeline_exec) {
_rf_state = RuntimeFilterState::TIME_OUT;
@@ -1212,7 +1212,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
if (is_ready()) {
return true;
}
- bool timeout = wait_times_ms <= ms_since_registration;
+ bool timeout = wait_infinitely() ? false : _rf_wait_time_ms <=
ms_since_registration;
auto expected = RuntimeFilterState::NOT_READY;
if (timeout) {
if (!_rf_state_atomic.compare_exchange_strong(expected,
RuntimeFilterState::TIME_OUT,
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index f13877b869c..7a65706f5ad 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -211,6 +211,8 @@ public:
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
+ _wait_infinitely(_state->runtime_filter_wait_infinitely()),
+ _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(_state->enable_pipeline_exec()),
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type ==
TRuntimeFilterType::MIN_MAX) {
@@ -236,6 +238,8 @@ public:
_always_true(false),
_is_ignored(false),
registration_time_(MonotonicMillis()),
+ _wait_infinitely(query_ctx->runtime_filter_wait_infinitely()),
+ _rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()),
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
_profile(new RuntimeProfile(_name)) {
if (desc->__isset.min_max_type && desc->type ==
TRuntimeFilterType::MIN_MAX) {
@@ -388,13 +392,21 @@ public:
}
}
- int32_t wait_time_ms() {
- auto runtime_filter_wait_time_ms = _state == nullptr
- ?
_query_ctx->runtime_filter_wait_time_ms()
- :
_state->runtime_filter_wait_time_ms();
- return runtime_filter_wait_time_ms;
+ // For pipelineX & Producer
+ int32_t wait_time_ms() const {
+ int32_t res = 0;
+ if (wait_infinitely()) {
+ res = _state == nullptr ? _query_ctx->execution_timeout() :
_state->execution_timeout();
+ // Convert to ms
+ res *= 1000;
+ } else {
+ res = _rf_wait_time_ms;
+ }
+ return res;
}
+ bool wait_infinitely() const;
+
int64_t registration_time() const { return registration_time_; }
void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
@@ -479,6 +491,9 @@ protected:
/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t registration_time_;
+ /// runtime filter wait time will be ignored if _wait_infinitely is true
+ const bool _wait_infinitely;
+ const int32_t _rf_wait_time_ms;
const bool _enable_pipeline_exec;
diff --git a/be/src/exprs/runtime_filter_rpc.cpp
b/be/src/exprs/runtime_filter_rpc.cpp
index 2220ca86060..00540b8382c 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -74,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state,
const TNetworkAddress
_rpc_context->request.set_filter_id(_filter_id);
_rpc_context->request.set_opt_remote_rf(opt_remote_rf);
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
- _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
+ _rpc_context->cntl.set_timeout_ms(wait_time_ms());
_rpc_context->cid = _rpc_context->cntl.call_id();
Status serialize_status = serialize(&_rpc_context->request, &data, &len);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c51e92d14c8..bb7ad20b90b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -210,6 +210,11 @@ public:
return _query_options.runtime_filter_wait_time_ms;
}
+ bool runtime_filter_wait_infinitely() const {
+ return _query_options.__isset.runtime_filter_wait_infinitely &&
+ _query_options.runtime_filter_wait_infinitely;
+ }
+
bool enable_pipeline_exec() const {
return _query_options.__isset.enable_pipeline_engine &&
_query_options.enable_pipeline_engine;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3b420511fa0..835bd582894 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -333,6 +333,11 @@ public:
return _query_options.runtime_filter_wait_time_ms;
}
+ bool runtime_filter_wait_infinitely() const {
+ return _query_options.__isset.runtime_filter_wait_infinitely &&
+ _query_options.runtime_filter_wait_infinitely;
+ }
+
int32_t runtime_filter_max_in_num() const { return
_query_options.runtime_filter_max_in_num; }
int be_exec_version() const {
diff --git
a/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
b/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
index 7b359d5c08a..a70522b0215 100644
--- a/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
+++ b/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
@@ -95,7 +95,7 @@ For query options related to Runtime Filter, please refer to
the following secti
- `runtime_filter_mode`: Used to adjust the push-down strategy of Runtime
Filter, including three strategies of OFF, LOCAL, and GLOBAL. The default
setting is the GLOBAL strategy
- - `runtime_filter_wait_time_ms`: the time that ScanNode in the left table
waits for each Runtime Filter, the default is 1000ms
+ - `runtime_filter_wait_time_ms`: The time that ScanNode in the left table
waits for each Runtime Filter, the default is 1000ms
- `runtime_filters_max_num`: The maximum number of Bloom Filters in the
Runtime Filter that can be applied to each query, the default is 10
@@ -107,6 +107,8 @@ For query options related to Runtime Filter, please refer
to the following secti
- `runtime_filter_max_in_num`: If the number of rows in the right table of
the join is greater than this value, we will not generate an IN predicate, the
default is 1024
+ - `runtime_filter_wait_infinitely`: If this parameter is true, the scan node
of the left table will wait until it receives a runtime filter or the query
times out, the default is false
+
The query options are further explained below.
#### 1.runtime_filter_type
diff --git
a/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
b/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
index ff47091cf05..8ed2876235a 100644
--- a/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
+++ b/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
@@ -103,6 +103,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量
- `runtime_bloom_filter_max_size`: Runtime Filter中Bloom
Filter的最大长度,默认16777216(16M)
- `runtime_bloom_filter_size`: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M)
- `runtime_filter_max_in_num`: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024
+ - `runtime_filter_wait_infinitely`: 如果参数为 true,那么左表的scan节点将会一直等待直到接收到
runtime filter或者查询超时,默认为false
下面对查询选项做进一步说明。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 711c5a2f016..3cc5bd45587 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -167,6 +167,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String USE_RF_DEFAULT = "use_rf_default";
// Time in ms to wait until runtime filters are delivered.
public static final String RUNTIME_FILTER_WAIT_TIME_MS =
"runtime_filter_wait_time_ms";
+ public static final String runtime_filter_wait_infinitely =
"runtime_filter_wait_infinitely";
+
// Maximum number of bloom runtime filters allowed per query
public static final String RUNTIME_FILTERS_MAX_NUM =
"runtime_filters_max_num";
// Runtime filter type used, For testing, Corresponds to TRuntimeFilterType
@@ -792,6 +794,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward =
true)
private int runtimeFilterWaitTimeMs = 1000;
+ @VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward =
true)
+ private boolean runtimeFilterWaitInfinitely = false;
+
@VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM, needForward = true)
private int runtimeFiltersMaxNum = 10;
@@ -2486,6 +2491,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
+ tResult.setRuntimeFilterWaitInfinitely(runtimeFilterWaitInfinitely);
if (cpuResourceLimit > 0) {
TResourceLimit resourceLimit = new TResourceLimit();
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 7c55842735c..d1a779e285a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -255,6 +255,8 @@ struct TQueryOptions {
89: optional bool enable_local_shuffle = false;
// For emergency use, skip missing version when reading rowsets
90: optional bool skip_missing_version = false;
+
+ 91: optional bool runtime_filter_wait_infinitely = false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]