This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new 995aeec3ef9 Make repartitioner level-aware
995aeec3ef9 is described below

commit 995aeec3ef91da9ddaccac5c5e9c2ec1b9b31410
Author: Hu Shenggang <[email protected]>
AuthorDate: Fri Feb 27 18:33:47 2026 +0800

    Make repartitioner level-aware
---
 .../partitioned_aggregation_source_operator.cpp    | 11 ++--
 .../exec/partitioned_aggregation_source_operator.h |  3 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 58 +++++-----------------
 be/src/pipeline/exec/spill_utils.h                 |  9 +++-
 be/src/vec/runtime/partitioner.cpp                 |  1 +
 be/src/vec/runtime/partitioner.h                   | 11 ++++
 be/src/vec/spill/spill_repartitioner.cpp           | 29 ++++++++---
 be/src/vec/spill/spill_repartitioner.h             | 19 ++++---
 8 files changed, 75 insertions(+), 66 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index d6d660b1e8f..02a55bd27c4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -414,7 +414,8 @@ Status 
PartitionedAggLocalState::_repartition_partition(RuntimeState* state,
 
     // Initialize repartitioner with key columns and operator-configured fanout.
     _repartitioner.init_with_key_columns(std::move(key_column_indices), 
std::move(key_data_types),
-                                         operator_profile(), 
static_cast<int>(p._partition_count));
+                                         operator_profile(), 
static_cast<int>(p._partition_count),
+                                         new_level);
 
     std::vector<vectorized::SpillStreamSPtr> output_streams;
     RETURN_IF_ERROR(SpillRepartitioner::create_output_streams(
@@ -494,7 +495,8 @@ Status 
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
 }
 
 Status PartitionedAggLocalState::flush_hash_table_to_sub_streams(
-        RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>& 
output_streams) {
+        RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>& 
output_streams,
+        int repartition_level) {
     auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
     auto* runtime_state = _runtime_state.get();
 
@@ -508,7 +510,8 @@ Status 
PartitionedAggLocalState::flush_hash_table_to_sub_streams(
     }
 
     _repartitioner.init_with_key_columns(std::move(key_column_indices), 
std::move(key_data_types),
-                                         operator_profile(), 
static_cast<int>(p._partition_count));
+                                         operator_profile(), 
static_cast<int>(p._partition_count),
+                                         repartition_level);
 
     in_mem_state->aggregate_data_container->init_once();
     bool inner_eos = false;
@@ -557,7 +560,7 @@ Status PartitionedAggLocalState::flush_and_repartition(
             output_streams, static_cast<int>(p._partition_count)));
 
     // 2. Flush the in-memory hash table into the sub-streams.
-    RETURN_IF_ERROR(flush_hash_table_to_sub_streams(state, output_streams));
+    RETURN_IF_ERROR(flush_hash_table_to_sub_streams(state, output_streams, 
new_level));
 
     // 3. Repartition remaining unread streams into the same sub-streams.
     for (auto& stream : remaining_streams) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 3bdb4892b27..4c43e205e67 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -86,7 +86,8 @@ public:
     /// Flush the current in-memory hash table by draining it as blocks and routing
     /// each block through the repartitioner into the output sub-streams.
     Status flush_hash_table_to_sub_streams(
-            RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>& 
output_streams);
+            RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>& 
output_streams,
+            int repartition_level);
 
     /// Flush the in-memory hash table into FANOUT sub-streams, repartition remaining
     /// unread streams from `remaining_streams`, and push resulting sub-partitions into
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 8d087a80e92..6a49fdb6f85 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -167,7 +167,7 @@ Status 
PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
     // Create a fanout-sized partitioner for repartitioning.
     // Use operator-configured partition count instead of static FANOUT.
     _fanout_partitioner =
-            
std::make_unique<SpillPartitionerType>(static_cast<int>(p._partition_count));
+            
std::make_unique<SpillRePartitionerType>(static_cast<int>(p._partition_count));
     RETURN_IF_ERROR(_fanout_partitioner->init(p._probe_exprs));
     RETURN_IF_ERROR(_fanout_partitioner->prepare(state, p._child->row_desc()));
     RETURN_IF_ERROR(_fanout_partitioner->open(state));
@@ -463,7 +463,7 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
     std::unique_ptr<vectorized::PartitionerBase> fanout_clone;
     RETURN_IF_ERROR(_fanout_partitioner->clone(state, fanout_clone));
     _repartitioner.init(std::move(fanout_clone), operator_profile(),
-                        static_cast<int>(p._partition_count));
+                        static_cast<int>(p._partition_count), new_level);
 
     // Repartition build stream
     std::vector<vectorized::SpillStreamSPtr> build_output_streams;
