This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 14e7eb76243 [Opt](rf) Opt broadcast join remote runtime filter merge and wait (#29439)
14e7eb76243 is described below

commit 14e7eb762430b251869737eeef6ee3ac83ce3707
Author: HappenLee <[email protected]>
AuthorDate: Wed Jan 3 11:21:28 2024 +0800

    [Opt](rf) Opt broadcast join remote runtime filter merge and wait (#29439)
---
 be/src/exprs/runtime_filter.cpp                              | 10 +++++++---
 be/src/exprs/runtime_filter.h                                |  2 +-
 be/src/exprs/runtime_filter_slots.h                          |  6 +++---
 be/src/pipeline/exec/hashjoin_build_sink.cpp                 |  2 +-
 be/src/runtime/runtime_filter_mgr.cpp                        | 10 +++++-----
 be/src/vec/exec/join/vhash_join_node.cpp                     |  2 +-
 .../main/java/org/apache/doris/planner/RuntimeFilter.java    |  4 ++++
 .../src/main/java/org/apache/doris/qe/Coordinator.java       | 12 ++++++++++--
 8 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8e8e3dfd8cd..f52a9574bf6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -985,7 +985,7 @@ Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int*
     return Status::OK();
 }
 
-Status IRuntimeFilter::publish() {
+Status IRuntimeFilter::publish(bool publish_local) {
     DCHECK(is_producer());
     if (_is_global) {
         std::vector<IRuntimeFilter*> filters;
@@ -1010,13 +1010,17 @@ Status IRuntimeFilter::publish() {
             filter->update_runtime_filter_type_to_profile();
             filter->signal();
         }
-        return Status::OK();
-    } else {
+    } else if (!publish_local) {
         TNetworkAddress addr;
         DCHECK(_state != nullptr);
         RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
         return push_to_remote(_state, &addr, _opt_remote_rf);
+    } else {
+        // remote broadcast join only push onetime in build shared hash table
+        // publish_local only set true on copy shared hash table
+        DCHECK(_is_broadcast_join);
     }
+    return Status::OK();
 }
 
 Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 32fee99173e..2673308ae77 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -228,7 +228,7 @@ public:
 
     // publish filter
     // push filter to remote node or push down it to scan_node
-    Status publish();
+    Status publish(bool publish_local = false);
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 7223b652a3c..d539e295ae8 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -204,10 +204,10 @@ public:
     }
 
     // publish runtime filter
-    Status publish() {
+    Status publish(bool publish_local = false) {
         for (auto& pair : _runtime_filters) {
-            for (auto filter : pair.second) {
-                RETURN_IF_ERROR(filter->publish());
+            for (auto& filter : pair.second) {
+                RETURN_IF_ERROR(filter->publish(publish_local));
             }
         }
         return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8c1ff852433..3d9fd501917 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -568,7 +568,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                                 RETURN_IF_ERROR(
                                         local_state._runtime_filter_slots->copy_from_shared_context(
                                                 _shared_hash_table_context));
-                                RETURN_IF_ERROR(local_state._runtime_filter_slots->publish());
+                                RETURN_IF_ERROR(local_state._runtime_filter_slots->publish(true));
                                 return Status::OK();
                             }},
                     *local_state._shared_state->hash_table_variants);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b67bc5ffd3b..73a043070e3 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -328,6 +328,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
     cntVal = iter->second.first;
     {
         std::lock_guard<std::mutex> l(*iter->second.second);
+        // Skip the other broadcast join runtime filter
+        if (cntVal->arrive_id.size() == 1 && cntVal->runtime_filter_desc.is_broadcast_join) {
+            return Status::OK();
+        }
         MergeRuntimeFilterParams params(request, attach_data);
         ObjectPool* pool = cntVal->pool.get();
         RuntimeFilterWrapperHolder holder;
@@ -339,11 +343,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
         DCHECK_LE(merged_size, cntVal->producer_size);
         cntVal->merge_time += (MonotonicMillis() - start_merge);
-        if (merged_size < cntVal->producer_size) {
-            return Status::OK();
-        } else {
-            merge_time = cntVal->merge_time;
-        }
+        merge_time = cntVal->merge_time;
     }
 
     if (merged_size == cntVal->producer_size) {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 60391716e28..b135ada7237 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -824,7 +824,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                                           state, arg.hash_table->size()));
                                   RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
                                           _shared_hash_table_context));
-                                  RETURN_IF_ERROR(_runtime_filter_slots->publish());
+                                  RETURN_IF_ERROR(_runtime_filter_slots->publish(true));
                                   return Status::OK();
                               }},
                     *_hash_table_variants);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index d21f390c04d..646a07221a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -604,6 +604,10 @@ public final class RuntimeFilter {
         isBroadcastJoin = isBroadcast;
     }
 
+    public boolean isBroadcast() {
+        return isBroadcastJoin;
+    }
+
     public void computeNdvEstimate() {
         if (ndvEstimate < 0) {
             ndvEstimate = builderNode.getChild(1).getCardinalityAfterFilter();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 421fa1ac523..9bfca2802eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3627,6 +3627,9 @@ public class Coordinator implements CoordInterface {
                 params.params.setRuntimeFilterParams(new TRuntimeFilterParams());
                 params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
                 if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
+                    Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
+                            .map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());
+
+
                     for (RuntimeFilter rf : assignedRuntimeFilters) {
                         if (!ridToTargetParam.containsKey(rf.getFilterId())) {
                             continue;
@@ -3668,7 +3671,8 @@ public class Coordinator implements CoordInterface {
                     }
                     for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
                         params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
-                                entry.getKey().asInt(), entry.getValue());
+                                entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt())
+                                        ? 1 : entry.getValue());
                     }
                     for (RuntimeFilter rf : assignedRuntimeFilters) {
                         params.params.runtime_filter_params.putToRidToRuntimeFilter(
@@ -3753,6 +3757,9 @@ public class Coordinator implements CoordInterface {
                 localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
                 localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
                 if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
+                    Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
+                            .map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());
+
+
                     for (RuntimeFilter rf : assignedRuntimeFilters) {
                         if (!ridToTargetParam.containsKey(rf.getFilterId())) {
                             continue;
@@ -3795,7 +3802,8 @@ public class Coordinator implements CoordInterface {
                     }
                     for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
                         localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
-                                entry.getKey().asInt(), entry.getValue());
+                                entry.getKey().asInt(), broadCastRf.contains(entry.getKey().asInt()) ? 1 :
+                                        entry.getValue());
                     }
                     for (RuntimeFilter rf : assignedRuntimeFilters) {
                         localParams.runtime_filter_params.putToRidToRuntimeFilter(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to