This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 995aeec3ef9 Make repartitioner level-aware
995aeec3ef9 is described below
commit 995aeec3ef91da9ddaccac5c5e9c2ec1b9b31410
Author: Hu Shenggang <[email protected]>
AuthorDate: Fri Feb 27 18:33:47 2026 +0800
Make repartitioner level-aware
---
.../partitioned_aggregation_source_operator.cpp | 11 ++--
.../exec/partitioned_aggregation_source_operator.h | 3 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 58 +++++-----------------
be/src/pipeline/exec/spill_utils.h | 9 +++-
be/src/vec/runtime/partitioner.cpp | 1 +
be/src/vec/runtime/partitioner.h | 11 ++++
be/src/vec/spill/spill_repartitioner.cpp | 29 ++++++++---
be/src/vec/spill/spill_repartitioner.h | 19 ++++---
8 files changed, 75 insertions(+), 66 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index d6d660b1e8f..02a55bd27c4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -414,7 +414,8 @@ Status
PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
// Initialize repartitioner with key columns and operator-configured
fanout.
_repartitioner.init_with_key_columns(std::move(key_column_indices),
std::move(key_data_types),
- operator_profile(),
static_cast<int>(p._partition_count));
+ operator_profile(),
static_cast<int>(p._partition_count),
+ new_level);
std::vector<vectorized::SpillStreamSPtr> output_streams;
RETURN_IF_ERROR(SpillRepartitioner::create_output_streams(
@@ -494,7 +495,8 @@ Status
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
}
Status PartitionedAggLocalState::flush_hash_table_to_sub_streams(
- RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>&
output_streams) {
+ RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>&
output_streams,
+ int repartition_level) {
auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
auto* runtime_state = _runtime_state.get();
@@ -508,7 +510,8 @@ Status
PartitionedAggLocalState::flush_hash_table_to_sub_streams(
}
_repartitioner.init_with_key_columns(std::move(key_column_indices),
std::move(key_data_types),
- operator_profile(),
static_cast<int>(p._partition_count));
+ operator_profile(),
static_cast<int>(p._partition_count),
+ repartition_level);
in_mem_state->aggregate_data_container->init_once();
bool inner_eos = false;
@@ -557,7 +560,7 @@ Status PartitionedAggLocalState::flush_and_repartition(
output_streams, static_cast<int>(p._partition_count)));
// 2. Flush the in-memory hash table into the sub-streams.
- RETURN_IF_ERROR(flush_hash_table_to_sub_streams(state, output_streams));
+ RETURN_IF_ERROR(flush_hash_table_to_sub_streams(state, output_streams,
new_level));
// 3. Repartition remaining unread streams into the same sub-streams.
for (auto& stream : remaining_streams) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 3bdb4892b27..4c43e205e67 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -86,7 +86,8 @@ public:
/// Flush the current in-memory hash table by draining it as blocks and
routing
/// each block through the repartitioner into the output sub-streams.
Status flush_hash_table_to_sub_streams(
- RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>&
output_streams);
+ RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>&
output_streams,
+ int repartition_level);
/// Flush the in-memory hash table into FANOUT sub-streams, repartition
remaining
/// unread streams from `remaining_streams`, and push resulting
sub-partitions into
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 8d087a80e92..6a49fdb6f85 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -167,7 +167,7 @@ Status
PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
// Create a fanout-sized partitioner for repartitioning.
// Use operator-configured partition count instead of static FANOUT.
_fanout_partitioner =
-
std::make_unique<SpillPartitionerType>(static_cast<int>(p._partition_count));
+
std::make_unique<SpillRePartitionerType>(static_cast<int>(p._partition_count));
RETURN_IF_ERROR(_fanout_partitioner->init(p._probe_exprs));
RETURN_IF_ERROR(_fanout_partitioner->prepare(state, p._child->row_desc()));
RETURN_IF_ERROR(_fanout_partitioner->open(state));
@@ -463,7 +463,7 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
std::unique_ptr<vectorized::PartitionerBase> fanout_clone;
RETURN_IF_ERROR(_fanout_partitioner->clone(state, fanout_clone));
_repartitioner.init(std::move(fanout_clone), operator_profile(),
- static_cast<int>(p._partition_count));
+ static_cast<int>(p._partition_count), new_level);
// Repartition build stream
std::vector<vectorized::SpillStreamSPtr> build_output_streams;
@@ -471,6 +471,14 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
state, p.node_id(), fmt::format("hash_build_repart_l{}",
new_level), operator_profile(),
build_output_streams, static_cast<int>(p._partition_count)));
+ // Repartition already-recovered in-memory build data first to avoid extra
+ // write/read I/O through an intermediate spill stream.
+ if (_recovered_build_block && _recovered_build_block->rows() > 0) {
+ auto recovered_block = _recovered_build_block->to_block();
+ _recovered_build_block.reset();
+ RETURN_IF_ERROR(_repartitioner.route_block(state, recovered_block,
build_output_streams));
+ }
+
if (partition.build_stream && partition.build_stream->ready_for_reading())
{
partition.build_stream->set_read_counters(operator_profile());
bool done = false;
@@ -500,7 +508,7 @@ Status
PartitionedHashJoinProbeLocalState::repartition_current_partition(
std::unique_ptr<vectorized::PartitionerBase> probe_fanout_clone;
RETURN_IF_ERROR(_fanout_partitioner->clone(state, probe_fanout_clone));
_repartitioner.init(std::move(probe_fanout_clone), operator_profile(),
- static_cast<int>(p._partition_count));
+ static_cast<int>(p._partition_count), new_level);
bool done = false;
while (!done && !state->is_cancelled()) {
@@ -1036,53 +1044,13 @@ size_t
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
// (a) _current_partition.build_stream (SpillStream, may have been
partially read)
// (b) _recovered_build_block (partially-recovered MutableBlock)
//
-// repartition_current_partition expects a single unified SpillStream, so any
-// data already pulled into _recovered_build_block is flushed back to a new
-// stream (draining any unread tail from the original build_stream first).
+// During repartition we route (b) directly into sub-streams first, then
+// continue reading (a), avoiding an extra round of spill write/read for (b).
Status PartitionedHashJoinProbeLocalState::revoke_build_data(RuntimeState*
state) {
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
DCHECK(_child_eos) << "revoke_build_data should only be called after child
EOS";
DCHECK(_spill_queue_initialized) << "queue must be initialized before
revoke_build_data";
- // If _recovered_build_block has data, flush it (plus any unread tail of
- // build_stream) into a single combined SpillStream so that
- // repartition_current_partition sees a unified build stream.
- if (_recovered_build_block && _recovered_build_block->rows() > 0) {
- vectorized::SpillStreamSPtr combined_stream;
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
- state, combined_stream, print_id(state->query_id()),
- fmt::format("hash_build_revoke_l{}", _current_partition.level
+ 1), p.node_id(),
- std::numeric_limits<size_t>::max(), operator_profile()));
-
- // Write the already-recovered portion first.
- auto block = _recovered_build_block->to_block();
- _recovered_build_block.reset();
- RETURN_IF_ERROR(combined_stream->spill_block(state, block,
/*eof=*/false));
-
- // Drain any unread tail from the original build_stream.
- if (_current_partition.build_stream &&
- _current_partition.build_stream->ready_for_reading()) {
-
_current_partition.build_stream->set_read_counters(operator_profile());
- bool eos = false;
- while (!eos && !state->is_cancelled()) {
- vectorized::Block tail_block;
- RETURN_IF_ERROR(
-
_current_partition.build_stream->read_next_block_sync(&tail_block, &eos));
- if (tail_block.rows() > 0) {
- RETURN_IF_ERROR(combined_stream->spill_block(state,
tail_block, /*eof=*/false));
- }
- }
- ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
- _current_partition.build_stream);
- _current_partition.build_stream.reset();
- }
-
- RETURN_IF_ERROR(combined_stream->close());
- _current_partition.build_stream = std::move(combined_stream);
- }
- // If _recovered_build_block is empty but build_stream is still active,
- // repartition_current_partition reads it directly — no extra work needed.
-
VLOG_DEBUG << fmt::format(
"Query:{}, hash join probe:{}, task:{}, revoke_build_data: "
"repartitioning queue partition at level {} (build in
SpillStream)",
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index d972524951d..e7a16193c0d 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -35,8 +35,15 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
+// Default spill partitioner for initial partitioning (level-0). Repartition
+// paths may use different channel-id policies (e.g. raw-hash mode).
using SpillPartitionerType =
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
+// Repartition partitioner: keeps raw hash (no final modulo) so
SpillRepartitioner
+// can apply level-aware hash mixing and channel mapping.
+using SpillRePartitionerType =
+
vectorized::Crc32HashPartitioner<vectorized::SpillRePartitionChannelIds>;
+
struct SpillContext {
std::atomic_int running_tasks_count;
TUniqueId query_id;
@@ -106,4 +113,4 @@ inline void update_profile_from_inner_profile(const
std::string& name,
}
#include "common/compile_check_end.h"
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/vec/runtime/partitioner.cpp
b/be/src/vec/runtime/partitioner.cpp
index 5095c7a7dbb..de5b6542087 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -87,5 +87,6 @@ Status Crc32CHashPartitioner::clone(RuntimeState* state,
template class Crc32HashPartitioner<ShuffleChannelIds>;
template class Crc32HashPartitioner<SpillPartitionChannelIds>;
+template class Crc32HashPartitioner<SpillRePartitionChannelIds>;
} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index dfe0e79f988..2b3a28d1f6c 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -114,9 +114,20 @@ struct ShuffleChannelIds {
struct SpillPartitionChannelIds {
using HashValType = PartitionerBase::HashValType;
+ // Default spill partition mapping used by level-0 partitioning: `l` is the hash value and `r` the modulus
+ // (presumably the partition count - confirm against callers). Rotates the hash bits and applies modulo to get a channel id directly.
HashValType operator()(HashValType l, size_t r) { return ((l >> 16) | (l
<< 16)) % r; }
};
+struct SpillRePartitionChannelIds {
+ using HashValType = PartitionerBase::HashValType;
+
+ // Repartition mode: return the raw hash value `l` unchanged; the second argument is ignored, so no modulo is applied.
+ // The result is therefore a hash, not a channel index. The caller (SpillRepartitioner) will apply level-aware hash
+ // mixing and final channel mapping, so repartition behavior can vary by level.
+ HashValType operator()(HashValType l, size_t /*r*/) { return l; }
+};
+
static inline PartitionerBase::HashValType
crc32c_shuffle_mix(PartitionerBase::HashValType h) {
// Step 1: fold high entropy into low bits
h ^= h >> 16;
diff --git a/be/src/vec/spill/spill_repartitioner.cpp
b/be/src/vec/spill/spill_repartitioner.cpp
index 6ef8655c7df..e68ef40e280 100644
--- a/be/src/vec/spill/spill_repartitioner.cpp
+++ b/be/src/vec/spill/spill_repartitioner.cpp
@@ -19,7 +19,6 @@
#include <glog/logging.h>
-#include <algorithm>
#include <limits>
#include <memory>
#include <vector>
@@ -38,22 +37,25 @@ namespace doris::pipeline {
#include "common/compile_check_begin.h"
void SpillRepartitioner::init(std::unique_ptr<vectorized::PartitionerBase>
partitioner,
- RuntimeProfile* profile, int fanout) {
+ RuntimeProfile* profile, int fanout, int
repartition_level) {
_partitioner = std::move(partitioner);
_use_column_index_mode = false;
_fanout = fanout;
+ _repartition_level = repartition_level;
_repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime",
1);
_repartition_rows = ADD_COUNTER_WITH_LEVEL(profile,
"SpillRepartitionRows", TUnit::UNIT, 1);
}
void SpillRepartitioner::init_with_key_columns(std::vector<size_t>
key_column_indices,
std::vector<vectorized::DataTypePtr> key_data_types,
- RuntimeProfile* profile, int
fanout) {
+ RuntimeProfile* profile, int
fanout,
+ int repartition_level) {
_key_column_indices = std::move(key_column_indices);
_key_data_types = std::move(key_data_types);
_use_column_index_mode = true;
_partitioner.reset();
_fanout = fanout;
+ _repartition_level = repartition_level;
_repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime",
1);
_repartition_rows = ADD_COUNTER_WITH_LEVEL(profile,
"SpillRepartitionRows", TUnit::UNIT, 1);
}
@@ -155,15 +157,16 @@ Status SpillRepartitioner::_route_block(
RuntimeState* state, vectorized::Block& block,
std::vector<vectorized::SpillStreamSPtr>& output_streams,
std::vector<std::unique_ptr<vectorized::MutableBlock>>&
output_buffers) {
- // Compute partition assignment for every row in the block
+ // Compute raw hash values for every row in the block.
RETURN_IF_ERROR(_partitioner->do_partitioning(state, &block));
- const auto& channel_ids = _partitioner->get_channel_ids();
+ const auto& hash_vals = _partitioner->get_channel_ids();
const auto rows = block.rows();
// Build per-partition row index lists
std::vector<std::vector<uint32_t>> partition_row_indexes(_fanout);
for (uint32_t i = 0; i < rows; ++i) {
- partition_row_indexes[channel_ids[i]].emplace_back(i);
+ auto partition_idx = _map_hash_to_partition(hash_vals[i]);
+ partition_row_indexes[partition_idx].emplace_back(i);
}
// Scatter rows into per-partition buffers
@@ -210,9 +213,9 @@ Status SpillRepartitioner::_route_block_by_columns(
static_cast<uint32_t>(rows));
}
- // Apply SpillPartitionChannelIds: ((hash >> 16) | (hash << 16)) % _fanout
+ // Map hash values to output channels with level-aware mixing.
for (size_t i = 0; i < rows; ++i) {
- hashes[i] = ((hashes[i] >> 16) | (hashes[i] << 16)) % _fanout;
+ hashes[i] = _map_hash_to_partition(hashes[i]);
}
// Build per-partition row index lists
@@ -267,5 +270,15 @@ Status SpillRepartitioner::_flush_all_buffers(
return Status::OK();
}
+uint32_t SpillRepartitioner::_map_hash_to_partition(uint32_t hash) const {
+ DCHECK_GT(_fanout, 0);
+ // Maps a raw row hash to an output partition index in [0, _fanout); _fanout is the modulus below, hence the positivity check above.
+ // A level-dependent salt, (_repartition_level + 1) * LEVEL_SALT_BASE, is XORed into the hash before mixing so each repartition level has a different projection from hash-space to partition-space.
+ constexpr uint32_t LEVEL_SALT_BASE = 0x9E3779B9U;
+ auto salt = static_cast<uint32_t>(_repartition_level + 1) *
LEVEL_SALT_BASE;
+ auto mixed = vectorized::crc32c_shuffle_mix(hash ^ salt);
+ return ((mixed >> 16) | (mixed << 16)) % static_cast<uint32_t>(_fanout);
+}
+
#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/vec/spill/spill_repartitioner.h
b/be/src/vec/spill/spill_repartitioner.h
index ab40d9c307b..fd0b5d33ec1 100644
--- a/be/src/vec/spill/spill_repartitioner.h
+++ b/be/src/vec/spill/spill_repartitioner.h
@@ -51,11 +51,11 @@ namespace pipeline {
/// specified column indices. Suitable for Aggregation where spill blocks
have a
/// different schema (key columns at fixed positions 0..N-1).
///
-/// For each level of repartitioning, the partitioner uses the same CRC32 hash
function
-/// on key columns but with the SpillPartitionChannelIds functor (bit rotation
+ modulo).
-/// Since all rows in the input stream already belong to the same parent
partition (they
-/// had identical partition indices at the previous level), re-hashing with
the same
-/// function still produces a valid and well-distributed split into FANOUT
sub-partitions.
+/// For repartitioning, hash computation and final channel mapping are
separated:
+/// - a partitioner can provide either direct channel ids or raw hash values
+/// (e.g. SpillRePartitionChannelIds returns raw hash),
+/// - SpillRepartitioner then applies the final channel mapping strategy.
+/// This keeps repartition policy centralized and allows level-aware mapping.
///
/// Processing is incremental: each call to repartition() processes up to
MAX_BATCH_BYTES
/// (32 MB) of data and then returns, allowing the pipeline scheduler to yield
and
@@ -77,7 +77,7 @@ public:
// @param profile RuntimeProfile for counters.
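// @param fanout Number of output sub-partitions (required; the parameter has no default argument).
// @param repartition_level Target repartition level (parent level + 1); used to derive the level-aware channel mapping.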
void init(std::unique_ptr<vectorized::PartitionerBase> partitioner,
RuntimeProfile* profile,
- int fanout);
+ int fanout, int repartition_level);
/// Initialize the repartitioner with explicit key column indices (for
Aggregation).
/// Computes CRC32 hash directly on the specified columns without using
VExpr.
@@ -94,7 +94,7 @@ public:
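// @param fanout Number of output sub-partitions (required; the parameter has no default argument).
// @param repartition_level Target repartition level (parent level + 1); used to derive the level-aware channel mapping.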
void init_with_key_columns(std::vector<size_t> key_column_indices,
std::vector<vectorized::DataTypePtr>
key_data_types,
- RuntimeProfile* profile, int fanout);
+ RuntimeProfile* profile, int fanout, int
repartition_level);
/// Repartition data from input_stream into output_streams.
///
@@ -147,6 +147,8 @@ private:
RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>&
output_streams,
std::vector<std::unique_ptr<vectorized::MutableBlock>>&
output_buffers, bool force);
+ uint32_t _map_hash_to_partition(uint32_t hash) const;
+
// Partitioner mode (used by Hash Join)
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
@@ -159,6 +161,9 @@ private:
RuntimeProfile::Counter* _repartition_rows = nullptr;
// dynamic fanout to allow operator-configured partition counts
int _fanout = 8;
+ // Target repartition level (parent level + 1). Used to derive level-aware
+ // channel mapping from raw hash values.
+ int _repartition_level = 0;
public:
// Accessor for configured fanout
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]