@@ -471,6 +471,14 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
             state, p.node_id(), fmt::format("hash_build_repart_l{}", 
new_level), operator_profile(),
             build_output_streams, static_cast<int>(p._partition_count)));
 
+    // Repartition already-recovered in-memory build data first to avoid extra
+    // write/read I/O through an intermediate spill stream.
+    if (_recovered_build_block && _recovered_build_block->rows() > 0) {
+        auto recovered_block = _recovered_build_block->to_block();
+        _recovered_build_block.reset();
+        RETURN_IF_ERROR(_repartitioner.route_block(state, recovered_block, 
build_output_streams));
+    }
+
     if (partition.build_stream && partition.build_stream->ready_for_reading()) 
{
         partition.build_stream->set_read_counters(operator_profile());
         bool done = false;
@@ -500,7 +508,7 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
         std::unique_ptr<vectorized::PartitionerBase> probe_fanout_clone;
         RETURN_IF_ERROR(_fanout_partitioner->clone(state, probe_fanout_clone));
         _repartitioner.init(std::move(probe_fanout_clone), operator_profile(),
-                            static_cast<int>(p._partition_count));
+                            static_cast<int>(p._partition_count), new_level);
 
         bool done = false;
         while (!done && !state->is_cancelled()) {
@@ -1036,53 +1044,13 @@ size_t 
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
 //   (a) _current_partition.build_stream  (SpillStream, may have been partially read)
 //   (b) _recovered_build_block           (partially-recovered MutableBlock)
 //
-// repartition_current_partition expects a single unified SpillStream, so any
-// data already pulled into _recovered_build_block is flushed back to a new
-// stream (draining any unread tail from the original build_stream first).
+// During repartition we route (b) directly into sub-streams first, then
+// continue reading (a), avoiding an extra round of spill write/read for (b).
 Status PartitionedHashJoinProbeLocalState::revoke_build_data(RuntimeState* 
state) {
     auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
     DCHECK(_child_eos) << "revoke_build_data should only be called after child 
EOS";
     DCHECK(_spill_queue_initialized) << "queue must be initialized before 
revoke_build_data";
 
-    // If _recovered_build_block has data, flush it (plus any unread tail of
-    // build_stream) into a single combined SpillStream so that
-    // repartition_current_partition sees a unified build stream.
-    if (_recovered_build_block && _recovered_build_block->rows() > 0) {
-        vectorized::SpillStreamSPtr combined_stream;
-        
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
-                state, combined_stream, print_id(state->query_id()),
-                fmt::format("hash_build_revoke_l{}", _current_partition.level 
+ 1), p.node_id(),
-                std::numeric_limits<size_t>::max(), operator_profile()));
-
-        // Write the already-recovered portion first.
-        auto block = _recovered_build_block->to_block();
-        _recovered_build_block.reset();
-        RETURN_IF_ERROR(combined_stream->spill_block(state, block, 
/*eof=*/false));
-
-        // Drain any unread tail from the original build_stream.
-        if (_current_partition.build_stream &&
-            _current_partition.build_stream->ready_for_reading()) {
-            
_current_partition.build_stream->set_read_counters(operator_profile());
-            bool eos = false;
-            while (!eos && !state->is_cancelled()) {
-                vectorized::Block tail_block;
-                RETURN_IF_ERROR(
-                        
_current_partition.build_stream->read_next_block_sync(&tail_block, &eos));
-                if (tail_block.rows() > 0) {
-                    RETURN_IF_ERROR(combined_stream->spill_block(state, 
tail_block, /*eof=*/false));
-                }
-            }
-            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(
-                    _current_partition.build_stream);
-            _current_partition.build_stream.reset();
-        }
-
-        RETURN_IF_ERROR(combined_stream->close());
-        _current_partition.build_stream = std::move(combined_stream);
-    }
-    // If _recovered_build_block is empty but build_stream is still active,
-    // repartition_current_partition reads it directly — no extra work needed.
-
     VLOG_DEBUG << fmt::format(
             "Query:{}, hash join probe:{}, task:{}, revoke_build_data: "
             "repartitioning queue partition at level {} (build in 
SpillStream)",
diff --git a/be/src/pipeline/exec/spill_utils.h 
b/be/src/pipeline/exec/spill_utils.h
index d972524951d..e7a16193c0d 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -35,8 +35,15 @@
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
+// Default spill partitioner for initial partitioning (level-0). Repartition
+// paths may use different channel-id policies (e.g. raw-hash mode).
 using SpillPartitionerType = 
vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
 
