This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 27d094afe97 [feat](spill) support runtime filter (#46972)
27d094afe97 is described below
commit 27d094afe973c99b88bf7e2ddb20b4f005700173
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Jan 14 22:09:33 2025 +0800
[feat](spill) support runtime filter (#46972)
---
.../pipeline/exec/partitioned_hash_join_sink_operator.cpp | 2 ++
be/src/pipeline/pipeline_fragment_context.cpp | 14 ++++++++++----
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8d100dd4530..4da7abec23e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -176,6 +176,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
if (inner_sink_state) {
build_block = inner_sink_state->_build_side_mutable_block.to_block();
block_old_mem = build_block.allocated_bytes();
+ RETURN_IF_ERROR(inner_sink_state->disable_runtime_filters(
+ _shared_state->inner_runtime_state.get()));
}
if (build_block.rows() <= 1) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 09a14c66a7f..2d96b7f7b1d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1376,20 +1376,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
const auto enable_spill = _runtime_state->enable_spill();
if (enable_spill && !is_broadcast_join) {
auto tnode_ = tnode;
- /// TODO: support rf in partitioned hash join
tnode_.runtime_filters.clear();
uint32_t partition_count =
_runtime_state->spill_hash_join_partition_count();
auto inner_probe_operator =
std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0,
descs);
- auto inner_sink_operator =
+ // probe side inner sink operator is used to build hash table on
probe side when data is spilled.
+ // So here use `tnode_` which has no runtime filters.
+ auto probe_side_inner_sink_operator =
std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0,
tnode_, descs);
RETURN_IF_ERROR(inner_probe_operator->init(tnode_,
_runtime_state.get()));
- RETURN_IF_ERROR(inner_sink_operator->init(tnode_,
_runtime_state.get()));
+ RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_,
_runtime_state.get()));
auto probe_operator =
std::make_shared<PartitionedHashJoinProbeOperatorX>(
pool, tnode_, next_operator_id(), descs, partition_count);
- probe_operator->set_inner_operators(inner_sink_operator,
inner_probe_operator);
+ probe_operator->set_inner_operators(probe_side_inner_sink_operator,
+ inner_probe_operator);
op = std::move(probe_operator);
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
@@ -1401,8 +1403,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
+ auto inner_sink_operator =
+ std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0,
tnode, descs);
auto sink_operator =
std::make_shared<PartitionedHashJoinSinkOperatorX>(
pool, next_sink_operator_id(), tnode_, descs,
partition_count);
+
+ RETURN_IF_ERROR(inner_sink_operator->init(tnode,
_runtime_state.get()));
sink_operator->set_inner_operators(inner_sink_operator,
inner_probe_operator);
DataSinkOperatorPtr sink = std::move(sink_operator);
sink->set_dests_id({op->operator_id()});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]