This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 33726b0e5a8 [Feature](runtime-filter) cancel query when runtime_filter's rpc failed (#43627)
33726b0e5a8 is described below

commit 33726b0e5a8c49bc72207d21e209ba3d4d3ce9e3
Author: Pxl <[email protected]>
AuthorDate: Mon Nov 18 10:17:28 2024 +0800

    [Feature](runtime-filter) cancel query when runtime_filter's rpc failed (#43627)
    
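    ### What problem does this PR solve?
    Cancel the query when a runtime filter RPC fails or returns an error status, instead of
    only logging a warning. Setting the new session variable `ignore_runtime_filter_error`
    to true keeps the previous log-only behavior.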
---
 be/src/common/status.h                             |  1 +
 be/src/exprs/runtime_filter.cpp                    | 23 +++++++++++++--------
 be/src/exprs/runtime_filter.h                      |  4 ++--
 be/src/exprs/runtime_filter_slots.h                |  4 ++--
 be/src/exprs/runtime_filter_slots_cross.h          |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  2 +-
 .../exec/nested_loop_join_build_operator.cpp       |  2 +-
 be/src/runtime/fragment_mgr.cpp                    |  4 ++--
 be/src/runtime/query_context.h                     |  6 ++++++
 be/src/runtime/runtime_filter_mgr.cpp              | 14 +++++++++----
 be/src/runtime/runtime_filter_mgr.h                |  5 +++--
 be/src/runtime/runtime_state.cpp                   |  6 +++++-
 be/src/runtime/runtime_state.h                     |  2 ++
 be/src/util/ref_count_closure.h                    | 24 +++++++++++++++++-----
 .../java/org/apache/doris/qe/SessionVariable.java  |  5 +++++
 gensrc/thrift/PaloInternalService.thrift           |  2 ++
 16 files changed, 78 insertions(+), 30 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index de029d87ec9..344f82a81b8 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -489,6 +489,7 @@ public:
     ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
     ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
     ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
+    ERROR_CTOR_NOSTACK(NetworkError, NETWORK_ERROR)
 #undef ERROR_CTOR
 
     template <int code>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 85f1c535c70..d05bb6fa3cf 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const 
vectorized::ColumnPtr column, size_t sta
     _wrapper->insert_batch(column, start);
 }
 
-Status IRuntimeFilter::publish(bool publish_local) {
+Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
     DCHECK(is_producer());
 
     auto send_to_remote = [&](IRuntimeFilter* filter) {
         TNetworkAddress addr;
         DCHECK(_state != nullptr);
         RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
-        return filter->push_to_remote(&addr);
+        return filter->push_to_remote(state, &addr);
     };
     auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper) 
{
         std::vector<std::shared_ptr<IRuntimeFilter>> filters;
@@ -1088,8 +1088,10 @@ public:
     SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
                     
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
                     std::shared_ptr<pipeline::Dependency> dependency,
-                    RuntimeFilterContextSPtr rf_context)
-            : Base(req, callback), _dependency(std::move(dependency)), 
_rf_context(rf_context) {}
+                    RuntimeFilterContextSPtr rf_context, 
std::weak_ptr<QueryContext> context)
+            : Base(req, callback, context),
+              _dependency(std::move(dependency)),
+              _rf_context(rf_context) {}
 };
 
 Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
@@ -1133,8 +1135,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
     auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
     // IRuntimeFilter maybe deconstructed before the rpc finished, so that 
could not use
     // a raw pointer in closure. Has to use the context's shared ptr.
-    auto closure =
-            SyncSizeClosure::create_unique(request, callback, _dependency, 
_wrapper->_context);
+    auto closure = SyncSizeClosure::create_unique(
+            request, callback, _dependency, _wrapper->_context,
+            state->query_options().ignore_runtime_filter_error ? 
std::weak_ptr<QueryContext> {}
+                                                               : 
state->get_query_ctx_weak());
     auto* pquery_id = request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
@@ -1157,7 +1161,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
     return Status::OK();
 }
 
-Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const 
TNetworkAddress* addr) {
     DCHECK(is_producer());
     std::shared_ptr<PBackendService_Stub> stub(
             _state->exec_env->brpc_internal_client_cache()->get_client(*addr));
@@ -1170,7 +1174,10 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr) {
     auto merge_filter_callback = 
DummyBrpcCallback<PMergeFilterResponse>::create_shared();
     auto merge_filter_closure =
             AutoReleaseClosure<PMergeFilterRequest, 
DummyBrpcCallback<PMergeFilterResponse>>::
-                    create_unique(merge_filter_request, merge_filter_callback);
+                    create_unique(merge_filter_request, merge_filter_callback,
+                                  
state->query_options().ignore_runtime_filter_error
+                                          ? std::weak_ptr<QueryContext> {}
+                                          : state->get_query_ctx_weak());
     void* data = nullptr;
     int len = 0;
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index f5a069d9e55..84a7f36c8a8 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -225,7 +225,7 @@ public:
 
     // publish filter
     // push filter to remote node or push down it to scan_node
-    Status publish(bool publish_local = false);
+    Status publish(RuntimeState* state, bool publish_local = false);
 
     Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
 
@@ -293,7 +293,7 @@ public:
     bool need_sync_filter_size();
 
     // async push runtimefilter to remote node
-    Status push_to_remote(const TNetworkAddress* addr);
+    Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
 
     void init_profile(RuntimeProfile* parent_profile);
 
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 42c5f598633..3c18735e4e8 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -149,10 +149,10 @@ public:
     }
 
     // publish runtime filter
-    Status publish(bool publish_local) {
+    Status publish(RuntimeState* state, bool publish_local) {
         for (auto& pair : _runtime_filters_map) {
             for (auto& filter : pair.second) {
-                RETURN_IF_ERROR(filter->publish(publish_local));
+                RETURN_IF_ERROR(filter->publish(state, publish_local));
             }
         }
         return Status::OK();
diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h
index 01ae21a7599..a49f2928f84 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -72,9 +72,9 @@ public:
         return Status::OK();
     }
 
-    Status publish() {
+    Status publish(RuntimeState* state) {
         for (auto filter : _runtime_filters) {
-            RETURN_IF_ERROR(filter->publish());
+            RETURN_IF_ERROR(filter->publish(state));
         }
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 37de9ac93d8..74db3a5c06c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -157,7 +157,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         }
     }
     SCOPED_TIMER(_publish_runtime_filter_timer);
-    RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
+    RETURN_IF_ERROR(_runtime_filter_slots->publish(state, 
!_should_build_hash_table));
     return Base::close(state, exec_status);
 }
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 83b378e792c..41cd8068dd7 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -43,7 +43,7 @@ struct RuntimeFilterBuild {
         }
         {
             SCOPED_TIMER(_parent->publish_runtime_filter_timer());
-            RETURN_IF_ERROR(runtime_filter_slots.publish());
+            RETURN_IF_ERROR(runtime_filter_slots.publish(state));
         }
 
         return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2896190f606..c4f633d84aa 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1271,7 +1271,7 @@ Status FragmentMgr::send_filter_size(const 
PSendFilterSizeRequest* request) {
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
-    auto merge_status = filter_controller->send_filter_size(request);
+    auto merge_status = filter_controller->send_filter_size(query_ctx, 
request);
     return merge_status;
 }
 
@@ -1313,7 +1313,7 @@ Status FragmentMgr::merge_filter(const 
PMergeFilterRequest* request,
     SCOPED_ATTACH_TASK(query_ctx.get());
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
-    auto merge_status = filter_controller->merge(request, attach_data);
+    auto merge_status = filter_controller->merge(query_ctx, request, 
attach_data);
     return merge_status;
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 47465530405..d557245bf23 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -165,6 +165,12 @@ public:
         return _query_options.__isset.fe_process_uuid ? 
_query_options.fe_process_uuid : 0;
     }
 
+    bool ignore_runtime_filter_error() const {
+        return _query_options.__isset.ignore_runtime_filter_error
+                       ? _query_options.ignore_runtime_filter_error
+                       : false;
+    }
+
     // global runtime filter mgr, the runtime filter have remote target or
     // need local merge should regist here. before publish() or 
push_to_remote()
     // the runtime filter should do the local merge work
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 1a238787207..4b4f4880123 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -305,7 +305,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id,
     return Status::OK();
 }
 
-Status RuntimeFilterMergeControllerEntity::send_filter_size(const 
PSendFilterSizeRequest* request) {
+Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr<QueryContext>
 query_ctx,
+                                                            const 
PSendFilterSizeRequest* request) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
 
@@ -326,6 +327,8 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
 
     Status st = Status::OK();
     if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
+        auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {}
+                                                                   : query_ctx;
         for (auto addr : cnt_val->source_addrs) {
             std::shared_ptr<PBackendService_Stub> stub(
                     
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
@@ -339,7 +342,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
             auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                               
DummyBrpcCallback<PSyncFilterSizeResponse>>::
                     create_unique(std::make_shared<PSyncFilterSizeRequest>(),
-                                  
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
+                                  
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
 
             auto* pquery_id = closure->request_->mutable_query_id();
             pquery_id->set_hi(_state->query_id.hi());
@@ -377,7 +380,8 @@ Status RuntimeFilterMgr::sync_filter_size(const 
PSyncFilterSizeRequest* request)
 }
 
 // merge data
-Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* 
request,
+Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> 
query_ctx,
+                                                 const PMergeFilterRequest* 
request,
                                                  
butil::IOBufAsZeroCopyInputStream* attach_data) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
@@ -444,12 +448,14 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             has_attachment = true;
         }
 
+        auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {}
+                                                                   : query_ctx;
         std::vector<TRuntimeFilterTargetParamsV2>& targets = 
cnt_val->targetv2_info;
         for (auto& target : targets) {
             auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
                                               
DummyBrpcCallback<PPublishFilterResponse>>::
                     
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-                                  
DummyBrpcCallback<PPublishFilterResponse>::create_shared());
+                                  
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
 
             closure->request_->set_filter_id(request->filter_id());
             closure->request_->set_merge_time(merge_time);
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index b0aea7568cf..bac61d6248a 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -156,10 +156,11 @@ public:
                 const TQueryOptions& query_options);
 
     // handle merge rpc
-    Status merge(const PMergeFilterRequest* request,
+    Status merge(std::weak_ptr<QueryContext> query_ctx, const 
PMergeFilterRequest* request,
                  butil::IOBufAsZeroCopyInputStream* attach_data);
 
-    Status send_filter_size(const PSendFilterSizeRequest* request);
+    Status send_filter_size(std::weak_ptr<QueryContext> query_ctx,
+                            const PSendFilterSizeRequest* request);
 
     UniqueId query_id() const { return _query_id; }
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index e3f9d075c8f..38522f49dc3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -40,6 +40,7 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/load_path_mgr.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
@@ -129,7 +130,6 @@ 
RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
         : _profile("Fragment " + print_id(instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
-          _runtime_filter_mgr(nullptr),
           _unreported_error_idx(0),
           _query_id(query_id),
           _fragment_id(fragment_id),
@@ -294,6 +294,10 @@ Status RuntimeState::init(const TUniqueId& 
fragment_instance_id, const TQueryOpt
     return Status::OK();
 }
 
+std::weak_ptr<QueryContext> RuntimeState::get_query_ctx_weak() {
+    return 
_exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id());
+}
+
 void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& 
id) {
     _query_mem_tracker = MemTrackerLimiter::create_shared(
             MemTrackerLimiter::Type::OTHER, fmt::format("{}#Id={}", name, 
print_id(id)));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 88deee491d1..73f854896f4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -449,6 +449,8 @@ public:
 
     QueryContext* get_query_ctx() { return _query_ctx; }
 
+    std::weak_ptr<QueryContext> get_query_ctx_weak();
+
     void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
tracker) {
         _query_mem_tracker = tracker;
     }
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index 92772a82373..560aebb98ee 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -20,7 +20,9 @@
 #include <google/protobuf/stubs/common.h>
 
 #include <atomic>
+#include <utility>
 
+#include "runtime/query_context.h"
 #include "runtime/thread_context.h"
 #include "service/brpc.h"
 #include "util/ref_count_closure.h"
@@ -79,8 +81,9 @@ class AutoReleaseClosure : public google::protobuf::Closure {
     ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
 
 public:
-    AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> 
callback)
-            : request_(req), callback_(callback) {
+    AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> 
callback,
+                       std::weak_ptr<QueryContext> context = {})
+            : request_(req), callback_(callback), context_(std::move(context)) 
{
         this->cntl_ = callback->cntl_;
         this->response_ = callback->response_;
     }
@@ -113,12 +116,22 @@ public:
 
 protected:
     virtual void _process_if_rpc_failed() {
-        LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
+        std::string error_msg = "RPC meet failed: " + cntl_->ErrorText();
+        if (auto ctx = context_.lock(); ctx) {
+            ctx->cancel(Status::NetworkError(error_msg));
+        } else {
+            LOG(WARNING) << error_msg;
+        }
     }
 
     virtual void _process_if_meet_error_status(const Status& status) {
-        // no need to log END_OF_FILE, reduce the unlessful log
-        if (!status.is<ErrorCode::END_OF_FILE>()) {
+        if (status.is<ErrorCode::END_OF_FILE>()) {
+            // no need to log END_OF_FILE, reduce the unlessful log
+            return;
+        }
+        if (auto ctx = context_.lock(); ctx) {
+            ctx->cancel(status);
+        } else {
             LOG(WARNING) << "RPC meet error status: " << status;
         }
     }
@@ -136,6 +149,7 @@ private:
     // Use a weak ptr to keep the callback, so that the callback can be 
deleted if the main
     // thread is freed.
     Weak callback_;
+    std::weak_ptr<QueryContext> context_;
 };
 
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8cb2795bbfc..c5274a38647 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1173,6 +1173,10 @@ public class SessionVariable implements Serializable, 
Writable {
             "Force the sort algorithm of SortNode to be specified" })
     public String forceSortAlgorithm = "";
 
+    @VariableMgr.VarAttr(name = "ignore_runtime_filter_error", needForward = 
true, description = { "在rf遇到错误的时候忽略该rf",
+            "Ignore the rf when it encounters an error" })
+    public boolean ignoreRuntimeFilterError = false;
+
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE, needForward = true)
     private String runtimeFilterMode = "GLOBAL";
 
@@ -3971,6 +3975,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes);
         tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
         tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);
+        tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);
 
         return tResult;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 29fecc27539..392aa8658df 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -356,6 +356,8 @@ struct TQueryOptions {
   138: optional i64 orc_tiny_stripe_threshold_bytes = 8388608;
   139: optional i64 orc_once_max_read_bytes = 8388608;
   140: optional i64 orc_max_merge_distance_bytes = 1048576;
+
+  141: optional bool ignore_runtime_filter_error = false;
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to