This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 14e7eb76243 [Opt](rf) Opt broadcast join remote runtime filter merge and wait (#29439)
14e7eb76243 is described below
commit 14e7eb762430b251869737eeef6ee3ac83ce3707
Author: HappenLee <[email protected]>
AuthorDate: Wed Jan 3 11:21:28 2024 +0800
[Opt](rf) Opt broadcast join remote runtime filter merge and wait (#29439)
---
be/src/exprs/runtime_filter.cpp | 10 +++++++---
be/src/exprs/runtime_filter.h | 2 +-
be/src/exprs/runtime_filter_slots.h | 6 +++---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/runtime/runtime_filter_mgr.cpp | 10 +++++-----
be/src/vec/exec/join/vhash_join_node.cpp | 2 +-
.../main/java/org/apache/doris/planner/RuntimeFilter.java | 4 ++++
.../src/main/java/org/apache/doris/qe/Coordinator.java | 12 ++++++++++--
8 files changed, 32 insertions(+), 16 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8e8e3dfd8cd..f52a9574bf6 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -985,7 +985,7 @@ Status
IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int*
return Status::OK();
}
-Status IRuntimeFilter::publish() {
+Status IRuntimeFilter::publish(bool publish_local) {
DCHECK(is_producer());
if (_is_global) {
std::vector<IRuntimeFilter*> filters;
@@ -1010,13 +1010,17 @@ Status IRuntimeFilter::publish() {
filter->update_runtime_filter_type_to_profile();
filter->signal();
}
- return Status::OK();
- } else {
+ } else if (!publish_local) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return push_to_remote(_state, &addr, _opt_remote_rf);
+ } else {
+ // remote broadcast join only push onetime in build shared hash table
+ // publish_local only set true on copy shared hash table
+ DCHECK(_is_broadcast_join);
}
+ return Status::OK();
}
Status
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>&
probe_ctxs,
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 32fee99173e..2673308ae77 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -228,7 +228,7 @@ public:
// publish filter
// push filter to remote node or push down it to scan_node
- Status publish();
+ Status publish(bool publish_local = false);
RuntimeFilterType type() const { return _runtime_filter_type; }
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 7223b652a3c..d539e295ae8 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -204,10 +204,10 @@ public:
}
// publish runtime filter
- Status publish() {
+ Status publish(bool publish_local = false) {
for (auto& pair : _runtime_filters) {
- for (auto filter : pair.second) {
- RETURN_IF_ERROR(filter->publish());
+ for (auto& filter : pair.second) {
+ RETURN_IF_ERROR(filter->publish(publish_local));
}
}
return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8c1ff852433..3d9fd501917 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -568,7 +568,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
RETURN_IF_ERROR(
local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
-
RETURN_IF_ERROR(local_state._runtime_filter_slots->publish());
+
RETURN_IF_ERROR(local_state._runtime_filter_slots->publish(true));
return Status::OK();
}},
*local_state._shared_state->hash_table_variants);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index b67bc5ffd3b..73a043070e3 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -328,6 +328,10 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
cntVal = iter->second.first;
{
std::lock_guard<std::mutex> l(*iter->second.second);
+ // Skip the other broadcast join runtime filter
+ if (cntVal->arrive_id.size() == 1 &&
cntVal->runtime_filter_desc.is_broadcast_join) {
+ return Status::OK();
+ }
MergeRuntimeFilterParams params(request, attach_data);
ObjectPool* pool = cntVal->pool.get();
RuntimeFilterWrapperHolder holder;
@@ -339,11 +343,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
VLOG_ROW << "merge size:" << merged_size << ":" <<
cntVal->producer_size;
DCHECK_LE(merged_size, cntVal->producer_size);
cntVal->merge_time += (MonotonicMillis() - start_merge);
- if (merged_size < cntVal->producer_size) {
- return Status::OK();
- } else {
- merge_time = cntVal->merge_time;
- }
+ merge_time = cntVal->merge_time;
}
if (merged_size == cntVal->producer_size) {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 60391716e28..b135ada7237 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -824,7 +824,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
state, arg.hash_table->size()));
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));
-
RETURN_IF_ERROR(_runtime_filter_slots->publish());
+
RETURN_IF_ERROR(_runtime_filter_slots->publish(true));
return Status::OK();
}},
*_hash_table_variants);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index d21f390c04d..646a07221a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -604,6 +604,10 @@ public final class RuntimeFilter {
isBroadcastJoin = isBroadcast;
}
+ public boolean isBroadcast() {
+ return isBroadcastJoin;
+ }
+
public void computeNdvEstimate() {
if (ndvEstimate < 0) {
ndvEstimate = builderNode.getChild(1).getCardinalityAfterFilter();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 421fa1ac523..9bfca2802eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3627,6 +3627,9 @@ public class Coordinator implements CoordInterface {
params.params.setRuntimeFilterParams(new
TRuntimeFilterParams());
params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
+ Set<Integer> broadCastRf =
assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
+ .map(r ->
r.getFilterId().asInt()).collect(Collectors.toSet());
+
for (RuntimeFilter rf : assignedRuntimeFilters) {
if (!ridToTargetParam.containsKey(rf.getFilterId())) {
continue;
@@ -3668,7 +3671,8 @@ public class Coordinator implements CoordInterface {
}
for (Map.Entry<RuntimeFilterId, Integer> entry :
ridToBuilderNum.entrySet()) {
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
- entry.getKey().asInt(), entry.getValue());
+ entry.getKey().asInt(),
broadCastRf.contains(entry.getKey().asInt())
+ ? 1 : entry.getValue());
}
for (RuntimeFilter rf : assignedRuntimeFilters) {
params.params.runtime_filter_params.putToRidToRuntimeFilter(
@@ -3753,6 +3757,9 @@ public class Coordinator implements CoordInterface {
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
+ Set<Integer> broadCastRf =
assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
+ .map(r ->
r.getFilterId().asInt()).collect(Collectors.toSet());
+
for (RuntimeFilter rf : assignedRuntimeFilters) {
if (!ridToTargetParam.containsKey(rf.getFilterId())) {
continue;
@@ -3795,7 +3802,8 @@ public class Coordinator implements CoordInterface {
}
for (Map.Entry<RuntimeFilterId, Integer> entry :
ridToBuilderNum.entrySet()) {
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
- entry.getKey().asInt(), entry.getValue());
+ entry.getKey().asInt(),
broadCastRf.contains(entry.getKey().asInt()) ? 1 :
+ entry.getValue());
}
for (RuntimeFilter rf : assignedRuntimeFilters) {
localParams.runtime_filter_params.putToRidToRuntimeFilter(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org