This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e191478b079 branch-4.1: [Chore](be) Stop spill hash join repartition on cancel #63456 (#63532)
e191478b079 is described below

commit e191478b0796b95aa052b7bd1e275c6723369362
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 22 22:41:54 2026 +0800

    branch-4.1: [Chore](be) Stop spill hash join repartition on cancel #63456 (#63532)
    
    Cherry-picked from #63456
    
    Co-authored-by: Pxl <[email protected]>
---
 .../partitioned_hash_join_probe_operator.cpp       | 19 ++++++++---
 .../partitioned_hash_join_probe_operator_test.cpp  | 39 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 1bbc1972bec..25161399c3a 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -36,6 +36,7 @@
 #include "exec/spill/spill_repartitioner.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_profile.h"
+#include "runtime/runtime_state.h"
 
 namespace doris {
 
@@ -217,6 +218,7 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
         }
         _current_probe_reader.reset();
     }
+    _recovered_build_block.reset();
 
     // Clean up any remaining spill partition queue entries
     for (auto& entry : _spill_partition_queue) {
@@ -349,7 +351,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
         RETURN_IF_ERROR(_current_build_reader->open());
     }
     bool eos = false;
-    while (!eos) {
+    while (!eos && !state->is_cancelled()) {
         Block block;
         RETURN_IF_ERROR(_current_build_reader->read(&block, &eos));
         COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -373,6 +375,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
             return Status::OK(); // yield — buffer full, more data may remain
         }
     }
+    RETURN_IF_CANCELLED(state);
     // Build file fully consumed.
     RETURN_IF_ERROR(_current_build_reader->close());
     _current_build_reader.reset();
@@ -409,6 +412,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
             return Status::OK(); // yield — enough data read
         }
     }
+    RETURN_IF_CANCELLED(state);
     // Probe file fully consumed.
     RETURN_IF_ERROR(_current_probe_reader->close());
     _current_probe_reader.reset();
@@ -416,6 +420,7 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
     return Status::OK();
 }
 
+// 
NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity):
 existing spill repartition state machine handles build/probe phases together.
 Status PartitionedHashJoinProbeLocalState::repartition_current_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition) {
     auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -474,6 +479,7 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
         }
     }
     RETURN_IF_ERROR(_repartitioner.finalize());
+    RETURN_IF_CANCELLED(state);
     _recovered_build_block.reset();
     _current_build_reader.reset(); // clear any leftover reader state
     partition.build_file.reset();
@@ -497,9 +503,9 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
         while (!done && !state->is_cancelled()) {
             RETURN_IF_ERROR(_repartitioner.repartition(state, 
partition.probe_file, &done));
         }
-        partition.probe_file.reset();
-
         RETURN_IF_ERROR(_repartitioner.finalize());
+        RETURN_IF_CANCELLED(state);
+        partition.probe_file.reset();
         _current_probe_reader.reset();
     }
 
@@ -698,7 +704,11 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
     // per-partition build and probe spill streams. After this point every 
partition
     // (including the original "level-0" ones) is accessed uniformly via the 
queue.
     if (!local_state._spill_queue_initialized) {
-        DCHECK(local_state._child_eos) << "pull() with is_spilled=true called 
before child EOS";
+        if (UNLIKELY(!local_state._child_eos)) {
+            return Status::InternalError(
+                    "query:{}, node:{}, pull() with is_spilled=true called 
before child EOS",
+                    print_id(state->query_id()), node_id());
+        }
         // There maybe some blocks still in partitioned block or probe blocks. 
Flush them to disk.
         RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
         // Close all probe writers so that SpillFile metadata (part_count, 
etc.)
@@ -731,6 +741,7 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
     return _pull_from_spill_queue(local_state, state, output_block, eos);
 }
 
+// 
NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity):
 existing spill queue pull handles setup, recovery, and probing phases.
 Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state, 
Block* output_block,
         bool* eos) const {
diff --git a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index 0bcb609438e..2ce5f1921b0 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -1139,6 +1139,45 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskEmpty) {
     ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
 }
 
+TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                        probe_operator.get(), 
shared_state);
+
+    SpillFileSPtr spill_file;
+    auto relative_path = fmt::format(
+            "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()),
+            probe_operator->node_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
+    ASSERT_TRUE(ExecEnv::GetInstance()
+                        ->spill_file_mgr()
+                        ->create_spill_file(relative_path, spill_file)
+                        .ok());
+
+    {
+        SpillFileWriterSPtr writer;
+        ASSERT_TRUE(spill_file
+                            ->create_writer(_helper.runtime_state.get(),
+                                            local_state->operator_profile(), 
writer)
+                            .ok());
+        ASSERT_TRUE(writer->close().ok());
+    }
+
+    _helper.runtime_state->cancel(Status::Cancelled("test cancel"));
+
+    JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0);
+    auto status = 
local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                   
partition_info);
+    ASSERT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    ASSERT_NE(partition_info.build_file, nullptr);
+    ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
+
+    ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok());
+    
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file);
+    partition_info.build_file.reset();
+}
+
 TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskLargeData) {
     // Similar setup as above...
     auto [probe_operator, sink_operator] = _helper.create_operators();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to