This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 103796a2e99 branch-4.1: [fix](be) Stop extra operator work after
cancellation (#64077) (#64816)
103796a2e99 is described below
commit 103796a2e997eb93f837220d119cd8ce49f51210
Author: zclllyybb <[email protected]>
AuthorDate: Fri Jun 26 13:56:26 2026 +0800
branch-4.1: [fix](be) Stop extra operator work after cancellation (#64077)
(#64816)
pick https://github.com/apache/doris/pull/64077
---
be/src/exec/operator/analytic_sink_operator.cpp | 5 +-
be/src/exec/operator/analytic_sink_operator.h | 2 +-
.../operator/nested_loop_join_build_operator.cpp | 5 +-
.../partitioned_aggregation_sink_operator.cpp | 2 +
.../partitioned_aggregation_source_operator.cpp | 4 +-
.../partitioned_hash_join_probe_operator.cpp | 3 +
.../partitioned_hash_join_sink_operator.cpp | 1 +
.../operator/spill_iceberg_table_sink_operator.cpp | 1 +
be/src/exec/operator/spill_sort_sink_operator.cpp | 2 +
.../exec/operator/spill_sort_source_operator.cpp | 3 +-
be/src/exec/operator/spill_utils.h | 2 +
.../exec/operator/analytic_sink_operator_test.cpp | 62 ++++++++-
.../nested_loop_join_build_operator_test.cpp | 143 +++++++++++++++++++++
.../partitioned_aggregation_sink_operator_test.cpp | 28 +++-
...artitioned_aggregation_source_operator_test.cpp | 48 +++++++
.../partitioned_hash_join_probe_operator_test.cpp | 33 +++++
.../partitioned_hash_join_sink_operator_test.cpp | 26 ++++
.../operator/spill_sort_sink_operator_test.cpp | 79 ++++++++++++
.../operator/spill_sort_source_operator_test.cpp | 20 +++
19 files changed, 461 insertions(+), 8 deletions(-)
diff --git a/be/src/exec/operator/analytic_sink_operator.cpp
b/be/src/exec/operator/analytic_sink_operator.cpp
index 7a6b0d659c0..863acc4a59b 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -341,8 +341,9 @@ bool
AnalyticSinkLocalState::_get_next_for_range_between(int64_t current_block_r
return false;
}
-Status AnalyticSinkLocalState::_execute_impl() {
+Status AnalyticSinkLocalState::_execute_impl(RuntimeState* state) {
while (_output_block_index < _input_blocks.size()) {
+ RETURN_IF_CANCELLED(state);
{
_get_partition_by_end();
// streaming_mode means no need get all parition data, could
calculate data when it's arrived
@@ -754,7 +755,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState*
state, Block* input_bloc
local_state._reserve_mem_size = 0;
SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
RETURN_IF_ERROR(_add_input_block(state, input_block));
- RETURN_IF_ERROR(local_state._execute_impl());
+ RETURN_IF_ERROR(local_state._execute_impl(state));
if (local_state._input_eos) {
LockGuard lc(local_state._shared_state->sink_eos_lock);
local_state._shared_state->sink_eos = true;
diff --git a/be/src/exec/operator/analytic_sink_operator.h
b/be/src/exec/operator/analytic_sink_operator.h
index 7b6975e5e68..a54088025f1 100644
--- a/be/src/exec/operator/analytic_sink_operator.h
+++ b/be/src/exec/operator/analytic_sink_operator.h
@@ -74,7 +74,7 @@ public:
private:
friend class AnalyticSinkOperatorX;
- Status _execute_impl();
+ Status _execute_impl(RuntimeState* state);
// over(partition by k1 order by k2 range|rows unbounded preceding and
unbounded following)
bool _get_next_for_partition(int64_t current_block_rows, int64_t
current_block_base_pos);
// over(partition by k1 order by k2 range between unbounded preceding and
current row)
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.cpp
b/be/src/exec/operator/nested_loop_join_build_operator.cpp
index 30e3fe73826..91a0debda69 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.cpp
+++ b/be/src/exec/operator/nested_loop_join_build_operator.cpp
@@ -54,7 +54,10 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState*
state) {
}
Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status
exec_status) {
- RETURN_IF_ERROR(_runtime_filter_producer_helper->process(state,
_shared_state->build_blocks));
+ if (!state->is_cancelled()) {
+ RETURN_IF_ERROR(
+ _runtime_filter_producer_helper->process(state,
_shared_state->build_blocks));
+ }
_runtime_filter_producer_helper->collect_realtime_profile(custom_profile());
RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
return Status::OK();
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index 94b469a5df3..45a7f117913 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -356,6 +356,7 @@ Status
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
HashTableCtxType&
context,
HashTableType&
hash_table,
const size_t
size_to_revoke, bool eos) {
+ RETURN_IF_CANCELLED(state);
Status status;
context.init_iterator();
@@ -427,6 +428,7 @@ Status
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
}
Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
+ RETURN_IF_CANCELLED(state);
if (_eos) {
return Status::OK();
}
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index 057915cac93..79b1d2211ff 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -357,8 +357,9 @@ void PartitionedAggLocalState::_init_partition_queue() {
Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState*
state,
AggSpillPartitionInfo& partition) {
+ RETURN_IF_CANCELLED(state);
size_t accumulated_bytes = 0;
- if (!partition.spill_file || state->is_cancelled()) {
+ if (!partition.spill_file) {
return Status::OK();
}
@@ -448,6 +449,7 @@ Status
PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeSta
}
Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) {
+ RETURN_IF_CANCELLED(state);
auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
const int new_level = _current_partition.level + 1;
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 25161399c3a..6cb9ce5c101 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -335,6 +335,7 @@ bool PartitionedHashJoinProbeLocalState::is_blockable()
const {
Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
+ RETURN_IF_CANCELLED(state);
if (!partition_info.build_file) {
// Build file is already exhausted for this partition.
return Status::OK();
@@ -385,6 +386,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
+ RETURN_IF_CANCELLED(state);
if (!partition_info.probe_file) {
// Probe file is already exhausted for this partition.
return Status::OK();
@@ -997,6 +999,7 @@ Status
PartitionedHashJoinProbeLocalState::revoke_build_data(RuntimeState* state
// repartitioned and pushed back to the queue so the hash table build can
// proceed later with a smaller footprint.
Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+ RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{},
revoke_memory, child_eos:{}",
print_id(state->query_id()), node_id(),
state->task_id(),
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
index 7e3267d8e58..b481c5a6b5b 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
@@ -300,6 +300,7 @@ Status
PartitionedHashJoinSinkLocalState::_finish_spilling(RuntimeState* state)
/// because we use limit 1MB here. So we need to force spill all memory to
disk to make sure we can make progress.
Status
PartitionedHashJoinSinkLocalState::_execute_spill_partitioned_blocks(RuntimeState*
state,
bool force_spill) {
+ RETURN_IF_CANCELLED(state);
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
{
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_sink revoke_memory
canceled");
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index ab02a0a1f36..58a54868799 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -80,6 +80,7 @@ size_t
SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
}
Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
+ RETURN_IF_CANCELLED(state);
if (!_writer) {
return Status::OK();
}
diff --git a/be/src/exec/operator/spill_sort_sink_operator.cpp
b/be/src/exec/operator/spill_sort_sink_operator.cpp
index 1e88f1e3850..c0ccf4657d4 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.cpp
+++ b/be/src/exec/operator/spill_sort_sink_operator.cpp
@@ -188,6 +188,7 @@ size_t
SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool e
}
Status SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
+ RETURN_IF_CANCELLED(state);
auto& parent = Base::_parent->template cast<Parent>();
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
Defer defer {[&]() {
@@ -221,6 +222,7 @@ Status
SpillSortSinkLocalState::_execute_spill_sort(RuntimeState* state) {
}
Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+ RETURN_IF_CANCELLED(state);
auto& parent = Base::_parent->template cast<Parent>();
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp
b/be/src/exec/operator/spill_sort_source_operator.cpp
index 7bb1bfa448b..e516ead73c6 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -83,6 +83,7 @@ int
SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
}
Status SpillSortLocalState::execute_merge_sort_spill_files(RuntimeState*
state) {
+ RETURN_IF_CANCELLED(state);
auto& parent = Base::_parent->template cast<Parent>();
SCOPED_TIMER(_spill_merge_sort_timer);
Status status;
@@ -264,4 +265,4 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState*
state, Block* block, bo
}
#include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/operator/spill_utils.h
b/be/src/exec/operator/spill_utils.h
index 6a84c6fc2b0..a54c1860ad9 100644
--- a/be/src/exec/operator/spill_utils.h
+++ b/be/src/exec/operator/spill_utils.h
@@ -79,8 +79,10 @@ struct SpillContext {
// small utility to run the provided callbacks and forward cancellation.
inline Status run_spill_task(RuntimeState* state, std::function<Status()>
exec_func,
std::function<Status()> fin_cb = {}) {
+ RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(exec_func());
if (fin_cb) {
+ RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(fin_cb());
}
return Status::OK();
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp
b/be/test/exec/operator/analytic_sink_operator_test.cpp
index a64d16c676f..517c73642ce 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -21,6 +21,7 @@
#include <cstdint>
#include <memory>
+#include <string>
#include "core/block/block.h"
#include "core/data_type/data_type.h"
@@ -51,6 +52,35 @@ private:
std::unique_ptr<MockRowDescriptor> _mock_row_desc;
};
+namespace {
+
+constexpr auto CANCEL_REASON = "analytic sink cancelled";
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
+class CancelAfterChecksRuntimeState : public MockRuntimeState {
+public:
+ void reset_cancel_after(int64_t check_count) {
+ _check_count = 0;
+ _cancel_after = check_count;
+ }
+
+ bool is_cancelled() const override {
+ return _cancel_after >= 0 && ++_check_count > _cancel_after;
+ }
+
+ Status cancel_reason() const override { return
Status::Cancelled(CANCEL_REASON); }
+
+private:
+ mutable int64_t _check_count = 0;
+ int64_t _cancel_after = -1;
+};
+
+} // namespace
+
struct AnalyticSinkOperatorTest : public ::testing::Test {
void Initialize(int batch_size) {
sink = std::make_unique<AnalyticSinkOperatorX>(&pool);
@@ -167,6 +197,36 @@ struct AnalyticSinkOperatorTest : public ::testing::Test {
std::vector<int64_t> _data_vals;
};
+TEST_F(AnalyticSinkOperatorTest, SinkReturnsCancelBeforeOutputBlock) {
+ Initialize(10);
+ create_operator(false, 0, "", {}, nullptr);
+ create_local_state();
+
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+ Block block = ColumnHelper::create_block<DataTypeInt64>({2, 3, 1});
+ expect_cancelled(sink->sink(state.get(), &block, true));
+}
+
+TEST_F(AnalyticSinkOperatorTest,
SinkStopsBeforeNextBufferedBlockWhenCancelled) {
+ const int batch_size = 2;
+ Initialize(batch_size);
+ auto cancel_state = std::make_shared<CancelAfterChecksRuntimeState>();
+ cancel_state->_batch_size = batch_size;
+ state = cancel_state;
+ create_operator(false, 0, "", {}, nullptr);
+ create_local_state();
+
+ {
+ Block block = ColumnHelper::create_block<DataTypeInt64>({0, 1});
+ auto status = sink->sink(state.get(), &block, false);
+ EXPECT_TRUE(status.ok()) << status.to_string();
+ }
+
+ cancel_state->reset_cancel_after(1);
+ Block block = ColumnHelper::create_block<DataTypeInt64>({2, 3});
+ expect_cancelled(sink->sink(state.get(), &block, true));
+}
+
TEST_F(AnalyticSinkOperatorTest, withoutAggFunction) {
Initialize(10);
create_operator(false, 0, "", {}, nullptr);
@@ -817,4 +877,4 @@ TEST_F(AnalyticSinkOperatorTest, AggFunction8) {
std::cout << "######### AggFunction with row_number test end #########" <<
std::endl;
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
b/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
new file mode 100644
index 00000000000..34141e78aac
--- /dev/null
+++ b/be/test/exec/operator/nested_loop_join_build_operator_test.cpp
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/operator/nested_loop_join_build_operator.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gtest/gtest.h>
+
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "exec/runtime_filter/runtime_filter_test_utils.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_profile.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris {
+namespace {
+
+TDescriptorTable create_desc_table() {
+ TDescriptorTableBuilder builder;
+ TTupleDescriptorBuilder()
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_INT)
+ .nullable(false)
+ .column_name("probe_col")
+ .column_pos(0)
+ .build())
+ .build(&builder);
+ TTupleDescriptorBuilder()
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_INT)
+ .nullable(false)
+ .column_name("build_col")
+ .column_pos(0)
+ .build())
+ .build(&builder);
+ return builder.desc_tbl();
+}
+
+TExpr build_slot_ref_expr() {
+ auto expr = TRuntimeFilterDescBuilder::get_default_expr();
+ expr.nodes[0].slot_ref.__set_slot_id(1);
+ expr.nodes[0].slot_ref.__set_tuple_id(1);
+ return expr;
+}
+
+TPlanNode create_nested_loop_join_plan_node() {
+ TPlanNode node;
+ node.node_id = 0;
+ node.node_type = TPlanNodeType::CROSS_JOIN_NODE;
+ node.num_children = 2;
+ node.limit = -1;
+
+ TNestedLoopJoinNode join_node;
+ join_node.__set_join_op(TJoinOp::INNER_JOIN);
+ node.__set_nested_loop_join_node(join_node);
+ node.row_tuples.push_back(0);
+ node.row_tuples.push_back(1);
+ node.nullable_tuples.push_back(false);
+ node.nullable_tuples.push_back(false);
+
+ auto src_expr = build_slot_ref_expr();
+ TRuntimeFilterDescBuilder runtime_filter_builder(0, src_expr, 0);
+ node.__isset.runtime_filters = true;
+ node.runtime_filters.push_back(runtime_filter_builder.build());
+ return node;
+}
+
+} // namespace
+
+class NestedLoopJoinBuildOperatorTest : public RuntimeFilterTest {};
+
+TEST_F(NestedLoopJoinBuildOperatorTest,
CloseSkipsRuntimeFilterProcessWhenCancelled) {
+ ObjectPool pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ auto desc_table = create_desc_table();
+ auto status = DescriptorTbl::create(&pool, desc_table, &desc_tbl);
+ ASSERT_TRUE(status.ok()) << status.to_string();
+
+ auto* state = _runtime_states[0].get();
+ state->set_desc_tbl(desc_tbl);
+
+ auto plan_node = create_nested_loop_join_plan_node();
+ auto sink_operator =
+ std::make_shared<NestedLoopJoinBuildSinkOperatorX>(&pool, 0, 1,
plan_node, *desc_tbl);
+
+ auto child = std::make_shared<MockSourceOperator>();
+ child->_row_descriptor = RowDescriptor(*desc_tbl, {1}, {false});
+ ASSERT_TRUE(sink_operator->set_child(child));
+
+ status = sink_operator->init(plan_node, state);
+ ASSERT_TRUE(status.ok()) << status.to_string();
+ status = sink_operator->prepare(state);
+ ASSERT_TRUE(status.ok()) << status.to_string();
+
+ auto shared_state = sink_operator->create_shared_state();
+ ASSERT_NE(shared_state, nullptr);
+
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = &_profile,
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .shared_state_map = shared_state_map,
+ .tsink = TDataSink()};
+ status = sink_operator->setup_local_state(state, info);
+ ASSERT_TRUE(status.ok()) << status.to_string();
+
+ auto* local_state =
+
dynamic_cast<NestedLoopJoinBuildSinkLocalState*>(state->get_sink_local_state());
+ ASSERT_NE(local_state, nullptr);
+
+ status = local_state->open(state);
+ ASSERT_TRUE(status.ok()) << status.to_string();
+ local_state->build_blocks().emplace_back();
+
+ state->cancel(Status::Cancelled("nested loop join build close cancelled"));
+ status = local_state->close(state, Status::OK());
+ ASSERT_TRUE(status.ok()) << status.to_string();
+}
+
+} // namespace doris
diff --git
a/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
b/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
index 42cc320f183..15559910494 100644
--- a/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -43,6 +43,32 @@ protected:
PartitionedAggregationTestHelper _helper;
};
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned aggregation sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedAggregationSinkOperatorTest,
RevokeMemoryReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+ auto* local_state =
_helper.create_sink_local_state(_helper.runtime_state.get(),
+ sink_operator.get(),
shared_state);
+ ASSERT_NE(local_state, nullptr);
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
TEST_F(PartitionedAggregationSinkOperatorTest, Init) {
auto [source_operator, sink_operator] = _helper.create_operators();
ASSERT_TRUE(source_operator != nullptr);
@@ -709,4 +735,4 @@ TEST_F(PartitionedAggregationNullableKeySinkTest,
SinkEOSFlushNullKeyOnly) {
ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
}
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git
a/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
b/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
index 2b13ad97f4c..14b567d7e31 100644
--- a/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_aggregation_source_operator_test.cpp
@@ -46,6 +46,54 @@ protected:
PartitionedAggregationTestHelper _helper;
};
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned aggregation source cancelled";
+
+void cancel_state(RuntimeState* state) {
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedAggregationSourceOperatorTest,
RecoverBlocksFromPartitionReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+ auto* local_state =
_helper.create_source_local_state(_helper.runtime_state.get(),
+
source_operator.get(), shared_state);
+ AggSpillPartitionInfo partition;
+
+ cancel_state(_helper.runtime_state.get());
+ expect_cancelled(
+
local_state->_recover_blocks_from_partition(_helper.runtime_state.get(),
partition));
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest,
RevokeMemoryReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+ auto* local_state =
_helper.create_source_local_state(_helper.runtime_state.get(),
+
source_operator.get(), shared_state);
+ ASSERT_NE(local_state, nullptr);
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(source_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest,
FlushAndRepartitionReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ std::shared_ptr<MockPartitionedAggSharedState> shared_state;
+ auto* local_state =
_helper.create_source_local_state(_helper.runtime_state.get(),
+
source_operator.get(), shared_state);
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(local_state->_flush_and_repartition(_helper.runtime_state.get()));
+}
+
TEST_F(PartitionedAggregationSourceOperatorTest, Init) {
auto [source_operator, sink_operator] = _helper.create_operators();
ASSERT_TRUE(source_operator != nullptr);
diff --git
a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
index 2ce5f1921b0..e099f0d3827 100644
--- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -49,6 +49,17 @@ protected:
namespace {
+constexpr auto CANCEL_REASON = "partitioned hash join probe cancelled";
+
+void cancel_state(RuntimeState* state) {
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
SpillFileSPtr create_probe_test_spill_file(RuntimeState* state,
RuntimeProfile* profile,
int node_id, const std::string&
prefix,
const
std::vector<std::vector<int32_t>>& batches) {
@@ -124,6 +135,28 @@ Status
prepare_probe_local_state_for_repartition(PartitionedHashJoinProbeOperato
} // namespace
+TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildAndProbeReturnCancelAtEntry) {
+ auto [probe_operator, sink_operator] = _helper.create_operators();
+ std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+ auto* local_state =
_helper.create_probe_local_state(_helper.runtime_state.get(),
+ probe_operator.get(),
shared_state);
+ JoinSpillPartitionInfo build_partition(nullptr, nullptr, 0);
+ JoinSpillPartitionInfo probe_partition(nullptr, nullptr, 0);
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(),
+
build_partition));
+
expect_cancelled(local_state->recover_probe_blocks_from_partition(_helper.runtime_state.get(),
+
probe_partition));
+}
+
+TEST_F(PartitionedHashJoinProbeOperatorTest, RevokeMemoryReturnsCancelAtEntry)
{
+ auto [probe_operator, sink_operator] = _helper.create_operators();
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(probe_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) {
auto [probe_operator, sink_operator] = _helper.create_operators();
diff --git a/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
b/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
index 92479589603..f28fc8cbddb 100644
--- a/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/exec/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -60,6 +60,32 @@ protected:
PartitionedHashJoinTestHelper _helper;
};
+namespace {
+
+constexpr auto CANCEL_REASON = "partitioned hash join sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
+} // namespace
+
+TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+ auto [probe_operator, sink_operator] = _helper.create_operators();
+ std::shared_ptr<MockPartitionedHashJoinSharedState> shared_state;
+ auto* local_state =
_helper.create_sink_local_state(_helper.runtime_state.get(),
+ sink_operator.get(),
shared_state);
+ ASSERT_NE(local_state, nullptr);
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
TEST_F(PartitionedHashJoinSinkOperatorTest, Init) {
TPlanNode tnode = _helper.create_test_plan_node();
const DescriptorTbl& desc_tbl = _helper.runtime_state->desc_tbl();
diff --git a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
index db47d9565a3..ead792a1e32 100644
--- a/be/test/exec/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_sink_operator_test.cpp
@@ -25,6 +25,7 @@
#include "core/block/block.h"
#include "core/data_type/data_type_number.h"
#include "exec/operator/spill_sort_test_helper.h"
+#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/pipeline/pipeline_task.h"
#include "testutil/column_helper.h"
@@ -38,6 +39,84 @@ protected:
SpillSortTestHelper _helper;
};
+namespace {
+
+constexpr auto CANCEL_REASON = "spill sort sink cancelled";
+
+void cancel_state(RuntimeState* state) {
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
+} // namespace
+
+TEST_F(SpillSortSinkOperatorTest, ExecuteSpillSortReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ auto sink_local_state =
SpillSortSinkLocalState::create_unique(sink_operator.get(),
+
_helper.runtime_state.get());
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(sink_local_state->_execute_spill_sort(_helper.runtime_state.get()));
+}
+
+TEST_F(SpillSortSinkOperatorTest, RevokeMemoryReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+
+ auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+ auto shared_state = sink_operator->create_shared_state();
+ ASSERT_TRUE(shared_state != nullptr);
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.operator_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .shared_state_map = {},
+ .tsink = {}};
+
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(sink_operator->revoke_memory(_helper.runtime_state.get()));
+}
+
+TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAtEntry) {
+ cancel_state(_helper.runtime_state.get());
+
+ bool executed = false;
+ expect_cancelled(run_spill_task(_helper.runtime_state.get(), [&]() {
+ executed = true;
+ return Status::OK();
+ }));
+ EXPECT_FALSE(executed);
+}
+
+TEST_F(SpillSortSinkOperatorTest, RunSpillTaskReturnsCancelAfterCallback) {
+ bool finalized = false;
+
+ auto status = run_spill_task(
+ _helper.runtime_state.get(),
+ [&]() {
+ cancel_state(_helper.runtime_state.get());
+ return Status::OK();
+ },
+ [&]() {
+ finalized = true;
+ return Status::OK();
+ });
+ expect_cancelled(status);
+ EXPECT_FALSE(finalized);
+}
+
TEST_F(SpillSortSinkOperatorTest, Basic) {
auto [source_operator, sink_operator] = _helper.create_operators();
ASSERT_TRUE(source_operator != nullptr);
diff --git a/be/test/exec/operator/spill_sort_source_operator_test.cpp
b/be/test/exec/operator/spill_sort_source_operator_test.cpp
index 9f458699fa9..e4eed08b42c 100644
--- a/be/test/exec/operator/spill_sort_source_operator_test.cpp
+++ b/be/test/exec/operator/spill_sort_source_operator_test.cpp
@@ -188,8 +188,28 @@ void delete_spill_files(const std::vector<SpillFileSPtr>&
spill_files) {
}
}
+constexpr auto CANCEL_REASON = "spill sort source cancelled";
+
+void cancel_state(RuntimeState* state) {
+ state->cancel(Status::Cancelled(CANCEL_REASON));
+}
+
+void expect_cancelled(const Status& status) {
+ EXPECT_TRUE(status.is<ErrorCode::CANCELLED>()) << status.to_string();
+ EXPECT_NE(status.to_string().find(CANCEL_REASON), std::string::npos) <<
status.to_string();
+}
+
} // namespace
+TEST_F(SpillSortSourceOperatorTest,
ExecuteMergeSortSpillFilesReturnsCancelAtEntry) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ auto local_state =
std::make_unique<SpillSortLocalState>(_helper.runtime_state.get(),
+
source_operator.get());
+
+ cancel_state(_helper.runtime_state.get());
+
expect_cancelled(local_state->execute_merge_sort_spill_files(_helper.runtime_state.get()));
+}
+
TEST_F(SpillSortSourceOperatorTest, Basic) {
auto [source_operator, sink_operator] = _helper.create_operators();
ASSERT_TRUE(source_operator != nullptr);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]