This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9b3fb4a4c73 [Bug](runtime-filter) fix wrong build_bf_exactly when sync
filter siz… #44716 (#46965)
9b3fb4a4c73 is described below
commit 9b3fb4a4c73d9dcf152f47930d439eae4953b660
Author: Pxl <[email protected]>
AuthorDate: Tue Jan 14 17:07:40 2025 +0800
[Bug](runtime-filter) fix wrong build_bf_exactly when sync filter siz…
#44716 (#46965)
pick from #44716
---
be/src/exprs/runtime_filter.cpp | 20 +++----------
be/src/exprs/runtime_filter.h | 5 ++--
be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +--
.../exec/nested_loop_join_build_operator.cpp | 2 +-
be/src/runtime/runtime_filter_mgr.cpp | 22 +++++++-------
be/src/runtime/runtime_filter_mgr.h | 8 ++---
be/src/runtime/runtime_state.cpp | 13 ++++----
be/src/runtime/runtime_state.h | 3 +-
.../glue/translator/RuntimeFilterTranslator.java | 1 +
.../org/apache/doris/planner/RuntimeFilter.java | 35 ++++++++++++++++++++--
gensrc/thrift/PlanNodes.thrift | 4 ++-
11 files changed, 66 insertions(+), 51 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 74ac7bd18be..e98243255bd 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -992,11 +992,10 @@ private:
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const
RuntimeFilterRole role,
- int node_id, std::shared_ptr<IRuntimeFilter>*
res,
- bool build_bf_exactly) {
+ int node_id, std::shared_ptr<IRuntimeFilter>*
res) {
*res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
- return (*res)->init_with_desc(desc, query_options, node_id,
build_bf_exactly);
+ return (*res)->init_with_desc(desc, query_options, node_id);
}
RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
@@ -1368,7 +1367,7 @@ std::string IRuntimeFilter::formatted_state() const {
}
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const
TQueryOptions* options,
- int node_id, bool build_bf_exactly) {
+ int node_id) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
@@ -1390,21 +1389,10 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
params.runtime_bloom_filter_max_size =
options->__isset.runtime_bloom_filter_max_size
?
options->runtime_bloom_filter_max_size
: 0;
- auto sync_filter_size = desc->__isset.sync_filter_size &&
desc->sync_filter_size;
- // We build runtime filter by exact distinct count iff three conditions
are met:
- // 1. Only 1 join key
- // 2. Bloom filter
- // 3. Size of all bloom filters will be same (size will be sync or this is
a broadcast join).
- params.build_bf_exactly =
- build_bf_exactly && (_runtime_filter_type ==
RuntimeFilterType::BLOOM_FILTER ||
- _runtime_filter_type ==
RuntimeFilterType::IN_OR_BLOOM_FILTER);
+ params.build_bf_exactly = desc->__isset.build_bf_exactly &&
desc->build_bf_exactly;
params.bloom_filter_size_calculated_by_ndv =
desc->bloom_filter_size_calculated_by_ndv;
- if (!sync_filter_size) {
- params.build_bf_exactly &= !_is_broadcast_join;
- }
-
if (desc->__isset.bloom_filter_size_bytes) {
params.bloom_filter_size = desc->bloom_filter_size_bytes;
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index d0bc9be4145..3a3c4a6c416 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -212,8 +212,7 @@ public:
static Status create(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const
RuntimeFilterRole role,
- int node_id, std::shared_ptr<IRuntimeFilter>* res,
- bool build_bf_exactly = false);
+ int node_id, std::shared_ptr<IRuntimeFilter>* res);
RuntimeFilterContextSPtr& get_shared_context_ref();
@@ -259,7 +258,7 @@ public:
// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions*
options,
- int node_id = -1, bool build_bf_exactly = false);
+ int node_id = -1);
// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 3f096450204..bcad867495f 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -92,8 +92,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
_hash_table_init(state);
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->register_producer_runtime_filter(
- p._runtime_filter_descs[i], &_runtime_filters[i],
_build_expr_ctxs.size() == 1));
+
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
+
&_runtime_filters[i]));
}
_runtime_filter_slots =
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index c9f7ee7cf5e..7b531fcd2d5 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkSta
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
-
&_runtime_filters[i], false));
+
&_runtime_filters[i]));
}
return Status::OK();
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index f7e22cde144..7e9a0146b21 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -90,7 +90,7 @@ std::vector<std::shared_ptr<IRuntimeFilter>>
RuntimeFilterMgr::get_consume_filte
Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc&
desc,
const TQueryOptions&
options, int node_id,
std::shared_ptr<IRuntimeFilter>* consumer_filter,
- bool build_bf_exactly, bool
need_local_merge) {
+ bool need_local_merge) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
bool has_exist = false;
@@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
if (!has_exist) {
std::shared_ptr<IRuntimeFilter> filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::CONSUMER,
- node_id, &filter,
build_bf_exactly));
+ node_id, &filter));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
} else if (!need_local_merge) {
@@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
Status RuntimeFilterMgr::register_local_merge_producer_filter(
const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions&
options,
- std::shared_ptr<IRuntimeFilter> producer_filter, bool
build_bf_exactly) {
+ std::shared_ptr<IRuntimeFilter> producer_filter) {
DCHECK(_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
@@ -143,8 +143,7 @@ Status
RuntimeFilterMgr::register_local_merge_producer_filter(
if (iter->second.filters.empty()) {
std::shared_ptr<IRuntimeFilter> merge_filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
-
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
- build_bf_exactly));
+
RuntimeFilterRole::PRODUCER, -1, &merge_filter));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
@@ -181,10 +180,9 @@ doris::LocalMergeFilters*
RuntimeFilterMgr::get_local_merge_producer_filters(int
return &iter->second;
}
-Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc&
desc,
- const TQueryOptions& options,
-
std::shared_ptr<IRuntimeFilter>* producer_filter,
- bool build_bf_exactly) {
+Status RuntimeFilterMgr::register_producer_filter(
+ const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+ std::shared_ptr<IRuntimeFilter>* producer_filter) {
DCHECK(!_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
@@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const
TRuntimeFilterDesc& desc
return Status::InvalidArgument("filter has registed");
}
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1,
- producer_filter, build_bf_exactly));
+ producer_filter));
_producer_map.emplace(key, *producer_filter);
return Status::OK();
}
@@ -233,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state,
runtime_filter_desc));
auto filter_id = runtime_filter_desc->filter_id;
-
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc,
query_options,
- -1, false));
+ RETURN_IF_ERROR(
+ cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc,
query_options, -1));
cnt_val->filter->set_ignored();
_filter_map.emplace(filter_id, cnt_val);
return Status::OK();
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 0a6f8318fea..9f4cf5f4e22 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -100,19 +100,17 @@ public:
// register filter
Status register_consumer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
int node_id,
std::shared_ptr<IRuntimeFilter>* consumer_filter,
- bool build_bf_exactly = false, bool
need_local_merge = false);
+ bool need_local_merge = false);
Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
-
std::shared_ptr<IRuntimeFilter> producer_filter,
- bool build_bf_exactly = false);
+
std::shared_ptr<IRuntimeFilter> producer_filter);
Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters**
local_merge_filters);
LocalMergeFilters* get_local_merge_producer_filters(int filter_id);
Status register_producer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
- std::shared_ptr<IRuntimeFilter>*
producer_filter,
- bool build_bf_exactly = false);
+ std::shared_ptr<IRuntimeFilter>*
producer_filter);
// update filter by remote
void set_runtime_filter_params(const TRuntimeFilterParams&
runtime_filter_params);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index a22ad18ce04..80b018a4a19 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -522,14 +522,13 @@ RuntimeFilterMgr*
RuntimeState::global_runtime_filter_mgr() {
}
Status RuntimeState::register_producer_runtime_filter(
- const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>*
producer_filter,
- bool build_bf_exactly) {
+ const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>*
producer_filter) {
// Producers are created by local runtime filter mgr and shared by global
runtime filter manager.
// When RF is published, consumers in both global and local RF mgr will be
found.
- RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
- desc, query_options(), producer_filter, build_bf_exactly));
+ RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc,
query_options(),
+
producer_filter));
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
- desc, query_options(), *producer_filter, build_bf_exactly));
+ desc, query_options(), *producer_filter));
return Status::OK();
}
@@ -538,10 +537,10 @@ Status RuntimeState::register_consumer_runtime_filter(
std::shared_ptr<IRuntimeFilter>* consumer_filter) {
if (desc.has_remote_targets || need_local_merge) {
return global_runtime_filter_mgr()->register_consumer_filter(desc,
query_options(), node_id,
-
consumer_filter, false, true);
+
consumer_filter, true);
} else {
return local_runtime_filter_mgr()->register_consumer_filter(desc,
query_options(), node_id,
-
consumer_filter, false, false);
+
consumer_filter, false);
}
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cd9de143522..e86990ae92b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,8 +561,7 @@ public:
}
Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc&
desc,
- std::shared_ptr<IRuntimeFilter>*
producer_filter,
- bool build_bf_exactly);
+ std::shared_ptr<IRuntimeFilter>*
producer_filter);
Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc&
desc,
bool need_local_merge, int node_id,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 3dbd6cfcec7..07e0af60173 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -143,6 +143,7 @@ public class RuntimeFilterTranslator {
targetTupleIdMapList, context.getLimits());
if (node instanceof HashJoinNode) {
origFilter.setIsBroadcast(((HashJoinNode)
node).getDistributionMode() == DistributionMode.BROADCAST);
+ origFilter.setSingleEq(((HashJoinNode)
node).getEqJoinConjuncts().size());
} else {
// nest loop join
origFilter.setIsBroadcast(true);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 2f3948aee16..80497798083 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -112,6 +112,8 @@ public final class RuntimeFilter {
private boolean bloomFilterSizeCalculatedByNdv = false;
+ private boolean singleEq = false;
+
/**
* Internal representation of a runtime filter target.
*/
@@ -216,9 +218,36 @@ public final class RuntimeFilter {
tFilter.setIsBroadcastJoin(isBroadcastJoin);
tFilter.setHasLocalTargets(hasLocalTargets);
tFilter.setHasRemoteTargets(hasRemoteTargets);
+
+ boolean hasSerialTargets = false;
for (RuntimeFilterTarget target : targets) {
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(),
target.expr.treeToThrift());
+ hasSerialTargets = hasSerialTargets
+ || (target.node.isSerialOperator() &&
target.node.fragment.useSerialSource(ConnectContext.get()));
}
+
+ boolean enableSyncFilterSize = ConnectContext.get() != null
+ &&
ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();
+
+ // There are two cases in which there is a local exchange between join and scan:
+ // 1. hasRemoteTargets is true, meaning the join probe side does at least one
shuffle (there is a shuffle between join and scan)
+ // 2. hasSerialTargets is true, meaning the scan is pooled (there is a local shuffle
between join and scan)
+ boolean needShuffle = hasRemoteTargets || hasSerialTargets;
+
+ // There are two cases where all instances of rf have the same size.
+ // 1. enableSyncFilterSize is true, meaning the backends will collect the global
size and send it to every instance
+ // 2. isBroadcastJoin is true, meaning each join node instance has the
same full amount of data
+ boolean hasGlobalSize = enableSyncFilterSize || isBroadcastJoin;
+
+ // build runtime filter by exact distinct count if all of 3 conditions
are met:
+ // 1. only single eq conjunct
+ // 2. rf type may be bf
+ // 3. each filter only acts on its own instance (no shuffle is needed),
or the size of
+ // all filters will be the same
+ boolean buildBfExactly = singleEq && (runtimeFilterType ==
TRuntimeFilterType.IN_OR_BLOOM
+ || runtimeFilterType == TRuntimeFilterType.BLOOM) &&
(!needShuffle || hasGlobalSize);
+ tFilter.setBuildBfExactly(buildBfExactly);
+
tFilter.setType(runtimeFilterType);
tFilter.setBloomFilterSizeBytes(filterSizeBytes);
if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) {
@@ -239,8 +268,6 @@ public final class RuntimeFilter {
tFilter.setNullAware(false);
}
}
- tFilter.setSyncFilterSize(ConnectContext.get() != null
- &&
ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize());
return tFilter;
}
@@ -597,6 +624,10 @@ public final class RuntimeFilter {
targets.add(target);
}
+ public void setSingleEq(int eqJoinConjunctsNumbers) {
+ singleEq = (eqJoinConjunctsNumbers == 1);
+ }
+
public void setIsBroadcast(boolean isBroadcast) {
isBroadcastJoin = isBroadcast;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9aaa7076901..70c6722b9d8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1301,7 +1301,9 @@ struct TRuntimeFilterDesc {
// true, if join type is null aware like <=>. rf should dispose the case
15: optional bool null_aware;
- 16: optional bool sync_filter_size;
+ 16: optional bool sync_filter_size; // Deprecated
+
+ 17: optional bool build_bf_exactly;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]