This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit c51219ca78ce2c5b3dcd2c9b002cf56ae5515eef Author: BiteTheDDDDt <[email protected]> AuthorDate: Wed Mar 5 22:04:49 2025 +0800 add rpc fail test and eof test update update fix fix case fix case fix case --- be/src/runtime/fragment_mgr.cpp | 6 ++ be/src/runtime/runtime_state.h | 5 - be/src/runtime_filter/runtime_filter_producer.cpp | 9 +- be/src/runtime_filter/runtime_filter_producer.h | 6 +- be/src/runtime_filter/runtime_filter_wrapper.h | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 11 -- gensrc/thrift/PaloInternalService.thrift | 2 +- .../test_inject_send_filter_size_fail.out | Bin 0 -> 197 bytes .../test_inject_send_filter_size_fail.groovy | 120 +++++++++++++++++++++ .../join/test_low_bucket/test_low_bucket.groovy | 1 + .../join/test_slow_close/test_slow_close.groovy | 2 + 11 files changed, 142 insertions(+), 22 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index b550d187c01..d83cff2fb87 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1303,6 +1303,12 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); + + if (config::enable_debug_points && + DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) { + return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof"); + } + if (auto q_ctx = get_query_ctx(query_id)) { return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request); } else { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e63aebe4bbe..743d53e480a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -643,11 +643,6 @@ public: _query_options.enable_shared_exchange_sink_buffer; } - bool fuzzy_disable_runtime_filter_in_be() const { - return _query_options.__isset.fuzzy_disable_runtime_filter_in_be && - _query_options.fuzzy_disable_runtime_filter_in_be; - } - size_t minimum_operator_memory_required_bytes() const { if (_query_options.__isset.minimum_operator_memory_required_kb) { return _query_options.minimum_operator_memory_required_kb * 1024; diff --git a/be/src/runtime_filter/runtime_filter_producer.cpp b/be/src/runtime_filter/runtime_filter_producer.cpp index 3f91c000a48..fa988887641 100644 --- a/be/src/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/runtime_filter/runtime_filter_producer.cpp @@ -100,6 +100,7 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, std::weak_ptr<RuntimeFilterWrapper> _wrapper; using Base = AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>; + friend class RuntimeFilterProducer; ENABLE_FACTORY_CREATOR(SyncSizeClosure); void _process_if_rpc_failed() override { @@ -213,6 +214,11 @@ Status RuntimeFilterProducer::send_size( callback->cntl_->ignore_eovercrowded(); } + if (config::enable_debug_points && + DebugPoints::instance()->is_enable("RuntimeFilterProducer::send_size.rpc_fail")) { + closure->cntl_->SetFailed("inject RuntimeFilterProducer::send_size.rpc_fail"); + } + stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); closure.release(); @@ -220,7 +226,7 @@ Status RuntimeFilterProducer::send_size( } void RuntimeFilterProducer::set_synced_size(uint64_t global_size) { - if (_rf_state != State::WAITING_FOR_SYNCED_SIZE) { + if (!set_state(State::WAITING_FOR_DATA)) { _check_wrapper_state( {RuntimeFilterWrapper::State::DISABLED, RuntimeFilterWrapper::State::IGNORED}); } @@ -229,7 +235,6 @@ void RuntimeFilterProducer::set_synced_size(uint64_t global_size) { if (_dependency) { _dependency->sub(); } - set_state(State::WAITING_FOR_DATA); } Status RuntimeFilterProducer::init(size_t local_size) { diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index 94969605536..252b12616ee 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -83,9 +83,11 @@ public: void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State state, std::string reason = "") { - if (set_state(State::READY_TO_PUBLISH)) { - _wrapper->set_state(state, reason); + if (_rf_state == State::PUBLISHED || _rf_state == State::READY_TO_PUBLISH) { + return; } + _wrapper->set_state(state, reason); // set wrapper firstly to pass set_synced_size's check + set_state(State::READY_TO_PUBLISH); } static std::string to_string(const State& state) { diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h index 7cee88fe666..390c4fd41b3 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.h +++ b/be/src/runtime_filter/runtime_filter_wrapper.h @@ -98,7 +98,7 @@ public: void check_state(std::vector<State> assumed_states) const { if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) { throw Exception(ErrorCode::INTERNAL_ERROR, - "producer meet invalid state, {}, assumed_states is {}", debug_string(), + "wrapper meet invalid state, {}, assumed_states is {}", debug_string(), states_to_string<RuntimeFilterWrapper>(assumed_states)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index eac77eea078..d5370347dbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -587,8 +587,6 @@ public class SessionVariable implements Serializable, Writable { public static final String LOW_MEMORY_MODE_BUFFER_LIMIT = "low_memory_mode_buffer_limit"; public static final String DUMP_HEAP_PROFILE_WHEN_MEM_LIMIT_EXCEEDED = "dump_heap_profile_when_mem_limit_exceeded"; - public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = "fuzzy_disable_runtime_filter_in_be"; - public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS @@ -2351,13 +2349,6 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = SPILL_SORT_BATCH_BYTES) public long spillSortBatchBytes = 8388608; // 8M - @VariableMgr.VarAttr( - name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE, - description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试", - "Disable the runtime filter on the BE for testing purposes."}, - needForward = true, fuzzy = true) - public boolean fuzzyDisableRuntimeFilterInBE = false; - @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy = true) public int spillAggregationPartitionCount = 32; @@ -2617,7 +2608,6 @@ public class SessionVariable implements Serializable, Writable { } } - this.fuzzyDisableRuntimeFilterInBE = random.nextBoolean(); this.runtimeFilterWaitInfinitely = random.nextBoolean(); // set random 1, 10, 100, 1000, 10000 @@ -4073,7 +4063,6 @@ public class SessionVariable implements Serializable, Writable { tResult.setDumpHeapProfileWhenMemLimitExceeded(dumpHeapProfileWhenMemLimitExceeded); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); - tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE); tResult.setLowMemoryModeBufferLimit(lowMemoryModeBufferLimit); tResult.setEnableLocalMergeSort(enableLocalMergeSort); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 38d302ef574..a0c3a20dc81 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -367,7 +367,7 @@ struct TQueryOptions { 144: optional bool enable_inverted_index_searcher_cache = true; 145: optional bool enable_inverted_index_query_cache = true; - 146: optional bool fuzzy_disable_runtime_filter_in_be = false; + 146: optional bool fuzzy_disable_runtime_filter_in_be = false; // deprecated 147: optional i32 profile_level = 1; diff --git a/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out b/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out new file mode 100644 index 00000000000..42b0346861d Binary files /dev/null and b/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out differ diff --git a/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy b/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy new file mode 100644 index 00000000000..3b91bf157eb --- /dev/null +++ b/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_inject_send_filter_size_fail") { + sql "set enable_spill=false" + sql "set enable_force_spill=false" + sql "set parallel_pipeline_task_num=3" + sql "set enable_runtime_filter_prune=false" + sql "set enable_sync_runtime_filter_size=true" + sql "set runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX'" + sql "set runtime_filter_wait_infinitely=false" + + sql """ drop table if exists t1; """ + sql """ drop table if exists t3; """ + sql """ drop table if exists t5; """ + + sql """ + create table t1 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + """ + + sql """ + create table t3 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + + """ + + sql """ + create table t5 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + """ + + sql """ + insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1; + """ + + sql """ + insert into t3 values(1,1),(2,2),(3,3); + """ + + sql """ + insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5); + """ + + sql "analyze table t1 with sync;" + sql "analyze table t3 with sync;" + sql "analyze table t5 with sync;" + + qt_normal "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + + try { + GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + sql "set ignore_runtime_filter_error = false" + test { + sql """select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;""" + exception "RPC meet failed" + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + sql "set ignore_runtime_filter_error = true" + qt_rpc_failed "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + qt_eof "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + sql "set ignore_runtime_filter_error = true" + qt_rpc_failed "select count(*) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + qt_eof "select count(*) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + } +} diff --git a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy index 3f227b22f34..046b6183cb1 100644 --- a/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy +++ b/regression-test/suites/query_p0/join/test_low_bucket/test_low_bucket.groovy @@ -17,6 +17,7 @@ suite("test_low_bucket") { sql "set enable_spill = false" // spill will cause rf not_ready + sql "set enable_force_spill = false" sql "set runtime_filter_wait_infinitely = true" sql "set parallel_pipeline_task_num = 4" diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy index 0b36d2da5ab..5c6c678fabd 100644 --- a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy +++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy @@ -16,6 +16,8 @@ // under the License. suite("test_slow_close") { + sql "set enable_spill = false" + sql "set enable_force_spill = false" sql "set disable_join_reorder=true;" sql "set runtime_filter_type='bloom_filter';" sql "set parallel_pipeline_task_num=3" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
