This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e23551dba78 [fix](spill) Finish dependency of join sink operator was
released early (#49701)
e23551dba78 is described below
commit e23551dba787428a60447d45394bd2317805fa9a
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Apr 2 20:54:47 2025 +0800
[fix](spill) Finish dependency of join sink operator was released early
(#49701)
### What problem does this PR solve?
In spilling cases, the `inner_runtime_state` is replaced during the probe phase,
so the finish dependency of the join sink operator is released too early.
```
*** SIGABRT unknown detail explain (@0x21c4) received by PID 8644 (TID
10986 OR 0x7f68dc440640) from PID 8644; stack trace: ***
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int,
siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:421
1# 0x00007F6CBA242520 in /lib/x86_64-linux-gnu/libc.so.6
2# pthread_kill at ./nptl/pthread_kill.c:89
3# raise at ../sysdeps/posix/raise.c:27
4# abort at ./stdlib/abort.c:81
5# _nl_load_domain at ./intl/loadmsgcat.c:1177
6# 0x00007F6CBA239E96 in /lib/x86_64-linux-gnu/libc.so.6
7# pthread_mutex_lock at ./nptl/pthread_mutex_lock.c:139
8# doris::pipeline::PipelineTask::terminate() at
/root/doris/be/src/pipeline/pipeline_task.cpp:316
9# doris::pipeline::Pipeline::make_all_runnable() in
/mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
10#
doris::pipeline::PipelineFragmentContext::decrement_running_task(unsigned int)
at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:1781
11# doris::Defer::~Defer() at /root/doris/be/src/util/defer_op.h:37
12# doris::pipeline::TaskScheduler::_do_work(int) at
/root/doris/be/src/pipeline/task_scheduler.cpp:166
13# doris::ThreadPool::dispatch_thread() at
/root/doris/be/src/util/threadpool.cpp:623
14# doris::Thread::supervise_thread(void*) at
/root/doris/be/src/util/thread.cpp:499
15# start_thread at ./nptl/pthread_create.c:442
16# 0x00007F6CBA326850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:
```
---
.../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 13 +------------
be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 2 --
.../pipeline/exec/partitioned_hash_join_sink_operator.cpp | 10 ++++------
be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 2 ++
.../operator/partitioned_hash_join_probe_operator_test.cpp | 6 +-----
.../operator/partitioned_hash_join_sink_operator_test.cpp | 4 ++++
6 files changed, 12 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 12f2850cf05..90ea463ca1b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -577,14 +577,6 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
return Status::OK();
}
-Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill(
- PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) {
- DCHECK(local_state._shared_state->inner_runtime_state);
- local_state._in_mem_shared_state_sptr =
- std::move(local_state._shared_state->inner_shared_state);
- return Status::OK();
-}
-
Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state)
const {
local_state._shared_state->inner_runtime_state =
RuntimeState::create_unique(
@@ -876,10 +868,7 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
return _revoke_memory(state);
}
} else {
- if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
-
RETURN_IF_ERROR(_setup_internal_operator_for_non_spill(local_state, state));
- }
-
+ DCHECK(local_state._shared_state->inner_runtime_state);
RETURN_IF_ERROR(_inner_probe_operator->push(
local_state._shared_state->inner_runtime_state.get(),
local_state._child_block.get(), local_state._child_eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 2e8efef4201..8dad0b8469b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -167,8 +167,6 @@ private:
[[nodiscard]] Status
_setup_internal_operators(PartitionedHashJoinProbeLocalState& local_state,
RuntimeState* state) const;
- [[nodiscard]] Status _setup_internal_operator_for_non_spill(
- PartitionedHashJoinProbeLocalState& local_state, RuntimeState*
state);
bool _should_revoke_memory(RuntimeState* state) const;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index e27b215a911..1c4de355aaa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -164,12 +164,7 @@ size_t
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
}
Dependency* PartitionedHashJoinSinkLocalState::finishdependency() {
- if (auto* tmp_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state()) {
- auto* inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
- return inner_sink_state->finishdependency();
- }
- DCHECK(false) << "Should not reach here!";
- return nullptr;
+ return _finish_dependency.get();
}
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
@@ -197,6 +192,7 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
// therefore, all runtime filters are temporarily disabled.
RETURN_IF_ERROR(inner_sink_state->_runtime_filter_producer_helper->skip_process(
_shared_state->inner_runtime_state.get()));
+ _finish_dependency->set_ready();
}
if (build_block.rows() <= 1) {
@@ -548,6 +544,8 @@ Status
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState*
RETURN_IF_ERROR(probe_local_state->open(state));
RETURN_IF_ERROR(sink_local_state->open(state));
+ _finish_dependency =
sink_local_state->finishdependency()->shared_from_this();
+
/// Set these two values after all the work is ready.
_shared_state->inner_shared_state = std::move(inner_shared_state);
_shared_state->inner_runtime_state = std::move(inner_runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 7bca4da8ce2..cae48462014 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -24,6 +24,7 @@
#include "common/be_mock_util.h"
#include "common/status.h"
#include "operator.h"
+#include "pipeline/dependency.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/join_build_sink_operator.h"
@@ -80,6 +81,7 @@ protected:
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
+ std::shared_ptr<Dependency> _finish_dependency;
RuntimeProfile::Counter* _partition_timer = nullptr;
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
diff --git
a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index b9b1a1329df..dbcd41b40ee 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1129,14 +1129,10 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, Other) {
auto local_state =
_helper.create_probe_local_state(_helper.runtime_state.get(),
probe_operator.get(),
shared_state);
- auto st =
probe_operator->_setup_internal_operator_for_non_spill(*local_state,
-
_helper.runtime_state.get());
- ASSERT_TRUE(st.ok()) << "Setup internal operator failed: " <<
st.to_string();
-
local_state->_shared_state->need_to_spill = true;
ASSERT_FALSE(probe_operator->_should_revoke_memory(_helper.runtime_state.get()));
- st = probe_operator->_revoke_memory(_helper.runtime_state.get());
+ auto st = probe_operator->_revoke_memory(_helper.runtime_state.get());
ASSERT_TRUE(st.ok()) << "Revoke memory failed: " << st.to_string();
}
diff --git
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index a9dae776a2d..010cd6ea223 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -348,6 +348,10 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) {
sink_state->_shared_state->inner_runtime_state->emplace_sink_local_state(
0, std::move(inner_sink_local_state));
+ sink_state->_finish_dependency =
+ Dependency::create_shared(sink_operator->operator_id(),
sink_operator->node_id(),
+ "HashJoinBuildFinishDependency", true);
+
// Expect revoke memory to trigger spilling
status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr);
ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]