This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new fdb355fd4fc simplify probe code enhancement probe operator
fdb355fd4fc is described below
commit fdb355fd4fc0c09add4e3df85bb14058ccf1ae86
Author: yiguolei <[email protected]>
AuthorDate: Tue Mar 3 14:22:26 2026 +0800
simplify probe code enhancement probe operator
---
.../partitioned_aggregation_source_operator.cpp | 120 +++++-----
.../exec/partitioned_aggregation_source_operator.h | 7 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 241 +++++++--------------
.../exec/partitioned_hash_join_probe_operator.h | 42 +++-
be/src/vec/spill/spill_file.h | 2 +-
5 files changed, 163 insertions(+), 249 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 059ae72e369..28eab411051 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -177,7 +177,7 @@ bool PartitionedAggSourceOperatorX::is_shuffled_operator()
const {
size_t PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state)
const {
auto& local_state = get_local_state(state);
- if (!local_state._shared_state->_is_spilled ||
!local_state._current_partition.has_data()) {
+ if (!local_state._shared_state->_is_spilled ||
!local_state._current_partition.spill_file) {
return 0;
}
@@ -214,9 +214,7 @@ Status
PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
PrettyPrinter::print_bytes(local_state._estimate_memory_usage));
// Flush hash table + repartition remaining spill files of the current
partition.
- RETURN_IF_ERROR(local_state.flush_and_repartition(state,
-
local_state._current_partition.spill_file,
-
local_state._current_partition.level));
+ RETURN_IF_ERROR(local_state.flush_and_repartition(state));
local_state._current_partition = AggSpillPartitionInfo {};
local_state._need_to_setup_partition = true;
return Status::OK();
@@ -265,19 +263,17 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
VLOG_DEBUG << fmt::format(
"Query:{}, agg source:{}, task:{}, setup partition level:{}, "
- "queue remaining:{}, partition bytes:{}",
+ "queue remaining:{}",
print_id(state->query_id()), node_id(), state->task_id(),
- local_state._current_partition.level,
local_state._partition_queue.size(),
-
PrettyPrinter::print_bytes(local_state._current_partition.total_bytes()));
+ local_state._current_partition.level,
local_state._partition_queue.size());
local_state._need_to_setup_partition = false;
}
// Phase 2: Recover blocks from disk into _blocks (batch of ~8MB).
if (local_state._blocks.empty() &&
local_state._current_partition.spill_file) {
bool has_data = false;
- status = local_state._recover_blocks_from_partition(state,
local_state._current_partition,
- has_data);
- RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(local_state._recover_blocks_from_partition(
+ state, local_state._current_partition, has_data));
// Return empty block to yield to pipeline scheduler.
// Pipeline task will check memory and call revoke_memory if needed.
*eos = false;
@@ -309,8 +305,7 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
auto* runtime_state = local_state._runtime_state.get();
local_state._shared_state->_in_mem_shared_state->aggregate_data_container->init_once();
bool inner_eos = false;
- status = _agg_source_operator->get_block(runtime_state, block, &inner_eos);
- RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block,
&inner_eos));
if (inner_eos) {
auto* source_local_state =
@@ -318,8 +313,7 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
local_state.update_profile<true>(source_local_state->custom_profile());
// Current partition fully output. Reset hash table, pop next
partition.
- status = _agg_source_operator->reset_hash_table(runtime_state);
- RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(_agg_source_operator->reset_hash_table(runtime_state));
local_state._current_partition = AggSpillPartitionInfo {};
local_state._estimate_memory_usage = 0;
@@ -340,13 +334,11 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
void PartitionedAggLocalState::_init_partition_queue() {
for (auto& spill_file : _shared_state->_spill_partitions) {
- if (spill_file && spill_file->has_data()) {
- _partition_queue.emplace_back(std::move(spill_file), /*level=*/0);
- // Track metrics: each queued partition counts as one spill at
level 0
- COUNTER_UPDATE(_total_partition_spills, 1);
- _max_partition_level_seen = 0;
- COUNTER_SET(_max_partition_level,
int64_t(_max_partition_level_seen));
- }
+ _partition_queue.emplace_back(std::move(spill_file), /*level=*/0);
+ // Track metrics: each queued partition counts as one spill at level 0
+ COUNTER_UPDATE(_total_partition_spills, 1);
+ _max_partition_level_seen = 0;
+ COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen));
}
_shared_state->_spill_partitions.clear();
}
@@ -356,57 +348,42 @@ Status
PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
bool&
has_data) {
has_data = false;
size_t accumulated_bytes = 0;
+ if (!partition.spill_file || state->is_cancelled()) {
+ return Status::OK();
+ }
- auto exception_catch_func = [&]() -> Status {
- if (!partition.spill_file || state->is_cancelled()) {
- return Status::OK();
- }
-
- // Create or reuse a persistent reader for this file
- if (!_current_reader) {
- _current_reader = partition.spill_file->create_reader(state,
operator_profile());
- RETURN_IF_ERROR(_current_reader->open());
- }
+ // Create or reuse a persistent reader for this file
+ if (!_current_reader) {
+ _current_reader = partition.spill_file->create_reader(state,
operator_profile());
+ RETURN_IF_ERROR(_current_reader->open());
+ }
- bool eos = false;
+ bool eos = false;
- while (!eos && !state->is_cancelled()) {
- vectorized::Block block;
-
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data", {
- return Status::Error<INTERNAL_ERROR>(
- "fault_inject partitioned_agg_source
recover_spill_data failed");
- });
- RETURN_IF_ERROR(_current_reader->read(&block, &eos));
+ while (!eos && !state->is_cancelled()) {
+ vectorized::Block block;
+
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data", {
+ return Status::Error<INTERNAL_ERROR>(
+ "fault_inject partitioned_agg_source recover_spill_data
failed");
+ });
+ RETURN_IF_ERROR(_current_reader->read(&block, &eos));
- if (!block.empty()) {
- has_data = true;
- accumulated_bytes += block.allocated_bytes();
- _blocks.emplace_back(std::move(block));
+ if (!block.empty()) {
+ has_data = true;
+ accumulated_bytes += block.allocated_bytes();
+ _blocks.emplace_back(std::move(block));
- if (accumulated_bytes >= state->spill_buffer_size_bytes()) {
- return Status::OK();
- }
+ if (accumulated_bytes >= state->spill_buffer_size_bytes()) {
+ return Status::OK();
}
}
+ }
- if (eos) {
- _current_reader.reset();
- partition.spill_file.reset();
- }
- return Status::OK();
- };
-
- DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
- return Status::Error<INTERNAL_ERROR>(
- "fault_inject partitioned_agg_source submit_func failed");
- });
-
- auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return
exception_catch_func(); }); }();
- LOG_IF(WARNING, !status.ok()) << fmt::format(
- "Query:{}, agg source:{}, task:{}, recover exception:{}",
print_id(state->query_id()),
- _parent->node_id(), state->task_id(), status.to_string());
-
- return status;
+ if (eos) {
+ _current_reader.reset();
+ partition.spill_file.reset();
+ }
+ return Status::OK();
}
Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
@@ -461,10 +438,9 @@ Status
PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeStat
return Status::OK();
}
-Status PartitionedAggLocalState::flush_and_repartition(
- RuntimeState* state, vectorized::SpillFileSPtr& remaining_spill_file,
int level) {
+Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
- const int new_level = level + 1;
+ const int new_level = _current_partition.level + 1;
if (new_level >= p._repartition_max_depth) {
return Status::InternalError(
@@ -476,7 +452,8 @@ Status PartitionedAggLocalState::flush_and_repartition(
VLOG_DEBUG << fmt::format(
"Query:{}, agg source:{}, task:{}, flush_and_repartition: "
"flushing hash table and repartitioning remaining spill file at
level {} -> {}",
- print_id(state->query_id()), p.node_id(), state->task_id(), level,
new_level);
+ print_id(state->query_id()), p.node_id(), state->task_id(),
_current_partition.level,
+ new_level);
{
auto* source_local_state =
@@ -531,15 +508,16 @@ Status PartitionedAggLocalState::flush_and_repartition(
RETURN_IF_ERROR(_repartitioner.repartition(state, _current_reader,
&done));
}
// reader is reset by repartitioner on completion
- } else if (remaining_spill_file) {
+ } else if (_current_partition.spill_file) {
// No partial read — repartition the entire file from scratch.
bool done = false;
while (!done && !state->is_cancelled()) {
- RETURN_IF_ERROR(_repartitioner.repartition(state,
remaining_spill_file, &done));
+ RETURN_IF_ERROR(
+ _repartitioner.repartition(state,
_current_partition.spill_file, &done));
}
}
_current_reader.reset();
- remaining_spill_file.reset();
+ _current_partition.spill_file.reset();
RETURN_IF_ERROR(_repartitioner.finalize());
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 1647c2db19e..2ce67cbf5f8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -47,10 +47,6 @@ struct AggSpillPartitionInfo {
AggSpillPartitionInfo() = default;
AggSpillPartitionInfo(vectorized::SpillFileSPtr s, int lvl)
: spill_file(std::move(s)), level(lvl) {}
-
- bool has_data() const { return spill_file &&
spill_file->get_written_bytes() > 0; }
-
- int64_t total_bytes() const { return spill_file ?
spill_file->get_written_bytes() : 0; }
};
class PartitionedAggLocalState MOCK_REMOVE(final)
@@ -81,8 +77,7 @@ public:
/// unread spill files from `remaining_spill_files`, and push resulting
sub-partitions into
/// `_partition_queue`. After this call the hash table is reset and
/// `remaining_spill_files` is cleared.
- Status flush_and_repartition(RuntimeState* state,
- vectorized::SpillFileSPtr&
remaining_spill_file, int level);
+ Status flush_and_repartition(RuntimeState* state);
private:
friend class PartitionedAggSourceOperatorX;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 6db83c92638..f5949ab6436 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -220,19 +220,13 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
"fault_inject partitioned_hash_join_probe "
"spill_probe_blocks canceled");
});
- size_t not_revoked_size = 0;
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
for (uint32_t partition_index = 0; partition_index != p._partition_count;
++partition_index) {
auto& blocks = _probe_blocks[partition_index];
auto& partitioned_block = _partitioned_blocks[partition_index];
- if (partitioned_block) {
- const auto size = partitioned_block->allocated_bytes();
- if (size >= vectorized::SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
- blocks.emplace_back(partitioned_block->to_block());
- partitioned_block.reset();
- } else {
- not_revoked_size += size;
- }
+ if (partitioned_block && !partitioned_block->empty()) {
+ blocks.emplace_back(partitioned_block->to_block());
+ partitioned_block.reset();
}
if (blocks.empty()) {
@@ -272,8 +266,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}
}
- COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
-
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" spill_probe_blocks done",
@@ -309,15 +301,9 @@ bool PartitionedHashJoinProbeLocalState::is_blockable()
const {
}
Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
- RuntimeState* state, JoinSpillPartitionInfo& partition_info,
- bool& recovered_data_available) {
- // recovered_data_available signals the caller that this synchronous
- // recovery call produced (or attempted to produce) at least one batch of
- // recovered build data stored in _recovered_build_block. Callers should
- // yield and consume the recovered data before continuing further
- // processing for this partition.
- recovered_data_available = false;
+ RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
if (!partition_info.build_file) {
+ // Build file is already exhausted for this partition.
return Status::OK();
}
SCOPED_TIMER(_recovery_build_timer);
@@ -348,27 +334,20 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
RETURN_IF_ERROR(_recovered_build_block->merge(std::move(block)));
}
if (_recovered_build_block->allocated_bytes() >=
state->spill_buffer_size_bytes()) {
- recovered_data_available = true;
return Status::OK(); // yield — buffer full, more data may remain
}
}
- // File exhausted
+ // Build file fully consumed.
RETURN_IF_ERROR(_current_build_reader->close());
_current_build_reader.reset();
partition_info.build_file.reset();
- recovered_data_available = true;
return Status::OK();
}
Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
- RuntimeState* state, JoinSpillPartitionInfo& partition_info,
- bool& recovered_data_available) {
- // recovered_data_available: this call performs synchronous reads of probe
- // blocks into _queue_probe_blocks up to a batch threshold and sets this
- // flag to true when data is available. Caller should return/yield and
- // then consume the recovered probe blocks before continuing.
- recovered_data_available = false;
+ RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
if (!partition_info.probe_file) {
+ // Probe file is already exhausted for this partition.
return Status::OK();
}
@@ -391,15 +370,13 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
_queue_probe_blocks.emplace_back(std::move(block));
}
if (read_size >= state->spill_buffer_size_bytes()) {
- recovered_data_available = true;
return Status::OK(); // yield — enough data read
}
}
- // File exhausted
+ // Probe file fully consumed.
RETURN_IF_ERROR(_current_probe_reader->close());
_current_probe_reader.reset();
partition_info.probe_file.reset();
- recovered_data_available = true;
return Status::OK();
}
@@ -454,17 +431,17 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
RETURN_IF_ERROR(_repartitioner.repartition(state,
_current_build_reader, &done));
}
// reader is reset by repartitioner on completion
- } else if (partition.build_file &&
partition.build_file->ready_for_reading()) {
+ } else if (partition.build_file) {
// No partial read — repartition the entire file from scratch.
bool done = false;
while (!done && !state->is_cancelled()) {
RETURN_IF_ERROR(_repartitioner.repartition(state,
partition.build_file, &done));
}
}
+ RETURN_IF_ERROR(_repartitioner.finalize());
_recovered_build_block.reset();
- partition.build_file.reset();
_current_build_reader.reset(); // clear any leftover reader state
- RETURN_IF_ERROR(_repartitioner.finalize());
+ partition.build_file.reset();
// Repartition probe files
std::vector<vectorized::SpillFileSPtr> probe_output_spill_files;
@@ -481,19 +458,19 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
RETURN_IF_ERROR(_repartitioner.setup_output(state,
probe_output_spill_files));
- if (partition.probe_file->ready_for_reading()) {
- bool done = false;
- while (!done && !state->is_cancelled()) {
- RETURN_IF_ERROR(_repartitioner.repartition(state,
partition.probe_file, &done));
- }
- partition.probe_file.reset();
+ bool done = false;
+ while (!done && !state->is_cancelled()) {
+ RETURN_IF_ERROR(_repartitioner.repartition(state,
partition.probe_file, &done));
}
- _current_probe_reader.reset();
+ partition.probe_file.reset();
+
RETURN_IF_ERROR(_repartitioner.finalize());
+ _current_probe_reader.reset();
}
// Push all sub-partitions into work queue; build/probe emptiness is
handled
- // later during recovery.
+ // later during recovery. New sub-partitions start with build_finished =
+ // probe_finished = false (via constructor).
for (int i = 0; i < static_cast<int>(p._partition_count); ++i) {
_spill_partition_queue.emplace_back(std::move(build_output_spill_files[i]),
std::move(probe_output_spill_files[i]), new_level);
@@ -595,6 +572,7 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block,
partition_indexes[i].data(),
partition_indexes[i].data() + count));
+        // A minimum batch size check is required here, or there will be too many small
blocks produced when reading blocks back from the file during the probe phase.
if (partitioned_blocks[i]->rows() > 0 &&
(eos || partitioned_blocks[i]->allocated_bytes() >=
vectorized::SpillFile::MIN_SPILL_WRITE_BATCH_MEM))
{
@@ -669,6 +647,7 @@ Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operators_from_partiti
RETURN_IF_ERROR(_inner_sink_operator->sink(
local_state._shared_state->_inner_runtime_state.get(), &block,
true));
+ local_state._current_partition.build_finished = true;
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" internal build from partition (level:{}) finished, rows:{},
memory usage:{}",
@@ -688,39 +667,19 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
// (including the original "level-0" ones) is accessed uniformly via the
queue.
if (!local_state._spill_queue_initialized) {
DCHECK(local_state._child_eos) << "pull() with is_spilled=true called
before child EOS";
+        // There may be some blocks still in the partitioned blocks or probe blocks.
Flush them to disk.
+ RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
+ // Close all probe writers so that SpillFile metadata (part_count,
etc.)
+ // is finalized and the files become readable. Without this the readers
+ // would see _part_count == 0 and return no data.
+ for (auto& writer : local_state._probe_writers) {
+ if (writer) {
+ RETURN_IF_ERROR(writer->close());
+ }
+ }
for (uint32_t i = 0; i < _partition_count; ++i) {
auto& build_file =
local_state._shared_state->_spilled_build_groups[i];
auto& probe_file = local_state._probe_spilling_groups[i];
- auto& probe_writer = local_state._probe_writers[i];
-
- // Flush any remaining in-memory probe blocks for this partition.
- auto& partitioned_block = local_state._partitioned_blocks[i];
- if (partitioned_block && !partitioned_block->empty()) {
-
local_state._probe_blocks[i].emplace_back(partitioned_block->to_block());
- partitioned_block.reset();
- }
- for (auto& pb : local_state._probe_blocks[i]) {
- if (pb.empty()) continue;
- // Lazy-create SpillFile + Writer for this probe partition
- if (!probe_writer) {
- auto relative_path = fmt::format(
- "{}/{}-{}-{}-{}", print_id(state->query_id()),
"hash_probe", node_id(),
- state->task_id(),
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
- relative_path, probe_file));
- RETURN_IF_ERROR(probe_file->create_writer(state,
local_state.operator_profile(),
- probe_writer));
- }
- RETURN_IF_ERROR(probe_writer->write_block(state, pb));
- }
- local_state._probe_blocks[i].clear();
-
- // Close probe writer to finalize probe SpillFile for reading.
- if (probe_writer) {
- RETURN_IF_ERROR(probe_writer->close());
- probe_writer.reset();
- }
-
// Transfer SpillFiles into JoinSpillPartitionInfo unconditionally.
local_state._spill_partition_queue.emplace_back(std::move(build_file),
std::move(probe_file), 0);
@@ -746,36 +705,23 @@ Status
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
*eos = false;
if (local_state._need_to_setup_queue_partition) {
- // If the queue is empty AND we don't already have a current
- // partition or recovered build data pending, we're done. It's
- // possible the queue was popped on a previous scheduling and the
- // recovered data is waiting to be processed; in that case we must
- // not return EOS here.
+ // No more partitions to process and no active partition — EOS.
if (local_state._spill_partition_queue.empty() &&
- local_state._current_partition.build_exhausted() &&
- !(local_state._recovered_build_block &&
- local_state._recovered_build_block->rows() > 0)) {
+ (!local_state._current_partition.is_valid() ||
+ local_state._current_partition.probe_finished)) {
*eos = true;
return Status::OK();
}
- // Pop next partition to process
- // Invariant: we only pop a new queue partition when
`_current_partition`
- // is exhausted and we have not already recovered build data for the
- // current partition. `build_file` being null is used as a sentinel
- // for whether the partition's build data has been fully consumed.
- // After reading all build data into `_recovered_build_block`, the
- // file is reset. In that state we must NOT pop the next queued
partition
- // — we still need to setup internal operators from the recovered data.
- if (local_state._current_partition.build_exhausted() &&
- !local_state._recovered_build_block) {
- DCHECK(!local_state._spill_partition_queue.empty());
-
+ // Pop next partition to process.
+ // Invariant: we only pop when there is no active current partition and
+ // no pending recovered build data waiting to be consumed.
+ if (!local_state._current_partition.is_valid() ||
+ local_state._current_partition.probe_finished) {
local_state._current_partition =
std::move(local_state._spill_partition_queue.front());
local_state._spill_partition_queue.pop_front();
local_state._recovered_build_block.reset();
local_state._queue_probe_blocks.clear();
-
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
" processing queue partition at level:{}, queue
remaining:{}",
@@ -784,61 +730,31 @@ Status
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
local_state._spill_partition_queue.size());
}
- // If we've already recovered build data into `_recovered_build_block`
- // (from a previous scheduling), we must only perform the internal
- // operator setup once we've recovered *all* build data for this
- // partition. The recovery logic resets build_file when all data has
- // been read. Only when build_file is null may we safely build
- // the hash table from the accumulated `_recovered_build_block`.
- //
- // Note: if `_recovered_build_block` contains rows but build_file
- // is non-null, more build data may still arrive; continue recovery
- // instead of starting the build now.
- if (local_state._recovered_build_block &&
local_state._recovered_build_block->rows() > 0 &&
- local_state._current_partition.build_exhausted()) {
-
RETURN_IF_ERROR(_setup_internal_operators_from_partition(local_state, state));
- local_state._need_to_setup_queue_partition = false;
- } else {
- // Recover build data from the partition's build stream
- bool recovered_data_available = false;
- RETURN_IF_ERROR(local_state.recover_build_blocks_from_partition(
- state, local_state._current_partition,
recovered_data_available));
-
- if (local_state._current_partition.build_exhausted()) {
- // Build stream is exhausted (possibly empty from the
beginning),
- // setup inner operators now and continue to probe phase.
-
RETURN_IF_ERROR(_setup_internal_operators_from_partition(local_state, state));
- local_state._need_to_setup_queue_partition = false;
- return Status::OK();
- }
-
- if (recovered_data_available) {
- // We read some build data — yield so it can be consumed in
the next
- // scheduling of this operator.
- return Status::OK();
- }
-
- // All build data recovered. Yield once so the pipeline scheduler
can
- // re-evaluate reservations based on the recovered data before we
- // actually build the hash table. On the next scheduling of this
- // operator we will fall through and call
- // `_setup_internal_operators_from_partition`.
- return Status::OK();
+ // Continue recovering build data while there is unread build file.
+ if (local_state._current_partition.build_file) {
+ // Partially read build data — yield so it can be consumed
+ // before continuing recovery in the next scheduling slice.
+ return local_state.recover_build_blocks_from_partition(state,
+
local_state._current_partition);
}
+ RETURN_IF_ERROR(_setup_internal_operators_from_partition(local_state,
state));
+ local_state._current_partition.build_finished = true;
+ local_state._need_to_setup_queue_partition = false;
+ return Status::OK();
}
// Probe phase: feed probe blocks from the current partition's probe stream
+ // into the inner probe operator.
bool in_mem_eos = false;
auto* runtime_state =
local_state._shared_state->_inner_runtime_state.get();
auto& probe_blocks = local_state._queue_probe_blocks;
while (_inner_probe_operator->need_more_input_data(runtime_state)) {
if (probe_blocks.empty()) {
- bool recovered_data_available = false;
- RETURN_IF_ERROR(local_state.recover_probe_blocks_from_partition(
- state, local_state._current_partition,
recovered_data_available));
- if (!recovered_data_available) {
- // No more probe data — send eos to inner probe
+ // Try to recover more probe blocks. If the probe stream is
+ // finished (probe_file == nullptr) and no blocks are buffered,
+ // we send EOS to the inner probe operator.
+ if (!local_state._current_partition.probe_file) {
vectorized::Block block;
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state,
&block, true));
VLOG_DEBUG << fmt::format(
@@ -847,9 +763,12 @@ Status
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
print_id(state->query_id()), node_id(),
state->task_id(),
local_state._current_partition.level);
break;
- } else {
- return Status::OK();
}
+
+ // Probe data recovered — yield to let the pipeline scheduler
+ // re-schedule us so we can push the recovered blocks.
+ return local_state.recover_probe_blocks_from_partition(state,
+
local_state._current_partition);
}
auto block = std::move(probe_blocks.back());
@@ -858,9 +777,7 @@ Status
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block,
false));
}
}
-
RETURN_IF_ERROR(_inner_probe_operator->pull(runtime_state, output_block,
&in_mem_eos));
-
if (in_mem_eos) {
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{},"
@@ -868,8 +785,10 @@ Status
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
print_id(state->query_id()), node_id(), state->task_id(),
local_state._current_partition.level);
local_state.update_profile_from_inner();
+ local_state._current_partition.probe_finished = true;
- // Reset for next queue entry
+ // Reset for next queue entry — default-constructed partition has
+ // is_valid() == false, signaling "no partition in progress".
local_state._current_partition = JoinSpillPartitionInfo {};
local_state._need_to_setup_queue_partition = true;
local_state._queue_probe_blocks.clear();
@@ -900,6 +819,15 @@ bool
PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state
// This is the memory that can be freed if we choose to revoke and repartition
the current
size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState*
state) const {
auto& local_state = get_local_state(state);
+ if (!local_state._shared_state->_is_spilled) {
+ return 0;
+ }
+ if (!local_state._current_partition.is_valid() ||
+ local_state._current_partition.build_finished) {
+ // No active partition — no revocable memory.
+ // Or if current partition has finished build hash table.
+ return 0;
+ }
size_t mem_size = 0;
auto& probe_blocks = local_state._probe_blocks;
for (uint32_t i = 0; i < _partition_count; ++i) {
@@ -945,13 +873,8 @@ size_t
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
// Baseline reservation is one block of spill I/O.
size_t size_to_reserve = state->minimum_operator_memory_required_bytes();
- // Queue path: _need_to_setup_queue_partition is true AND recovered build
- // data is fully available (build_file exhausted), meaning the next pull()
- // will call _setup_internal_operators_from_partition.
- const bool about_to_build =
- local_state._need_to_setup_queue_partition &&
local_state._recovered_build_block &&
- local_state._recovered_build_block->rows() > 0 &&
- local_state._current_partition.build_exhausted(); // file
exhausted → ready to build
+ const bool about_to_build = local_state._current_partition.is_valid() &&
+ !local_state._current_partition.build_finished;
if (about_to_build) {
// Estimate rows that will land in the hash table so we can reserve
@@ -1028,7 +951,10 @@ Status
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
// Probe-data accumulation phase: spill in-memory probe blocks to disk.
return local_state.spill_probe_blocks(state);
}
-
+ if (!local_state._current_partition.is_valid() ||
+ local_state._current_partition.build_finished) {
+ return Status::OK();
+ }
// Recovery/build phase: repartition the current partition's in-memory
build
// data so the hash table build can be deferred to a smaller sub-partition.
return local_state.revoke_build_data(state);
@@ -1039,17 +965,6 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
*eos = false;
auto& local_state = get_local_state(state);
const auto is_spilled = local_state._shared_state->_is_spilled;
-#ifndef NDEBUG
- Defer eos_check_defer([&] {
- if (*eos) {
- LOG(INFO) << fmt::format(
- "Query:{}, hash join probe:{}, task:{}, child eos:{}, need
spill:{}",
- print_id(state->query_id()), node_id(), state->task_id(),
- local_state._child_eos, is_spilled);
- }
- });
-#endif
-
Defer defer([&]() {
COUNTER_SET(local_state._memory_usage_reserved,
int64_t(local_state.estimate_memory_usage()));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b911b5c6983..a375fe1d1f7 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -45,18 +45,46 @@ class PartitionedHashJoinProbeOperatorX;
/// during recovery. For multi-level spill, when a partition is too large to
fit in
/// memory, it gets repartitioned into FANOUT sub-partitions, each represented
by a
/// new JoinSpillPartitionInfo at level + 1.
+///
+/// Lifecycle of partition progress:
+/// build_file == nullptr:
+/// - all build-side spill data has been read from disk for this partition
+/// probe_file == nullptr:
+/// - all probe-side spill data has been read from disk for this partition
+/// build_finished = true:
+/// - build side has completed hash table construction
+/// probe_finished = true:
+/// - probe side has completed probing all rows for this partition
+///
+/// A default-constructed instance has is_valid() == false, representing "no
partition".
+/// New sub-partitions created by repartitioning start with both flags = false
and
+/// initialized = true.
struct JoinSpillPartitionInfo {
+ // build_file == nullptr means all build data has been read from disk.
vectorized::SpillFileSPtr build_file;
+ // probe_file == nullptr means all probe data has been read from disk.
vectorized::SpillFileSPtr probe_file;
int level = 0; // 0 = original level-0 partition, 1+ = repartitioned
sub-partition
+ // Read all build data from disk and finished building the hash table.
+ bool build_finished = false;
+ // Read all probe data from disk and probed all rows against the hash
table.
+ bool probe_finished = false;
+ // Whether this struct currently represents an active queue partition.
+ bool initialized = false;
+
JoinSpillPartitionInfo() = default;
JoinSpillPartitionInfo(vectorized::SpillFileSPtr build,
vectorized::SpillFileSPtr probe,
int lvl)
- : build_file(std::move(build)), probe_file(std::move(probe)),
level(lvl) {}
-
- bool build_exhausted() const { return !build_file; }
- bool probe_exhausted() const { return !probe_file; }
+ : build_file(std::move(build)),
+ probe_file(std::move(probe)),
+ level(lvl),
+ initialized(true) {}
+
+ /// Returns true if this struct currently represents an active partition
entry
+ /// from the spill queue. A default-constructed partition is "invalid" and
+ /// serves as a sentinel meaning "no partition is being processed".
+ bool is_valid() const { return initialized; }
};
class PartitionedHashJoinProbeLocalState MOCK_REMOVE(final)
@@ -85,12 +113,10 @@ public:
/// Recover build blocks from a JoinSpillPartitionInfo's build stream (for
multi-level recovery).
Status recover_build_blocks_from_partition(RuntimeState* state,
- JoinSpillPartitionInfo&
partition_info,
- bool& recovered_data_available);
+ JoinSpillPartitionInfo&
partition_info);
/// Recover probe blocks from a JoinSpillPartitionInfo's probe stream (for
multi-level recovery).
Status recover_probe_blocks_from_partition(RuntimeState* state,
- JoinSpillPartitionInfo&
partition_info,
- bool& recovered_data_available);
+ JoinSpillPartitionInfo&
partition_info);
/// Repartition the current partition's build and probe streams into
FANOUT sub-partitions
/// and push them into _spill_partition_queue for subsequent processing.
diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h
index bfce76d1031..9cfbd210ddb 100644
--- a/be/src/vec/spill/spill_file.h
+++ b/be/src/vec/spill/spill_file.h
@@ -58,7 +58,7 @@ using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>;
class SpillFile {
public:
// to avoid too many small file writes
- static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
+ static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 512 * 1024;
static constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024;
/// @param data_dir The spill storage directory (disk) selected by
SpillFileManager.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]