This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 30d1e6036c7 [feature](runtime filter) New session variable 
runtime_filter_wait_infinitely (#26888)
30d1e6036c7 is described below

commit 30d1e6036c769eff57caabb9d972520f7f44dbc3
Author: zhiqiang <[email protected]>
AuthorDate: Tue Nov 14 07:05:59 2023 -0600

    [feature](runtime filter) New session variable 
runtime_filter_wait_infinitely (#26888)
    
    New session variable: runtime_filter_wait_infinitely. If 
runtime_filter_wait_infinitely is set to true, the consumer of a runtime filter 
will keep waiting to receive it until the query times out.
---
 be/src/exprs/runtime_filter.cpp                    | 18 ++++++++--------
 be/src/exprs/runtime_filter.h                      | 25 +++++++++++++++++-----
 be/src/exprs/runtime_filter_rpc.cpp                |  2 +-
 be/src/runtime/query_context.h                     |  5 +++++
 be/src/runtime/runtime_state.h                     |  5 +++++
 .../join-optimization/runtime-filter.md            |  4 +++-
 .../join-optimization/runtime-filter.md            |  1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  6 ++++++
 gensrc/thrift/PaloInternalService.thrift           |  2 ++
 9 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 10c2856112b..1c7cb4b5c13 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1185,17 +1185,17 @@ bool IRuntimeFilter::await() {
     return true;
 }
 
+// NOTE: Waiting infinitely does not really make the scan task wait forever,
+// because BlockTaskSchedule will make it run once the query has timed out.
+bool IRuntimeFilter::wait_infinitely() const {
+    // A bitmap filter is precise and filters only once, so it must be 
applied.
+    return _wait_infinitely ||
+           (_wrapper != nullptr && _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER);
+}
+
 bool IRuntimeFilter::is_ready_or_timeout() {
     DCHECK(is_consumer());
     auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
-    auto execution_timeout = _state == nullptr ? 
_query_ctx->execution_timeout() * 1000
-                                               : _state->execution_timeout() * 
1000;
-    auto runtime_filter_wait_time_ms = _state == nullptr ? 
_query_ctx->runtime_filter_wait_time_ms()
-                                                         : 
_state->runtime_filter_wait_time_ms();
-    // bitmap filter is precise filter and only filter once, so it must be 
applied.
-    int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
-                                    ? execution_timeout
-                                    : runtime_filter_wait_time_ms;
     int64_t ms_since_registration = MonotonicMillis() - registration_time_;
     if (!_enable_pipeline_exec) {
         _rf_state = RuntimeFilterState::TIME_OUT;
@@ -1212,7 +1212,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
         if (is_ready()) {
             return true;
         }
-        bool timeout = wait_times_ms <= ms_since_registration;
+        bool timeout = wait_infinitely() ? false : _rf_wait_time_ms <= 
ms_since_registration;
         auto expected = RuntimeFilterState::NOT_READY;
         if (timeout) {
             if (!_rf_state_atomic.compare_exchange_strong(expected, 
RuntimeFilterState::TIME_OUT,
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index f13877b869c..7a65706f5ad 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -211,6 +211,8 @@ public:
               _always_true(false),
               _is_ignored(false),
               registration_time_(MonotonicMillis()),
+              _wait_infinitely(_state->runtime_filter_wait_infinitely()),
+              _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()),
               _enable_pipeline_exec(_state->enable_pipeline_exec()),
               _profile(new RuntimeProfile(_name)) {
         if (desc->__isset.min_max_type && desc->type == 
TRuntimeFilterType::MIN_MAX) {
@@ -236,6 +238,8 @@ public:
               _always_true(false),
               _is_ignored(false),
               registration_time_(MonotonicMillis()),
+              _wait_infinitely(query_ctx->runtime_filter_wait_infinitely()),
+              _rf_wait_time_ms(query_ctx->runtime_filter_wait_time_ms()),
               _enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
               _profile(new RuntimeProfile(_name)) {
         if (desc->__isset.min_max_type && desc->type == 
TRuntimeFilterType::MIN_MAX) {
@@ -388,13 +392,21 @@ public:
         }
     }
 
-    int32_t wait_time_ms() {
-        auto runtime_filter_wait_time_ms = _state == nullptr
-                                                   ? 
_query_ctx->runtime_filter_wait_time_ms()
-                                                   : 
_state->runtime_filter_wait_time_ms();
-        return runtime_filter_wait_time_ms;
+    // For pipelineX & Producer
+    int32_t wait_time_ms() const {
+        int32_t res = 0;
+        if (wait_infinitely()) {
+            res = _state == nullptr ? _query_ctx->execution_timeout() : 
_state->execution_timeout();
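+            // Convert to ms. NOTE(review): res is int32_t, so this overflows (UB) if execution_timeout() * 1000 > INT32_MAX; consider computing in int64_t and clamping.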
+            res *= 1000;
+        } else {
+            res = _rf_wait_time_ms;
+        }
+        return res;
     }
 
+    bool wait_infinitely() const;
+
     int64_t registration_time() const { return registration_time_; }
 
     void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
@@ -479,6 +491,9 @@ protected:
 
     /// Time in ms (from MonotonicMillis()), that the filter was registered.
     const int64_t registration_time_;
+    /// The runtime filter wait time is ignored when wait_infinitely() is true (_wait_infinitely is set, or the filter is a bitmap filter).
+    const bool _wait_infinitely;
+    const int32_t _rf_wait_time_ms;
 
     const bool _enable_pipeline_exec;
 
diff --git a/be/src/exprs/runtime_filter_rpc.cpp 
b/be/src/exprs/runtime_filter_rpc.cpp
index 2220ca86060..00540b8382c 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -74,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, 
const TNetworkAddress
     _rpc_context->request.set_filter_id(_filter_id);
     _rpc_context->request.set_opt_remote_rf(opt_remote_rf);
     _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
-    _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
+    _rpc_context->cntl.set_timeout_ms(wait_time_ms());
     _rpc_context->cid = _rpc_context->cntl.call_id();
 
     Status serialize_status = serialize(&_rpc_context->request, &data, &len);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c51e92d14c8..bb7ad20b90b 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -210,6 +210,11 @@ public:
         return _query_options.runtime_filter_wait_time_ms;
     }
 
+    bool runtime_filter_wait_infinitely() const {
+        return _query_options.__isset.runtime_filter_wait_infinitely &&
+               _query_options.runtime_filter_wait_infinitely;
+    }
+
     bool enable_pipeline_exec() const {
         return _query_options.__isset.enable_pipeline_engine &&
                _query_options.enable_pipeline_engine;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3b420511fa0..835bd582894 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -333,6 +333,11 @@ public:
         return _query_options.runtime_filter_wait_time_ms;
     }
 
+    bool runtime_filter_wait_infinitely() const {
+        return _query_options.__isset.runtime_filter_wait_infinitely &&
+               _query_options.runtime_filter_wait_infinitely;
+    }
+
     int32_t runtime_filter_max_in_num() const { return 
_query_options.runtime_filter_max_in_num; }
 
     int be_exec_version() const {
diff --git 
a/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md 
b/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
index 7b359d5c08a..a70522b0215 100644
--- a/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
+++ b/docs/en/docs/query-acceleration/join-optimization/runtime-filter.md
@@ -95,7 +95,7 @@ For query options related to Runtime Filter, please refer to 
the following secti
 
   - `runtime_filter_mode`: Used to adjust the push-down strategy of Runtime 
Filter, including three strategies of OFF, LOCAL, and GLOBAL. The default 
setting is the GLOBAL strategy
 
-  - `runtime_filter_wait_time_ms`: the time that ScanNode in the left table 
waits for each Runtime Filter, the default is 1000ms
+  - `runtime_filter_wait_time_ms`: The time that ScanNode in the left table 
waits for each Runtime Filter, the default is 1000ms
 
   - `runtime_filters_max_num`: The maximum number of Bloom Filters in the 
Runtime Filter that can be applied to each query, the default is 10
 
@@ -107,6 +107,8 @@ For query options related to Runtime Filter, please refer 
to the following secti
 
   - `runtime_filter_max_in_num`: If the number of rows in the right table of 
the join is greater than this value, we will not generate an IN predicate, the 
default is 1024
 
+  - `runtime_filter_wait_infinitely`: If true, the scan node of the left table 
waits until it receives the runtime filter or the query times out, the default 
is false
+
 The query options are further explained below.
 
 #### 1.runtime_filter_type
diff --git 
a/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md 
b/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
index ff47091cf05..8ed2876235a 100644
--- a/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
+++ b/docs/zh-CN/docs/query-acceleration/join-optimization/runtime-filter.md
@@ -103,6 +103,7 @@ Runtime Filter主要用于大表join小表的优化,如果左表的数据量
   - `runtime_bloom_filter_max_size`: Runtime Filter中Bloom 
Filter的最大长度,默认16777216(16M)
   - `runtime_bloom_filter_size`: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M)
   - `runtime_filter_max_in_num`: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024
+  - `runtime_filter_wait_infinitely`: 如果参数为 true,那么左表的scan节点将会一直等待直到接收到 
runtime filter或者查询超时,默认为false
 
 下面对查询选项做进一步说明。
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 711c5a2f016..3cc5bd45587 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -167,6 +167,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String USE_RF_DEFAULT = "use_rf_default";
     // Time in ms to wait until runtime filters are delivered.
     public static final String RUNTIME_FILTER_WAIT_TIME_MS = 
"runtime_filter_wait_time_ms";
+    public static final String runtime_filter_wait_infinitely = 
"runtime_filter_wait_infinitely";
+
     // Maximum number of bloom runtime filters allowed per query
     public static final String RUNTIME_FILTERS_MAX_NUM = 
"runtime_filters_max_num";
     // Runtime filter type used, For testing, Corresponds to TRuntimeFilterType
@@ -792,6 +794,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_WAIT_TIME_MS, needForward = 
true)
     private int runtimeFilterWaitTimeMs = 1000;
 
+    @VariableMgr.VarAttr(name = runtime_filter_wait_infinitely, needForward = 
true)
+    private boolean runtimeFilterWaitInfinitely = false;
+
     @VariableMgr.VarAttr(name = RUNTIME_FILTERS_MAX_NUM, needForward = true)
     private int runtimeFiltersMaxNum = 10;
 
@@ -2486,6 +2491,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
         tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
+        tResult.setRuntimeFilterWaitInfinitely(runtimeFilterWaitInfinitely);
 
         if (cpuResourceLimit > 0) {
             TResourceLimit resourceLimit = new TResourceLimit();
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 7c55842735c..d1a779e285a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -255,6 +255,8 @@ struct TQueryOptions {
   89: optional bool enable_local_shuffle = false;
   // For emergency use, skip missing version when reading rowsets
   90: optional bool skip_missing_version = false;
+
+  91: optional bool runtime_filter_wait_infinitely = false;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to