+// Repartition partitioner: keeps raw hash (no final modulo) so SpillRepartitioner
+// can apply level-aware hash mixing and channel mapping.
+using SpillRePartitionerType =
+        
vectorized::Crc32HashPartitioner<vectorized::SpillRePartitionChannelIds>;
+
 struct SpillContext {
     std::atomic_int running_tasks_count;
     TUniqueId query_id;
@@ -106,4 +113,4 @@ inline void update_profile_from_inner_profile(const 
std::string& name,
 }
 
 #include "common/compile_check_end.h"
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/vec/runtime/partitioner.cpp 
b/be/src/vec/runtime/partitioner.cpp
index 5095c7a7dbb..de5b6542087 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -87,5 +87,6 @@ Status Crc32CHashPartitioner::clone(RuntimeState* state,
 
 template class Crc32HashPartitioner<ShuffleChannelIds>;
 template class Crc32HashPartitioner<SpillPartitionChannelIds>;
+template class Crc32HashPartitioner<SpillRePartitionChannelIds>;
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index dfe0e79f988..2b3a28d1f6c 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -114,9 +114,20 @@ struct ShuffleChannelIds {
 
 struct SpillPartitionChannelIds {
     using HashValType = PartitionerBase::HashValType;
+    // Default spill partition mapping used by level-0 partitioning:
+    // rotate hash bits and apply modulo to get a channel id directly.
     HashValType operator()(HashValType l, size_t r) { return ((l >> 16) | (l 
<< 16)) % r; }
 };
 
+struct SpillRePartitionChannelIds {
+    using HashValType = PartitionerBase::HashValType;
+
+    // Repartition mode: return the raw hash value without modulo.
+    // The caller (SpillRepartitioner) will apply level-aware hash mixing and
+    // final channel mapping, so repartition behavior can vary by level.
+    HashValType operator()(HashValType l, size_t /*r*/) { return l; }
+};
+
 static inline PartitionerBase::HashValType 
crc32c_shuffle_mix(PartitionerBase::HashValType h) {
     // Step 1: fold high entropy into low bits
     h ^= h >> 16;
diff --git a/be/src/vec/spill/spill_repartitioner.cpp 
b/be/src/vec/spill/spill_repartitioner.cpp
index 6ef8655c7df..e68ef40e280 100644
--- a/be/src/vec/spill/spill_repartitioner.cpp
+++ b/be/src/vec/spill/spill_repartitioner.cpp
@@ -19,7 +19,6 @@
 
 #include <glog/logging.h>
 
-#include <algorithm>
 #include <limits>
 #include <memory>
 #include <vector>
@@ -38,22 +37,25 @@ namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
 void SpillRepartitioner::init(std::unique_ptr<vectorized::PartitionerBase> 
partitioner,
-                              RuntimeProfile* profile, int fanout) {
+                              RuntimeProfile* profile, int fanout, int 
repartition_level) {
     _partitioner = std::move(partitioner);
     _use_column_index_mode = false;
     _fanout = fanout;
+    _repartition_level = repartition_level;
     _repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime", 
1);
     _repartition_rows = ADD_COUNTER_WITH_LEVEL(profile, 
"SpillRepartitionRows", TUnit::UNIT, 1);
 }
 
 void SpillRepartitioner::init_with_key_columns(std::vector<size_t> 
key_column_indices,
                                                
std::vector<vectorized::DataTypePtr> key_data_types,
-                                               RuntimeProfile* profile, int 
fanout) {
+                                               RuntimeProfile* profile, int 
fanout,
+                                               int repartition_level) {
     _key_column_indices = std::move(key_column_indices);
     _key_data_types = std::move(key_data_types);
     _use_column_index_mode = true;
     _partitioner.reset();
     _fanout = fanout;
+    _repartition_level = repartition_level;
     _repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime", 
1);
     _repartition_rows = ADD_COUNTER_WITH_LEVEL(profile, 
"SpillRepartitionRows", TUnit::UNIT, 1);
 }
@@ -155,15 +157,16 @@ Status SpillRepartitioner::_route_block(
         RuntimeState* state, vectorized::Block& block,
         std::vector<vectorized::SpillStreamSPtr>& output_streams,
         std::vector<std::unique_ptr<vectorized::MutableBlock>>& 
output_buffers) {
-    // Compute partition assignment for every row in the block
+    // Compute raw hash values for every row in the block.
     RETURN_IF_ERROR(_partitioner->do_partitioning(state, &block));
-    const auto& channel_ids = _partitioner->get_channel_ids();
+    const auto& hash_vals = _partitioner->get_channel_ids();
     const auto rows = block.rows();
 
     // Build per-partition row index lists
     std::vector<std::vector<uint32_t>> partition_row_indexes(_fanout);
     for (uint32_t i = 0; i < rows; ++i) {
-        partition_row_indexes[channel_ids[i]].emplace_back(i);
+        auto partition_idx = _map_hash_to_partition(hash_vals[i]);
+        partition_row_indexes[partition_idx].emplace_back(i);
     }
 
     // Scatter rows into per-partition buffers
@@ -210,9 +213,9 @@ Status SpillRepartitioner::_route_block_by_columns(
                                        static_cast<uint32_t>(rows));
     }
 
-    // Apply SpillPartitionChannelIds: ((hash >> 16) | (hash << 16)) % _fanout
+    // Map hash values to output channels with level-aware mixing.
     for (size_t i = 0; i < rows; ++i) {
-        hashes[i] = ((hashes[i] >> 16) | (hashes[i] << 16)) % _fanout;
+        hashes[i] = _map_hash_to_partition(hashes[i]);
     }
 
     // Build per-partition row index lists
@@ -267,5 +270,15 @@ Status SpillRepartitioner::_flush_all_buffers(
     return Status::OK();
 }
 
+uint32_t SpillRepartitioner::_map_hash_to_partition(uint32_t hash) const {
+    DCHECK_GT(_fanout, 0);
+    // Use a level-dependent salt so each repartition level has a different
+    // projection from hash-space to partition-space.
+    constexpr uint32_t LEVEL_SALT_BASE = 0x9E3779B9U;
+    auto salt = static_cast<uint32_t>(_repartition_level + 1) * 
LEVEL_SALT_BASE;
+    auto mixed = vectorized::crc32c_shuffle_mix(hash ^ salt);
+    return ((mixed >> 16) | (mixed << 16)) % static_cast<uint32_t>(_fanout);
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/vec/spill/spill_repartitioner.h 
b/be/src/vec/spill/spill_repartitioner.h
index ab40d9c307b..fd0b5d33ec1 100644
--- a/be/src/vec/spill/spill_repartitioner.h
+++ b/be/src/vec/spill/spill_repartitioner.h
@@ -51,11 +51,11 @@ namespace pipeline {
 ///    specified column indices. Suitable for Aggregation where spill blocks have a
 ///    different schema (key columns at fixed positions 0..N-1).
 ///
-/// For each level of repartitioning, the partitioner uses the same CRC32 hash 
function
-/// on key columns but with the SpillPartitionChannelIds functor (bit rotation 
+ modulo).
-/// Since all rows in the input stream already belong to the same parent 
partition (they
-/// had identical partition indices at the previous level), re-hashing with 
the same
-/// function still produces a valid and well-distributed split into FANOUT 
sub-partitions.
+/// For repartitioning, hash computation and final channel mapping are separated:
+/// - a partitioner can provide either direct channel ids or raw hash values
+///   (e.g. SpillRePartitionChannelIds returns raw hash),
+/// - SpillRepartitioner then applies the final channel mapping strategy.
+/// This keeps repartition policy centralized and allows level-aware mapping.
 ///
 /// Processing is incremental: each call to repartition() processes up to MAX_BATCH_BYTES
 /// (32 MB) of data and then returns, allowing the pipeline scheduler to yield and
@@ -77,7 +77,7 @@ public:
     // @param profile      RuntimeProfile for counters.
     // @param fanout       Number of output sub-partitions. Defaults to 8.
     void init(std::unique_ptr<vectorized::PartitionerBase> partitioner, 
RuntimeProfile* profile,
-              int fanout);
+              int fanout, int repartition_level);
 
     /// Initialize the repartitioner with explicit key column indices (for 
Aggregation).
     /// Computes CRC32 hash directly on the specified columns without using 
VExpr.
@@ -94,7 +94,7 @@ public:
     // @param fanout              Number of output sub-partitions. Defaults to 
8.
     void init_with_key_columns(std::vector<size_t> key_column_indices,
                                std::vector<vectorized::DataTypePtr> 
key_data_types,
-                               RuntimeProfile* profile, int fanout);
+                               RuntimeProfile* profile, int fanout, int 
repartition_level);
 
     /// Repartition data from input_stream into output_streams.
     ///
@@ -147,6 +147,8 @@ private:
             RuntimeState* state, std::vector<vectorized::SpillStreamSPtr>& 
output_streams,
             std::vector<std::unique_ptr<vectorized::MutableBlock>>& 
output_buffers, bool force);
 
+    uint32_t _map_hash_to_partition(uint32_t hash) const;
+
     // Partitioner mode (used by Hash Join)
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
 
@@ -159,6 +161,9 @@ private:
     RuntimeProfile::Counter* _repartition_rows = nullptr;
     // dynamic fanout to allow operator-configured partition counts
     int _fanout = 8;
+    // Target repartition level (parent level + 1). Used to derive level-aware
+    // channel mapping from raw hash values.
+    int _repartition_level = 0;
 
 public:
     // Accessor for configured fanout


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to