This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c51219ca78ce2c5b3dcd2c9b002cf56ae5515eef
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Wed Mar 5 22:04:49 2025 +0800

    Add RPC-failure and EOF injection tests for runtime filter send_filter_size
    
    update
    
    update fix
    
    fix case
    
    fix case
    
    fix case
---
 be/src/runtime/fragment_mgr.cpp                    |   6 ++
 be/src/runtime/runtime_state.h                     |   5 -
 be/src/runtime_filter/runtime_filter_producer.cpp  |   9 +-
 be/src/runtime_filter/runtime_filter_producer.h    |   6 +-
 be/src/runtime_filter/runtime_filter_wrapper.h     |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  11 --
 gensrc/thrift/PaloInternalService.thrift           |   2 +-
 .../test_inject_send_filter_size_fail.out          | Bin 0 -> 197 bytes
 .../test_inject_send_filter_size_fail.groovy       | 120 +++++++++++++++++++++
 .../join/test_low_bucket/test_low_bucket.groovy    |   1 +
 .../join/test_slow_close/test_slow_close.groovy    |   2 +
 11 files changed, 142 insertions(+), 22 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b550d187c01..d83cff2fb87 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1303,6 +1303,12 @@ Status FragmentMgr::send_filter_size(const 
PSendFilterSizeRequest* request) {
     TUniqueId query_id;
     query_id.__set_hi(queryid.hi);
     query_id.__set_lo(queryid.lo);
+
+    if (config::enable_debug_points &&
+        
DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) 
{
+        return Status::EndOfFile("inject 
FragmentMgr::send_filter_size.return_eof");
+    }
+
     if (auto q_ctx = get_query_ctx(query_id)) {
         return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, 
request);
     } else {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e63aebe4bbe..743d53e480a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -643,11 +643,6 @@ public:
                _query_options.enable_shared_exchange_sink_buffer;
     }
 
-    bool fuzzy_disable_runtime_filter_in_be() const {
-        return _query_options.__isset.fuzzy_disable_runtime_filter_in_be &&
-               _query_options.fuzzy_disable_runtime_filter_in_be;
-    }
-
     size_t minimum_operator_memory_required_bytes() const {
         if (_query_options.__isset.minimum_operator_memory_required_kb) {
             return _query_options.minimum_operator_memory_required_kb * 1024;
diff --git a/be/src/runtime_filter/runtime_filter_producer.cpp 
b/be/src/runtime_filter/runtime_filter_producer.cpp
index 3f91c000a48..fa988887641 100644
--- a/be/src/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer.cpp
@@ -100,6 +100,7 @@ class SyncSizeClosure : public 
AutoReleaseClosure<PSendFilterSizeRequest,
     std::weak_ptr<RuntimeFilterWrapper> _wrapper;
     using Base =
             AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
+    friend class RuntimeFilterProducer;
     ENABLE_FACTORY_CREATOR(SyncSizeClosure);
 
     void _process_if_rpc_failed() override {
@@ -213,6 +214,11 @@ Status RuntimeFilterProducer::send_size(
         callback->cntl_->ignore_eovercrowded();
     }
 
+    if (config::enable_debug_points &&
+        
DebugPoints::instance()->is_enable("RuntimeFilterProducer::send_size.rpc_fail"))
 {
+        closure->cntl_->SetFailed("inject 
RuntimeFilterProducer::send_size.rpc_fail");
+    }
+
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
     closure.release();
@@ -220,7 +226,7 @@ Status RuntimeFilterProducer::send_size(
 }
 
 void RuntimeFilterProducer::set_synced_size(uint64_t global_size) {
-    if (_rf_state != State::WAITING_FOR_SYNCED_SIZE) {
+    if (!set_state(State::WAITING_FOR_DATA)) {
         _check_wrapper_state(
                 {RuntimeFilterWrapper::State::DISABLED, 
RuntimeFilterWrapper::State::IGNORED});
     }
@@ -229,7 +235,6 @@ void RuntimeFilterProducer::set_synced_size(uint64_t 
global_size) {
     if (_dependency) {
         _dependency->sub();
     }
-    set_state(State::WAITING_FOR_DATA);
 }
 
 Status RuntimeFilterProducer::init(size_t local_size) {
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index 94969605536..252b12616ee 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -83,9 +83,11 @@ public:
 
     void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State 
state,
                                                 std::string reason = "") {
-        if (set_state(State::READY_TO_PUBLISH)) {
-            _wrapper->set_state(state, reason);
+        if (_rf_state == State::PUBLISHED || _rf_state == 
State::READY_TO_PUBLISH) {
+            return;
         }
+        _wrapper->set_state(state, reason); // set the wrapper state first so that 
set_synced_size's wrapper-state check passes
+        set_state(State::READY_TO_PUBLISH);
     }
 
     static std::string to_string(const State& state) {
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h 
b/be/src/runtime_filter/runtime_filter_wrapper.h
index 7cee88fe666..390c4fd41b3 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -98,7 +98,7 @@ public:
     void check_state(std::vector<State> assumed_states) const {
         if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
-                            "producer meet invalid state, {}, assumed_states 
is {}", debug_string(),
+                            "wrapper meet invalid state, {}, assumed_states is 
{}", debug_string(),
                             
states_to_string<RuntimeFilterWrapper>(assumed_states));
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eac77eea078..d5370347dbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -587,8 +587,6 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String LOW_MEMORY_MODE_BUFFER_LIMIT = 
"low_memory_mode_buffer_limit";
     public static final String DUMP_HEAP_PROFILE_WHEN_MEM_LIMIT_EXCEEDED = 
"dump_heap_profile_when_mem_limit_exceeded";
 
-    public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = 
"fuzzy_disable_runtime_filter_in_be";
-
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
 
     public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
@@ -2351,13 +2349,6 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = SPILL_SORT_BATCH_BYTES)
     public long spillSortBatchBytes = 8388608; // 8M
 
-    @VariableMgr.VarAttr(
-            name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE,
-            description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试",
-                    "Disable the runtime filter on the BE for testing 
purposes."},
-            needForward = true, fuzzy = true)
-    public boolean fuzzyDisableRuntimeFilterInBE = false;
-
     @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy = 
true)
     public int spillAggregationPartitionCount = 32;
 
@@ -2617,7 +2608,6 @@ public class SessionVariable implements Serializable, 
Writable {
             }
 
         }
-        this.fuzzyDisableRuntimeFilterInBE = random.nextBoolean();
         this.runtimeFilterWaitInfinitely = random.nextBoolean();
 
         // set random 1, 10, 100, 1000, 10000
@@ -4073,7 +4063,6 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setDumpHeapProfileWhenMemLimitExceeded(dumpHeapProfileWhenMemLimitExceeded);
 
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
-        
tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE);
         tResult.setLowMemoryModeBufferLimit(lowMemoryModeBufferLimit);
 
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 38d302ef574..a0c3a20dc81 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -367,7 +367,7 @@ struct TQueryOptions {
 
   144: optional bool enable_inverted_index_searcher_cache = true;
   145: optional bool enable_inverted_index_query_cache = true;
-  146: optional bool fuzzy_disable_runtime_filter_in_be = false;
+  146: optional bool fuzzy_disable_runtime_filter_in_be = false; // deprecated
 
   147: optional i32 profile_level = 1;
 
diff --git 
a/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out
 
b/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out
new file mode 100644
index 00000000000..42b0346861d
Binary files /dev/null and 
b/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out
 differ
diff --git 
a/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy
 
b/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy
new file mode 100644
index 00000000000..3b91bf157eb
--- /dev/null
+++ 
b/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_inject_send_filter_size_fail") {
+    sql "set enable_spill=false"
+    sql "set enable_force_spill=false"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set enable_runtime_filter_prune=false"
+    sql "set enable_sync_runtime_filter_size=true"
+    sql "set runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX'"
+    sql "set runtime_filter_wait_infinitely=false"
+
+    sql """ drop table if exists t1; """
+    sql """ drop table if exists t3; """
+    sql """ drop table if exists t5; """
+
+    sql """
+        create table t1 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+        """
+
+    sql """
+        create table t3 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+
+    """
+
+    sql """
+        create table t5 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+    insert into t1 select e1,e1 from (select 1 k1) as t lateral view 
explode_numbers(100000) tmp1 as e1;
+    """
+    
+    sql """
+    insert into t3 values(1,1),(2,2),(3,3);
+    """
+
+    sql """
+    insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+    """
+    
+    sql "analyze table t1 with sync;"
+    sql "analyze table t3 with sync;"
+    sql "analyze table t5 with sync;"
+
+    qt_normal "select count(*),sleep(2) from (select t1.k1 from t5 join 
[shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on 
tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
+        sql "set ignore_runtime_filter_error = false"
+        test {
+            sql """select count(*),sleep(2) from (select t1.k1 from t5 join 
[shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on 
tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"""
+            exception "RPC meet failed"
+        }
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
+        sql "set ignore_runtime_filter_error = true"
+        qt_rpc_failed "select count(*),sleep(2) from (select t1.k1 from t5 
join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] 
on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
+        qt_eof "select count(*),sleep(2) from (select t1.k1 from t5 join 
[shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on 
tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
+        sql "set ignore_runtime_filter_error = true"
+        qt_rpc_failed "select count(*) from (select t1.k1 from t5 join 
[shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on 
tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
+    }
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
+        qt_eof "select count(*) from (select t1.k1 from t5 join [shuffle] t1 
on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and 
t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
+    }
+}
diff --git 
a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy 
b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy
index 3f227b22f34..046b6183cb1 100644
--- 
a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy
+++ 
b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy
@@ -17,6 +17,7 @@
 
 suite("test_low_bucket") {
     sql "set enable_spill = false" // spill will cause rf not_ready
+    sql "set enable_force_spill = false"
     sql "set runtime_filter_wait_infinitely = true"
     sql "set parallel_pipeline_task_num = 4"
 
diff --git 
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy 
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
index 0b36d2da5ab..5c6c678fabd 100644
--- 
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
+++ 
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 suite("test_slow_close") {
+    sql "set enable_spill = false"
+    sql "set enable_force_spill = false"
     sql "set disable_join_reorder=true;"
     sql "set runtime_filter_type='bloom_filter';"
     sql "set parallel_pipeline_task_num=3"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to