This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 103796a2e99 branch-4.1: [fix](be) Stop extra operator work after cancellation (#64077) (#64816)
103796a2e99 is described below

commit 103796a2e997eb93f837220d119cd8ce49f51210
Author: zclllyybb <[email protected]>
AuthorDate: Fri Jun 26 13:56:26 2026 +0800

    branch-4.1: [fix](be) Stop extra operator work after cancellation (#64077) (#64816)
    
    pick https://github.com/apache/doris/pull/64077
---
 be/src/exec/operator/analytic_sink_operator.cpp    |   5 +-
 be/src/exec/operator/analytic_sink_operator.h      |   2 +-
 .../operator/nested_loop_join_build_operator.cpp   |   5 +-
 .../partitioned_aggregation_sink_operator.cpp      |   2 +
 .../partitioned_aggregation_source_operator.cpp    |   4 +-
 .../partitioned_hash_join_probe_operator.cpp       |   3 +
 .../partitioned_hash_join_sink_operator.cpp        |   1 +
 .../operator/spill_iceberg_table_sink_operator.cpp |   1 +
 be/src/exec/operator/spill_sort_sink_operator.cpp  |   2 +
 .../exec/operator/spill_sort_source_operator.cpp   |   3 +-
 be/src/exec/operator/spill_utils.h                 |   2 +
 .../exec/operator/analytic_sink_operator_test.cpp  |  62 ++++++++-
 .../nested_loop_join_build_operator_test.cpp       | 143 +++++++++++++++++++++
 .../partitioned_aggregation_sink_operator_test.cpp |  28 +++-
 ...artitioned_aggregation_source_operator_test.cpp |  48 +++++++
 .../partitioned_hash_join_probe_operator_test.cpp  |  33 +++++
 .../partitioned_hash_join_sink_operator_test.cpp   |  26 ++++
 .../operator/spill_sort_sink_operator_test.cpp     |  79 ++++++++++++
 .../operator/spill_sort_source_operator_test.cpp   |  20 +++
 19 files changed, 461 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/operator/analytic_sink_operator.cpp b/be/src/exec/operator/analytic_sink_operator.cpp
index 7a6b0d659c0..863acc4a59b 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -341,8 +341,9 @@ bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t current_block_r
     return false;
 }
 
-Status AnalyticSinkLocalState::_execute_impl() {
+Status AnalyticSinkLocalState::_execute_impl(RuntimeState* state) {
     while (_output_block_index < _input_blocks.size()) {
+        RETURN_IF_CANCELLED(state);
         {
             _get_partition_by_end();
             // streaming_mode means no need get all parition data, could calculate data when it's arrived
@@ -754,7 +755,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* input_bloc
     local_state._reserve_mem_size = 0;
     SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
     RETURN_IF_ERROR(_add_input_block(state, input_block));
-    RETURN_IF_ERROR(local_state._execute_impl());
+    RETURN_IF_ERROR(local_state._execute_impl(state));
     if (local_state._input_eos) {
         LockGuard lc(local_state._shared_state->sink_eos_lock);
         local_state._shared_state->sink_eos = true;
diff --git a/be/src/exec/operator/analytic_sink_operator.h b/be/src/exec/operator/analytic_sink_operator.h
index 7b6975e5e68..a54088025f1 100644
--- a/be/src/exec/operator/analytic_sink_operator.h
+++ b/be/src/exec/operator/analytic_sink_operator.h
@@ -74,7 +74,7 @@ public:
 
 private:
     friend class AnalyticSinkOperatorX;
-    Status _execute_impl();
+    Status _execute_impl(RuntimeState* state);
     // over(partition by k1 order by k2 range|rows unbounded preceding and unbounded following)
     bool _get_next_for_partition(int64_t current_block_rows, int64_t current_block_base_pos);
     // over(partition by k1 order by k2 range between unbounded preceding and current row)
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.cpp b/be/src/exec/operator/nested_loop_join_build_operator.cpp
index 30e3fe73826..91a0debda69 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.cpp
+++ b/be/src/exec/operator/nested_loop_join_build_operator.cpp
@@ -54,7 +54,10 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
 }
 
 Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
-    RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state, _shared_state->build_blocks));
+    if (!state->is_cancelled()) {
+        RETURN_IF_ERROR(
+                _runtime_filter_producer_helper->process(state, _shared_state->build_blocks));
+    }
     _runtime_filter_producer_helper->collect_realtime_profile(custom_profile());
     RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
     return Status::OK();
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index 94b469a5df3..45a7f117913 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -356,6 +356,7 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
                                                        HashTableCtxType& context,
                                                        HashTableType& hash_table,
                                                        const size_t size_to_revoke, bool eos) {
+    RETURN_IF_CANCELLED(state);
     Status status;
 
     context.init_iterator();
@@ -427,6 +428,7 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
 }
 
 Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     if (_eos) {
         return Status::OK();
     }
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index 057915cac93..79b1d2211ff 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -357,8 +357,9 @@ void PartitionedAggLocalState::_init_partition_queue() {
 
 Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* state,
                                                                 AggSpillPartitionInfo& partition) {
+    RETURN_IF_CANCELLED(state);
     size_t accumulated_bytes = 0;
-    if (!partition.spill_file || state->is_cancelled()) {
+    if (!partition.spill_file) {
         return Status::OK();
     }
 
@@ -448,6 +449,7 @@ Status PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeSta
 }
 
 Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
     const int new_level = _current_partition.level + 1;
 
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 25161399c3a..6cb9ce5c101 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -335,6 +335,7 @@ bool PartitionedHashJoinProbeLocalState::is_blockable() const {
 
 Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
+    RETURN_IF_CANCELLED(state);
     if (!partition_info.build_file) {
         // Build file is already exhausted for this partition.
         return Status::OK();
@@ -385,6 +386,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
 
 Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
         RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
+    RETURN_IF_CANCELLED(state);
     if (!partition_info.probe_file) {
         // Probe file is already exhausted for this partition.
         return Status::OK();
@@ -997,6 +999,7 @@ Status PartitionedHashJoinProbeLocalState::revoke_build_data(RuntimeState* state
 //   repartitioned and pushed back to the queue so the hash table build can
 //   proceed later with a smaller footprint.
 Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{}, revoke_memory, child_eos:{}",
                               print_id(state->query_id()), node_id(), state->task_id(),
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
index 7e3267d8e58..b481c5a6b5b 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
@@ -300,6 +300,7 @@ Status PartitionedHashJoinSinkLocalState::_finish_spilling(RuntimeState* state)
 /// because we use limit 1MB here. So we need to force spill all memory to disk to make sure we can make progress.
 Status PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState* state,
                                                                             bool force_spill) {
+    RETURN_IF_CANCELLED(state);
     DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
         auto status = Status::InternalError(
                 "fault_inject partitioned_hash_join_sink revoke_memory canceled");
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index ab02a0a1f36..58a54868799 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -80,6 +80,7 @@ size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
 }
 
 Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     if (!_writer) {
         return Status::OK();
     }
diff --git a/be/src/exec/operator/spill_sort_sink_operator.cpp b/be/src/exec/operator/spill_sort_sink_operator.cpp
index 1e88f1e3850..c0ccf4657d4 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.cpp
+++ b/be/src/exec/operator/spill_sort_sink_operator.cpp
@@ -188,6 +188,7 @@ size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
 }
 
 Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& parent = Base::_parent->template cast<Parent>();
     state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
     Defer defer {[&]() {
@@ -221,6 +222,7 @@ Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
 }
 
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& parent = Base::_parent->template cast<Parent>();
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp b/be/src/exec/operator/spill_sort_source_operator.cpp
index 7bb1bfa448b..e516ead73c6 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -83,6 +83,7 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
 }
 
 Status SpillSortLocalState::execute_merge_sort_spill_files(RuntimeState* state) {
+    RETURN_IF_CANCELLED(state);
     auto& parent = Base::_parent->template cast<Parent>();
     SCOPED_TIMER(_spill_merge_sort_timer);
     Status status;
@@ -264,4 +265,4 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, Block* block, bo
 }
 
 #include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/spill_utils.h b/be/src/exec/operator/spill_utils.h
index 6a84c6fc2b0..a54c1860ad9 100644
--- a/be/src/exec/operator/spill_utils.h
+++ b/be/src/exec/operator/spill_utils.h
@@ -79,8 +79,10 @@ struct SpillContext {
 // small utility to run the provided callbacks and forward cancellation.
 inline Status run_spill_task(RuntimeState* state, std::function<Status()> exec_func,
                              std::function<Status()> fin_cb = {}) {
+    RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(exec_func());
     if (fin_cb) {
+        RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(fin_cb());
     }
     return Status::OK();
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp b/be/test/exec/operator/analytic_sink_operator_test.cpp
index a64d16c676f..517c73642ce 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -21,6 +21,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
 
 #include "core/block/block.h"
 #include "core/data_type/data_type.h"
@@ -51,6 +52,35 @@ private:
     std::unique_ptr<MockRowDescriptor> _mock_row_desc;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "analytic sink cancelled";
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
+class CancelAfterChecksRuntimeState : public MockRuntimeState {
+public:
+    void reset_cancel_after(int64_t check_count) {
+        _check_count = 0;
+        _cancel_after = check_count;
+    }
+
+    bool is_cancelled() const override {
+        return _cancel_after >= 0 && ++_check_count > _cancel_after;
+    }
+
+    Status cancel_reason() const override { return Status::Cancelled(CANCEL_REASON); }
+
+private:
+    mutable int64_t _check_count = 0;
+    int64_t _cancel_after = -1;
+};
+
+} // namespace
+
 struct AnalyticSinkOperatorTest : public ::testing::Test {
     void Initialize(int batch_size) {
         sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
@@ -167,6 +197,36 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
     std::vector<int64_t> _data_vals;
 };
 
+TEST_F(AnalyticSinkOperatorTest, SinkReturnsCancelBeforeOutputBlock) {
+    Initialize(10);
+    create_operator(false, 0, "", {}, nullptr);
+    create_local_state();
+
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+    Block block = ColumnHelper::create_block<DataTypeInt64>({2, 3, 1});
+    expect_cancelled(sink->sink(state.get(), &block, true));
+}
+
+TEST_F(AnalyticSinkOperatorTest, SinkStopsBeforeNextBufferedBlockWhenCancelled) {
+    const int batch_size = 2;
+    Initialize(batch_size);
+    auto cancel_state = std::make_shared<CancelAfterChecksRuntimeState>();
+    cancel_state->_batch_size = batch_size;
+    state = cancel_state;
+    create_operator(false, 0, "", {}, nullptr);
+    create_local_state();
+
+    {
+        Block block = ColumnHelper::create_block<DataTypeInt64>({0, 1});
+        auto status = sink->sink(state.get(), &block, false);
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
+    cancel_state->reset_cancel_after(1);
+    Block block = ColumnHelper::create_block<DataTypeInt64>({2, 3});
+    expect_cancelled(sink->sink(state.get(), &block, true));
+}
+
 TEST_F(AnalyticSinkOperatorTest, withoutAggFunction) {
     Initialize(10);
     create_operator(false, 0, "", {}, nullptr);
@@ -817,4 +877,4 @@ TEST_F(AnalyticSinkOperatorTest, AggFunction8) {
     std::cout << "######### AggFunction with row_number test end #########" << std::endl;
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/operator/nested_loop_join_build_operator_test.cpp b/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
new file mode 100644
index 00000000000..34141e78aac
--- /dev/null
+++ b/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/operator/nested_loop_join_build_operator.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gtest/gtest.h>
+
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "exec/runtime_filter/runtime_filter_test_utils.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_profile.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris {
+namespace {
+
+TDescriptorTable create_desc_table() {
+    TDescriptorTableBuilder builder;
+    TTupleDescriptorBuilder()
+            .add_slot(TSlotDescriptorBuilder()
+                              .type(TYPE_INT)
+                              .nullable(false)
+                              .column_name("probe_col")
+                              .column_pos(0)
+                              .build())
+            .build(&builder);
+    TTupleDescriptorBuilder()
+            .add_slot(TSlotDescriptorBuilder()
+                              .type(TYPE_INT)
+                              .nullable(false)
+                              .column_name("build_col")
+                              .column_pos(0)
+                              .build())
+            .build(&builder);
+    return builder.desc_tbl();
+}
+
+TExpr build_slot_ref_expr() {
+    auto expr = TRuntimeFilterDescBuilder::get_default_expr();
+    expr.nodes[0].slot_ref.__set_slot_id(1);
+    expr.nodes[0].slot_ref.__set_tuple_id(1);
+    return expr;
+}
+
+TPlanNode create_nested_loop_join_plan_node() {
+    TPlanNode node;
+    node.node_id = 0;
+    node.node_type = TPlanNodeType::CROSS_JOIN_NODE;
+    node.num_children = 2;
+    node.limit = -1;
+
+    TNestedLoopJoinNode join_node;
+    join_node.__set_join_op(TJoinOp::INNER_JOIN);
+    node.__set_nested_loop_join_node(join_node);
+    node.row_tuples.push_back(0);
+    node.row_tuples.push_back(1);
+    node.nullable_tuples.push_back(false);
+    node.nullable_tuples.push_back(false);
+
+    auto src_expr = build_slot_ref_expr();
+    TRuntimeFilterDescBuilder runtime_filter_builder(0, src_expr, 0);
+    node.__isset.runtime_filters = true;
+    node.runtime_filters.push_back(runtime_filter_builder.build());
+    return node;
+}
+
+} // namespace
+
+class NestedLoopJoinBuildOperatorTest : public RuntimeFilterTest {};
+
+TEST_F(NestedLoopJoinBuildOperatorTest, CloseSkipsRuntimeFilterProcessWhenCancelled) {
+    ObjectPool pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    auto desc_table = create_desc_table();
+    auto status = DescriptorTbl::create(&pool, desc_table, &desc_tbl);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    auto* state = _runtime_states[0].get();
+    state->set_desc_tbl(desc_tbl);
+
+    auto plan_node = create_nested_loop_join_plan_node();
+    auto sink_operator =
+            std::make_shared<NestedLoopJoinBuildSinkOperatorX>(&pool, 0, 1, plan_node, *desc_tbl);
+
+    auto child = std::make_shared<MockSourceOperator>();
+    child->_row_descriptor = RowDescriptor(*desc_tbl, {1}, {false});
+    ASSERT_TRUE(sink_operator->set_child(child));
+
+    status = sink_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    status = sink_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    ASSERT_NE(shared_state, nullptr);
+
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = &_profile,
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = shared_state_map,
+                             .tsink = TDataSink()};
+    status = sink_operator->setup_local_state(state, info);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    auto* local_state =
+            dynamic_cast<NestedLoopJoinBuildSinkLocalState*>(state->get_sink_local_state());
+    ASSERT_NE(local_state, nullptr);
+
+    status = local_state->open(state);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    local_state->build_blocks().emplace_back();
+
+    state->cancel(Status::Cancelled("nested loop join build close cancelled"));
+    status = local_state->close(state, Status::OK());
+    ASSERT_TRUE(status.ok()) << status.to_string();
+}
+
+} // namespace doris
diff --git a/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp b/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
index 42cc320f183..15559910494 100644
--- a/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -43,6 +43,32 @@ protected:
     PartitionedAggregationTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned aggregation sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedAggregationSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = _helper.create_sink_local_state(_helper.runtime_state.get(),
+                                                        sink_operator.get(), shared_state);
+    ASSERT_NE(local_state, nullptr);
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedAggregationSinkOperatorTest, Init) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
@@ -709,4 +735,4 @@ TEST_F(PartitionedAggregationNullableKeySinkTest, SinkEOSFlushNullKeyOnly) {
     ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp b/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
index 2b13ad97f4c..14b567d7e31 100644
--- a/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
@@ -46,6 +46,54 @@ protected:
     PartitionedAggregationTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned aggregation source cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedAggregationSourceOperatorTest, RecoverBlocksFromPartitionReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = _helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          source_operator.get(), shared_state);
+    AggSpillPartitionInfo partition;
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(
+            local_state->_recover_blocks_from_partition(_helper.runtime_state.get(), partition));
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = _helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          source_operator.get(), shared_state);
+    ASSERT_NE(local_state, nullptr);
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(source_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, FlushAndRepartitionReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+    auto* local_state = _helper.create_source_local_state(_helper.runtime_state.get(),
+                                                          source_operator.get(), shared_state);
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(local_state->_flush_and_repartition(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedAggregationSourceOperatorTest, Init) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
diff --git a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index 2ce5f1921b0..e099f0d3827 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -49,6 +49,17 @@ protected:
 
 namespace {
 
+constexpr auto CANCEL_REASON = "partitioned hash join probe cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
 SpillFileSPtr create_probe_test_spill_file(RuntimeState* state, RuntimeProfile* profile,
                                            int node_id, const std::string& prefix,
                                            const std::vector<std::vector<int32_t>>& batches) {
@@ -124,6 +135,28 @@ Status prepare_probe_local_state_for_repartition(PartitionedHashJoinProbeOperato
 
 } // namespace
 
+TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildAndProbeReturnCancelAtEntry) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto* local_state = _helper.create_probe_local_state(_helper.runtime_state.get(),
+                                                         probe_operator.get(), shared_state);
+    JoinSpillPartitionInfo build_partition(nullptr, nullptr, 0);
+    JoinSpillPartitionInfo probe_partition(nullptr, nullptr, 0);
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+                                                                      build_partition));
+    expect_cancelled(local_state->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+                                                                      probe_partition));
+}
+
+TEST_F(PartitionedHashJoinProbeOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(probe_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) {
     auto [probe_operator, sink_operator] = _helper.create_operators();
 
diff --git a/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
index 92479589603..f28fc8cbddb 100644
--- a/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -60,6 +60,32 @@ protected:
     PartitionedHashJoinTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned hash join sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [probe_operator, sink_operator] = _helper.create_operators();
+    std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+    auto* local_state = _helper.create_sink_local_state(_helper.runtime_state.get(),
+                                                        sink_operator.get(), shared_state);
+    ASSERT_NE(local_state, nullptr);
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
 TEST_F(PartitionedHashJoinSinkOperatorTest, Init) {
     TPlanNode tnode = _helper.create_test_plan_node();
     const DescriptorTbl& desc_tbl = _helper.runtime_state->desc_tbl();
diff --git a/be/test/exec/operator/spill_sort_sink_operator_test.cpp b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
index db47d9565a3..ead792a1e32 100644
--- a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
@@ -25,6 +25,7 @@
 #include "core/block/block.h"
 #include "core/data_type/data_type_number.h"
 #include "exec/operator/spill_sort_test_helper.h"
+#include "exec/operator/spill_utils.h"
 #include "exec/pipeline/dependency.h"
 #include "exec/pipeline/pipeline_task.h"
 #include "testutil/column_helper.h"
@@ -38,6 +39,84 @@ protected:
     SpillSortTestHelper _helper;
 };
 
+namespace {
+
+constexpr auto CANCEL_REASON = "spill sort sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
+} // namespace
+
+TEST_F(SpillSortSinkOperatorTest, ExecuteSpillSortReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    auto sink_local_state = SpillSortSinkLocalState::create_unique(sink_operator.get(),
+                                                                   _helper.runtime_state.get());
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(sink_local_state->_execute_spill_sort(_helper.runtime_state.get()));
+}
+
+TEST_F(SpillSortSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+
+    auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    ASSERT_TRUE(shared_state != nullptr);
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.operator_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .tsink = {}};
+
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
+TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAtEntry) {
+    cancel_state(_helper.runtime_state.get());
+
+    bool executed = false;
+    expect_cancelled(run_spill_task(_helper.runtime_state.get(), [&]() {
+        executed = true;
+        return Status::OK();
+    }));
+    EXPECT_FALSE(executed);
+}
+
+TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAfterCallback) {
+    bool finalized = false;
+
+    auto status = run_spill_task(
+            _helper.runtime_state.get(),
+            [&]() {
+                cancel_state(_helper.runtime_state.get());
+                return Status::OK();
+            },
+            [&]() {
+                finalized = true;
+                return Status::OK();
+            });
+    expect_cancelled(status);
+    EXPECT_FALSE(finalized);
+}
+
 TEST_F(SpillSortSinkOperatorTest, Basic) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);
diff --git a/be/test/exec/operator/spill_sort_source_operator_test.cpp b/be/test/exec/operator/spill_sort_source_operator_test.cpp
index 9f458699fa9..e4eed08b42c 100644
--- a/be/test/exec/operator/spill_sort_source_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_source_operator_test.cpp
@@ -188,8 +188,28 @@ void delete_spill_files(const std::vector<SpillFileSPtr>& spill_files) {
     }
 }
 
+constexpr auto CANCEL_REASON = "spill sort source cancelled";
+
+void cancel_state(RuntimeState* state) {
+    state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+    EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+    EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) << status.to_string();
+}
+
 } // namespace
 
+TEST_F(SpillSortSourceOperatorTest, ExecuteMergeSortSpillFilesReturnsCancelAtEntry) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    auto local_state = std::make_unique<SpillSortLocalState>(_helper.runtime_state.get(),
+                                                             source_operator.get());
+
+    cancel_state(_helper.runtime_state.get());
+    expect_cancelled(local_state->execute_merge_sort_spill_files(_helper.runtime_state.get()));
+}
+
 TEST_F(SpillSortSourceOperatorTest, Basic) {
     auto [source_operator, sink_operator] = _helper.create_operators();
     ASSERT_TRUE(source_operator != nullptr);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to