This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 87cf2d1fb45 [fix](spill) Duplicate calls to Dependency::set_ready() in
hash join(#37461) (#38399)
87cf2d1fb45 is described below
commit 87cf2d1fb45c1d95772455238946df3f4617f46a
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Jul 29 09:44:48 2024 +0800
[fix](spill) Duplicate calls to Dependency::set_ready() in hash
join(#37461) (#38399)
## Proposed changes
Cherry-pick of #37461.
Calling `Dependency::set_ready()` more than once causes pipeline tasks
to be scheduled incorrectly.
---
be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index fc006764452..3cc3c3a9d0b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -205,7 +205,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
VLOG_DEBUG << "query: " << print_id(query_id)
<< " hash probe revoke done, node: " << p.node_id()
<< ", task: " << state->task_id();
- _dependency->set_ready();
return Status::OK();
};
@@ -335,7 +334,6 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
<< ", task id: " << state->task_id();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
shared_state_sptr->spilled_streams[partition_index].reset();
- _dependency->set_ready();
};
auto exception_catch_func = [read_func, query_id, mem_tracker,
shared_state_holder,
@@ -361,6 +359,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
_spill_status_ok = false;
_spill_status = std::move(status);
}
+ _dependency->set_ready();
};
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -423,8 +422,6 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
-
- _dependency->set_ready();
};
auto exception_catch_func = [read_func, mem_tracker, shared_state_holder,
execution_context,
@@ -450,6 +447,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
_spill_status_ok = false;
_spill_status = std::move(status);
}
+ _dependency->set_ready();
};
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]