This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1acd8e9fcb5 [fix](spill) incorrect result of hash join (#34450)
1acd8e9fcb5 is described below
commit 1acd8e9fcb553536780e0328abb62dcdd1fe290c
Author: Jerry Hu <[email protected]>
AuthorDate: Wed May 8 10:05:30 2024 +0800
[fix](spill) incorrect result of hash join (#34450)
---
.../exec/partitioned_hash_join_probe_operator.cpp | 63 +++++++++++-----------
.../exec/partitioned_hash_join_sink_operator.cpp | 6 +--
be/src/vec/core/block.cpp | 1 +
3 files changed, 37 insertions(+), 33 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f617ab21b1d..21134487c2e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -197,7 +197,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
execution_context_lock = execution_context.lock();
}
if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
+ LOG(INFO) << "query: " << print_id(query_id)
<< " execution_context released, maybe query was cancelled.";
return;
}
@@ -216,12 +216,11 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
COUNTER_UPDATE(_spill_build_blocks, 1);
}
}
- --_spilling_task_count;
- if (_spilling_task_count == 0) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ if (_spilling_task_count.fetch_sub(1) == 1) {
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_build_block finish";
- std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
});
@@ -274,7 +273,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
execution_context_lock = execution_context.lock();
}
if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
+ LOG(INFO) << "query: " << print_id(query_id)
<< " execution_context released, maybe query was cancelled.";
return;
}
@@ -298,19 +297,16 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}
}
- --_spilling_task_count;
-
- if (_spilling_task_count == 0) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ if (_spilling_task_count.fetch_sub(1) == 1) {
LOG(INFO) << "hash probe " << _parent->id()
<< " revoke memory spill_probe_blocks finish";
- std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
}
});
} else {
- --_spilling_task_count;
- if (_spilling_task_count == 0) {
- std::unique_lock<std::mutex> lock(_spill_lock);
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ if (_spilling_task_count.fetch_sub(1) == 1) {
_dependency->set_ready();
}
}
@@ -365,8 +361,9 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto read_func = [this, query_id, mem_tracker, state, &spilled_stream, &mutable_block,
- shared_state_holder, execution_context, submit_timer] {
+ auto read_func = [this, query_id, mem_tracker, state, spilled_stream = spilled_stream,
+ &mutable_block, shared_state_holder, execution_context, submit_timer,
+ partition_index] {
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
std::shared_ptr<TaskExecutionContext> execution_context_lock;
auto shared_state_sptr = shared_state_holder.lock();
@@ -374,7 +371,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
execution_context_lock = execution_context.lock();
}
if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) {
- LOG(INFO) << "query " << print_id(query_id)
+ LOG(INFO) << "query: " << print_id(query_id)
<< " execution_context released, maybe query was cancelled.";
return;
}
@@ -420,9 +417,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
}
- LOG(INFO) << "recovery data done for partition: " << spilled_stream->get_spill_dir();
+ VLOG_DEBUG << "query: " << print_id(state->query_id())
+ << ", recovery data done for partition: " << spilled_stream->get_spill_dir()
+ << ", task id: " << state->task_id();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
- spilled_stream.reset();
+ shared_state_sptr->spilled_streams[partition_index].reset();
_dependency->set_ready();
};
@@ -469,7 +468,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
execution_context_lock = execution_context.lock();
}
if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
+ LOG(INFO) << "query: " << print_id(query_id)
<< " execution_context released, maybe query was cancelled.";
return;
}
@@ -493,7 +492,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
}
if (eos) {
- LOG(INFO) << "recovery probe data done: " << spilled_stream->get_spill_dir();
+ VLOG_DEBUG << "query: " << print_id(query_id)
+ << ", recovery probe data done: " << spilled_stream->get_spill_dir();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
@@ -677,9 +677,10 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
partitioned_block.reset();
}
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true));
- LOG(INFO) << "internal build operator finished, node id: " << id()
- << ", task id: " << state->task_id()
- << ", partition: " << local_state._partition_cursor;
+ VLOG_DEBUG << "query: " << print_id(state->query_id())
+ << ", internal build operator finished, node id: " << id()
+ << ", task id: " << state->task_id()
+ << ", partition: " << local_state._partition_cursor;
return Status::OK();
}
@@ -728,6 +729,9 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
if (!has_data) {
vectorized::Block block;
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true));
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id()
+ << ", task: " << state->task_id() << "partition: " << partition_index
+ << " has no data to recovery";
break;
} else {
return Status::OK();
@@ -746,6 +750,9 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
*eos = false;
if (in_mem_eos) {
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id()
+ << ", task: " << state->task_id()
+ << ", partition: " << local_state._partition_cursor;
local_state._partition_cursor++;
if (local_state._partition_cursor == _partition_count) {
*eos = true;
@@ -770,12 +777,7 @@ bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state
}
bool PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* state) const {
- auto& local_state = get_local_state(state);
- if (local_state._spilling_task_count != 0) {
- return true;
- }
-
- return JoinProbeOperatorX::need_data_from_children(state);
+ return true;
}
size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const {
@@ -822,8 +824,9 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
return Status::OK();
}
- LOG(INFO) << "hash probe " << id()
- << " revoke memory, spill task count: " << local_state._spilling_task_count;
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << id()
+ << ", task: " << state->task_id()
+ << ", revoke memory, spill task count: " << local_state._spilling_task_count;
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
RETURN_IF_ERROR(local_state.spill_build_block(state, i));
RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8924ee6f773..97d5d145604 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -171,7 +171,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
for (size_t block_idx = 0; block_idx != build_blocks.size(); ++block_idx) {
auto& build_block = build_blocks[block_idx];
- const auto is_last_block = block_idx == build_blocks.size() - 1;
+ const auto is_last_block = (block_idx == (build_blocks.size() - 1));
if (UNLIKELY(build_block.empty())) {
continue;
}
@@ -207,8 +207,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
partitions_indexes[partition_idx].clear();
}
- build_block.clear();
-
if (partition_block->rows() >= reserved_size || is_last_block) {
if (!flush_rows(partition_block, spilling_stream)) {
return;
@@ -217,6 +215,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
vectorized::MutableBlock::create_unique(build_block.clone_empty());
}
}
+
+ build_block.clear();
}
_dependency->set_ready();
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d30eee8fcef..83ecf568d6f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -978,6 +978,7 @@ void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
auto& dst = _columns[i];
const auto& src = *block_data[i].column.get();
+ DCHECK_GE(src.size(), row_end - row_begin);
dst->insert_indices_from(src, row_begin, row_end);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]