This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 083af74c255 [test](ut) add cases about partitioned aggregation
operators (#48925)
083af74c255 is described below
commit 083af74c255b12e90749574e6b76b7958bc7dcdd
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Mar 17 18:49:16 2025 +0800
[test](ut) add cases about partitioned aggregation operators (#48925)
---
be/src/pipeline/exec/aggregation_source_operator.h | 3 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 7 +-
.../exec/partitioned_aggregation_source_operator.h | 3 +-
.../partitioned_aggregation_sink_operator_test.cpp | 467 ++++++++++++++++++++
...artitioned_aggregation_source_operator_test.cpp | 470 +++++++++++++++++++++
.../partitioned_aggregation_test_helper.cpp | 238 +++++++++++
.../operator/partitioned_aggregation_test_helper.h | 155 +++++++
.../partitioned_hash_join_probe_operator_test.cpp | 32 +-
.../partitioned_hash_join_sink_operator_test.cpp | 8 -
.../operator/partitioned_hash_join_test_helper.cpp | 76 ++--
.../operator/partitioned_hash_join_test_helper.h | 83 +---
.../operator/spillable_operator_test_helper.cpp | 76 ++++
.../operator/spillable_operator_test_helper.h | 132 ++++++
be/test/testutil/creators.h | 44 +-
14 files changed, 1606 insertions(+), 188 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index f18e9345b44..4d29cfb603d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -18,6 +18,7 @@
#include <stdint.h>
+#include "common/be_mock_util.h"
#include "common/status.h"
#include "operator.h"
@@ -28,7 +29,7 @@ namespace pipeline {
#include "common/compile_check_begin.h"
class AggSourceOperatorX;
-class AggLocalState final : public PipelineXLocalState<AggSharedState> {
+class AggLocalState MOCK_REMOVE(final) : public
PipelineXLocalState<AggSharedState> {
public:
using Base = PipelineXLocalState<AggSharedState>;
ENABLE_FACTORY_CREATOR(AggLocalState);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index e003ea23240..521da54f21f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -130,8 +130,6 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* s
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode,
state));
_name = "PARTITIONED_AGGREGATION_SINK_OPERATOR";
_spill_partition_count = state->spill_aggregation_partition_count();
- RETURN_IF_ERROR(
-
_agg_sink_operator->set_child(DataSinkOperatorX<PartitionedAggSinkLocalState>::_child));
return _agg_sink_operator->init(tnode, state);
}
@@ -255,10 +253,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
update_profile<true>(sink_local_state->profile());
}
- // TODO: spill thread may set_ready before the task::execute thread put
the task to blocked state
- if (!_eos) {
- Base::_spill_dependency->Dependency::block();
- }
auto& parent = Base::_parent->template cast<Parent>();
Status status;
Defer defer {[&]() {
@@ -331,6 +325,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(
return status;
});
+ Base::_spill_dependency->Dependency::block();
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
std::move(spill_runnable));
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 24e56df1be8..c7d4b21af56 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -30,7 +30,8 @@ namespace pipeline {
class PartitionedAggSourceOperatorX;
class PartitionedAggLocalState;
-class PartitionedAggLocalState final : public
PipelineXSpillLocalState<PartitionedAggSharedState> {
+class PartitionedAggLocalState MOCK_REMOVE(final)
+ : public PipelineXSpillLocalState<PartitionedAggSharedState> {
public:
ENABLE_FACTORY_CREATOR(PartitionedAggLocalState);
using Base = PipelineXSpillLocalState<PartitionedAggSharedState>;
diff --git
a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
new file mode 100644
index 00000000000..930eba76fd6
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/config.h"
+#include "partitioned_aggregation_test_helper.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
+#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+class PartitionedAggregationSinkOperatorTest : public testing::Test { // Fixture for the partitioned aggregation sink operator tests.
+protected:
+ void SetUp() override { _helper.SetUp(); } // Forwards per-test setup to the shared helper.
+ void TearDown() override { _helper.TearDown(); } // Forwards per-test teardown to the shared helper.
+ PartitionedAggregationTestHelper _helper; // Supplies the operators, runtime_state and runtime_profile used by every test.
+};
+
+TEST_F(PartitionedAggregationSinkOperatorTest, Init) { // Lifecycle smoke test: init -> prepare -> setup_local_state -> open -> close, each step must return OK.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+ // Initialise the sink operator from the helper's test plan node.
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // This test uses a mock shared state instead of sink_operator->create_shared_state().
+ std::shared_ptr<MockPartitionedAggSharedState> shared_state =
+ MockPartitionedAggSharedState::create_shared();
+ // Register a sink local state on the runtime state, wired to the mock shared state.
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* local_state = _helper.runtime_state->get_sink_local_state();
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Close the operator first, then the local state; the previous status is passed through as the exec status argument.
+ st = sink_operator->close(_helper.runtime_state.get(), st);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+ st = local_state->close(_helper.runtime_state.get(), st);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, Sink) { // Sinks the same block twice (second time with eos=true) without spilling and checks the source dependency afterwards.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Real shared state created by the sink; `dep` is the source-side dependency inspected at the end.
+ auto shared_state = sink_operator->create_shared_state();
+ auto* dep =
shared_state->create_source_dependency(source_operator->operator_id(),
+
source_operator->node_id(),
+
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* local_state = _helper.runtime_state->get_sink_local_state();
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Input block: two Int32 columns built from the same ten values.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+ // The reported reserve size must be positive before each sink call (eos=false, then eos=true).
+ ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(),
false), 0);
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+ ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(),
true), 0);
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); // After eos the source dependency must report no blocker.
+
+ st = sink_operator->close(_helper.runtime_state.get(), st);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithEmptyEOS) { // Like Sink, but the final eos=true call passes a block whose column data has been cleared.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Real shared state created by the sink; `dep` is the source-side dependency inspected at the end.
+ auto shared_state = sink_operator->create_shared_state();
+ auto* dep =
shared_state->create_source_dependency(source_operator->operator_id(),
+
source_operator->node_id(),
+
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* local_state = _helper.runtime_state->get_sink_local_state();
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Input block: two Int32 columns built from the same ten values.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+ // First sink carries data with eos=false.
+ ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(),
false), 0);
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Second sink signals eos with the block's column data cleared first.
+ ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(),
true), 0);
+ block.clear_column_data();
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); // After eos the source dependency must report no blocker.
+
+ st = sink_operator->close(_helper.runtime_state.get(), st);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpill) { // Sinks data, revokes memory, waits for the spill, then sinks the same block again with eos=true.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Real shared state created by the sink; `dep` is the source-side dependency inspected at the end.
+ auto shared_state = sink_operator->create_shared_state();
+ auto* dep =
shared_state->create_source_dependency(source_operator->operator_id(),
+
source_operator->node_id(),
+
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+ // The concrete local-state type is needed below to reach _spill_dependency and _runtime_state.
+ auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+ _helper.runtime_state->get_sink_local_state());
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Input block: two Int32 columns built from the same ten values.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Precondition for the spill: revocable memory is reported and the inner hash table is non-empty.
+ ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()),
0);
+ // The inner AggSinkLocalState is fetched from the local state's own _runtime_state.
+ auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+ local_state->_runtime_state->get_sink_local_state());
+ ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+ ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+ // Poll every 10 ms until the spill dependency reports no blocker; there is no timeout, so this spins forever if it never unblocks.
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ // After the revoke has finished the inner hash table is expected to be empty.
+ ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Wait again in case the eos sink left the spill dependency blocked.
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+
+ ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+ st = sink_operator->close(_helper.runtime_state.get(), st);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillAndEmptyEOS) { // Like SinkWithSpill, but the eos=true sink passes a block whose column data has been cleared.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Real shared state created by the sink; `dep` is the source-side dependency inspected at the end.
+ auto shared_state = sink_operator->create_shared_state();
+ auto* dep =
shared_state->create_source_dependency(source_operator->operator_id(),
+
source_operator->node_id(),
+
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+ // The concrete local-state type is needed below to reach _spill_dependency and _runtime_state.
+ auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+ _helper.runtime_state->get_sink_local_state());
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Input block: two Int32 columns built from the same ten values.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Precondition for the spill: revocable memory is reported and the inner hash table is non-empty.
+ ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()),
0);
+
+ auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+ local_state->_runtime_state->get_sink_local_state());
+ ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+ ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+ // Poll every 10 ms until the spill dependency reports no blocker; there is no timeout, so this spins forever if it never unblocks.
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+
+ ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+ // Signal eos with an emptied block; unlike SinkWithSpill, the spill dependency is asserted unblocked immediately, without polling.
+ block.clear_column_data();
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ ASSERT_TRUE(local_state->_spill_dependency->is_blocked_by(nullptr) ==
nullptr);
+ ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+ st = sink_operator->close(_helper.runtime_state.get(), st);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillLargeData) { // Spills a small block, then sinks a 1048576-row block and checks the "SpillWriteRows" counter at both points.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Real shared state created by the sink; `dep` is the source-side dependency inspected at the end.
+ auto shared_state = sink_operator->create_shared_state();
+ auto* dep =
shared_state->create_source_dependency(source_operator->operator_id(),
+
source_operator->node_id(),
+
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+ // The concrete local-state type is needed below to reach _spill_dependency, _runtime_state and profile().
+ auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+ _helper.runtime_state->get_sink_local_state());
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // First input block: two Int32 columns built from the same ten values.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Precondition for the spill: revocable memory is reported and the inner hash table is non-empty.
+ ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()),
0);
+
+ auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+ local_state->_runtime_state->get_sink_local_state());
+ ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+ ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+ // Poll every 10 ms until the spill dependency reports no blocker; there is no timeout, so this spins forever if it never unblocks.
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ // First checkpoint: 4 rows written. NOTE(review): presumably one per distinct key (1..4) in the first block - confirm.
+ auto* spill_write_rows_counter =
local_state->profile()->get_counter("SpillWriteRows");
+ ASSERT_TRUE(spill_write_rows_counter != nullptr);
+ ASSERT_EQ(spill_write_rows_counter->value(), 4);
+
+ ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+ // Second input block: `count` rows holding 0..count-1 (std::iota), with the same data in both columns.
+ const size_t count = 1048576;
+ std::vector<int32_t> data(count);
+ std::iota(data.begin(), data.end(), 0);
+ block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data);
+
+ block.insert(
+
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(data));
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Wait for any spill left pending by the large sink before signalling eos.
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ // Signal eos with an emptied block; both dependencies must then report no blocker.
+ block.clear_column_data();
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ ASSERT_TRUE(local_state->_spill_dependency->is_blocked_by(nullptr) ==
nullptr);
+ ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+ // Final checkpoint: the counter must equal the large block's row count plus the 4 rows from the first spill. The counter is asserted before the close status.
+ st = sink_operator->close(_helper.runtime_state.get(), st);
+ ASSERT_EQ(spill_write_rows_counter->value(), 1048576 + 4);
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpilError) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+ auto shared_state = sink_operator->create_shared_state();
+
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = _helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+ _helper.runtime_state->get_sink_local_state());
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+ ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()),
0);
+
+ auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+ local_state->_runtime_state->get_sink_local_state());
+ ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ SpillableDebugPointHelper
dp_helper("fault_inject::spill_stream::spill_block");
+ st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+ ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+
+ std::cout << "profile: " << _helper.runtime_profile->pretty_print() <<
std::endl;
+
+ ASSERT_FALSE(dp_helper.get_spill_status().ok()) << "spilll status should
be failed";
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git
a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
new file mode 100644
index 00000000000..a23c7237979
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
@@ -0,0 +1,470 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/partitioned_aggregation_source_operator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/config.h"
+#include "partitioned_aggregation_test_helper.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
+#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+class PartitionedAggregationSourceOperatorTest : public testing::Test { // Fixture for the partitioned aggregation source operator tests.
+protected:
+ void SetUp() override { _helper.SetUp(); } // Forwards per-test setup to the shared helper.
+ void TearDown() override { _helper.TearDown(); } // Forwards per-test teardown to the shared helper.
+ // Supplies the operators, runtime_state and runtime_profile used by every test.
+ PartitionedAggregationTestHelper _helper;
+};
+
+TEST_F(PartitionedAggregationSourceOperatorTest, Init) { // Lifecycle smoke test for the source operator: init -> prepare -> setup_local_state -> open -> close, each step must return OK.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = source_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = source_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // No sink runs in this test, so a mock shared state is created directly.
+ std::shared_ptr<MockPartitionedAggSharedState> shared_state =
+ MockPartitionedAggSharedState::create_shared();
+ // Give the mock a fresh AggSharedState, held by the shared pointer and also exposed through the raw in_mem_shared_state pointer.
+ shared_state->in_mem_shared_state_sptr =
std::make_shared<AggSharedState>();
+ shared_state->in_mem_shared_state =
+
reinterpret_cast<AggSharedState*>(shared_state->in_mem_shared_state_sptr.get());
+
+ LocalStateInfo info {
+ .parent_profile = _helper.runtime_profile.get(),
+ .scan_ranges = {},
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .task_idx = 0,
+ };
+ st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+ // The source local state is looked up by the operator's id.
+ auto* local_state =
_helper.runtime_state->get_local_state(source_operator->operator_id());
+ ASSERT_TRUE(local_state != nullptr);
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+ st = source_operator->close(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+ // NOTE(review): open() is called again after close(); the sink Init test closes the local state at this point - confirm whether close() was intended.
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) { // With nothing sunk, get_block must return an empty block and set eos.
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+ // Both operators are initialised and prepared from the same plan node.
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = source_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = source_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+ st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // The shared state comes from the sink and is handed to both local states; the returned dependency pointer is not kept.
+ auto shared_state = sink_operator->create_shared_state();
+ shared_state->create_source_dependency(source_operator->operator_id(),
+ source_operator->node_id(),
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo sink_info {.task_idx = 0,
+ .parent_profile =
_helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+ // The sink local state is opened but no block is ever sunk.
+ auto* sink_local_state = _helper.runtime_state->get_sink_local_state();
+ ASSERT_TRUE(sink_local_state != nullptr);
+
+ st = sink_local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+ LocalStateInfo info {
+ .parent_profile = _helper.runtime_profile.get(),
+ .scan_ranges = {},
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .task_idx = 0,
+ };
+ st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+ // The concrete local-state type is needed to reach _copy_shared_spill_profile.
+ auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+
_helper.runtime_state->get_local_state(source_operator->operator_id()));
+ ASSERT_TRUE(local_state != nullptr);
+ // NOTE(review): presumably disables copying the shared spill profile in this isolated setup - confirm against PartitionedAggLocalState.
+ local_state->_copy_shared_spill_profile = false;
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Read from the source: expect success, zero rows and eos == true.
+ vectorized::Block block;
+ bool eos = false;
+ st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+ ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+ ASSERT_EQ(block.rows(), 0);
+ ASSERT_TRUE(eos);
+
+ st = source_operator->close(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+ // NOTE(review): open() is called again after close() - confirm whether local_state->close() was intended.
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+    // Non-spill path: sink the same block twice, then read the aggregated rows in one call.
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+    // One shared state is handed to both the sink and the source local state below.
+    auto shared_state = sink_operator->create_shared_state();
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {.task_idx = 0,
+                                  .parent_profile = _helper.runtime_profile.get(),
+                                  .sender_id = 0,
+                                  .shared_state = shared_state.get(),
+                                  .le_state_map = {},
+                                  .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+    // Two Int32 columns with identical data: 10 rows holding the 4 distinct values 1..4.
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+    // Same block again, this time with the last argument (eos) set.
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+    // revoke_memory() is never called here, so revocable memory and the hash table are non-empty.
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            sink_local_state->_runtime_state->get_sink_local_state());
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    LocalStateInfo info {
+            .parent_profile = _helper.runtime_profile.get(),
+            .scan_ranges = {},
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .task_idx = 0,
+    };
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+
+    local_state->_copy_shared_spill_profile = false;
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    block.clear();
+    bool eos = false;
+    st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+    ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+    ASSERT_TRUE(eos);
+    // Only gtest assertions here: a failed DCHECK would abort the whole test binary instead.
+    ASSERT_EQ(block.rows(), 4);
+
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    // NOTE(review): the local state is opened again after close(); confirm this is intended.
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+ // Spill path: sink a block, revoke memory, sink the final block, then read every row back.
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = source_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = source_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+ st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Cast to the concrete shared state type so is_spilled can be checked further down.
+ auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+ sink_operator->create_shared_state());
+ shared_state->create_source_dependency(source_operator->operator_id(),
+ source_operator->node_id(),
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo sink_info {.task_idx = 0,
+ .parent_profile =
_helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+ _helper.runtime_state->get_sink_local_state());
+ ASSERT_TRUE(sink_local_state != nullptr);
+
+ st = sink_local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Two Int32 columns with identical data: 10 rows holding the 4 distinct values 1..4.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Ask the sink to release its revocable memory; is_spilled is expected to be set afterwards.
+ st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+ ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+ // Poll (10 ms steps) until the sink's spill dependency is no longer blocked.
+ while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) !=
nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+
+ ASSERT_TRUE(shared_state->is_spilled);
+ // Same block again, this time with the last argument (eos) set.
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+ while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) !=
nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ // Once spilling has finished, no revocable memory and no hash table entries may remain.
+ ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()),
0);
+
+ auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+ sink_local_state->_runtime_state->get_sink_local_state());
+ ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ LocalStateInfo info {
+ .parent_profile = _helper.runtime_profile.get(),
+ .scan_ranges = {},
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .task_idx = 0,
+ };
+ st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+
_helper.runtime_state->get_local_state(source_operator->operator_id()));
+ ASSERT_TRUE(local_state != nullptr);
+
+ local_state->_copy_shared_spill_profile = false;
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Drain the source: wait on its spill dependency after each call and total the rows seen.
+ block.clear();
+ bool eos = false;
+ size_t rows = 0;
+ while (!eos) {
+ st = source_operator->get_block(_helper.runtime_state.get(), &block,
&eos);
+ ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) !=
nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ rows += block.rows();
+ block.clear_column_data();
+ }
+
+ ASSERT_TRUE(eos);
+ ASSERT_EQ(rows, 4);
+
+ st = source_operator->close(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+ // NOTE(review): the local state is opened again after close(); confirm this is intended.
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) {
+ auto [source_operator, sink_operator] = _helper.create_operators();
+ ASSERT_TRUE(source_operator != nullptr);
+ ASSERT_TRUE(sink_operator != nullptr);
+ // Error path: spill the data, then read it back with a debug point active on the read side.
+ const auto tnode = _helper.create_test_plan_node();
+ auto st = source_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = source_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+ st = sink_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = sink_operator->prepare(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+ // Cast to the concrete shared state type so is_spilled can be checked further down.
+ auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+ sink_operator->create_shared_state());
+ shared_state->create_source_dependency(source_operator->operator_id(),
+ source_operator->node_id(),
"PartitionedAggSinkTestDep");
+
+ LocalSinkStateInfo sink_info {.task_idx = 0,
+ .parent_profile =
_helper.runtime_profile.get(),
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .tsink = TDataSink()};
+ st = sink_operator->setup_local_state(_helper.runtime_state.get(),
sink_info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+ _helper.runtime_state->get_sink_local_state());
+ ASSERT_TRUE(sink_local_state != nullptr);
+
+ st = sink_local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Two Int32 columns with identical data: 10 rows holding the 4 distinct values 1..4.
+ auto block =
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+ {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+ // Ask the sink to release its revocable memory; is_spilled is expected to be set afterwards.
+ st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+ ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+ // Poll (10 ms steps) until the sink's spill dependency is no longer blocked.
+ while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) !=
nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+
+ ASSERT_TRUE(shared_state->is_spilled);
+ // Same block again, this time with the last argument (eos) set.
+ st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+ ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+ while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) !=
nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ // Once spilling has finished, no revocable memory and no hash table entries may remain.
+ ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()),
0);
+
+ auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+ sink_local_state->_runtime_state->get_sink_local_state());
+ ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+ LocalStateInfo info {
+ .parent_profile = _helper.runtime_profile.get(),
+ .scan_ranges = {},
+ .shared_state = shared_state.get(),
+ .le_state_map = {},
+ .task_idx = 0,
+ };
+ st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+ ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+ auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+
_helper.runtime_state->get_local_state(source_operator->operator_id()));
+ ASSERT_TRUE(local_state != nullptr);
+
+ local_state->_copy_shared_spill_profile = false;
+
+ st = local_state->open(_helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+ // Activate the read_next_block debug point; the helper also exposes the spill status.
+ SpillableDebugPointHelper
dp_helper("fault_inject::spill_stream::read_next_block");
+ // Read until eos or until the helper's spill status stops being OK, whichever comes first.
+ block.clear();
+ bool eos = false;
+ while (!eos && dp_helper.get_spill_status().ok()) {
+ st = source_operator->get_block(_helper.runtime_state.get(), &block,
&eos);
+ ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+ while (local_state->_spill_dependency->is_blocked_by(nullptr) !=
nullptr) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ block.clear_column_data();
+ }
+ // The failure must surface through the spill status; get_block itself returned OK each time.
+ ASSERT_FALSE(dp_helper.get_spill_status().ok());
+}
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
new file mode 100644
index 00000000000..0d2a653c6c3
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "partitioned_aggregation_test_helper.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "runtime/define_primitive_type.h"
+#include "testutil/creators.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris::pipeline {
+TPlanNode PartitionedAggregationTestHelper::create_test_plan_node() { // AGGREGATION_NODE: one grouping expr, one "sum".
+ TPlanNode tnode;
+ tnode.node_id = 0;
+ tnode.node_type = TPlanNodeType::AGGREGATION_NODE;
+ tnode.num_children = 1;
+ tnode.agg_node.use_streaming_preaggregation = false;
+ tnode.agg_node.need_finalize = false;
+ tnode.agg_node.intermediate_tuple_id = 1;
+ tnode.agg_node.output_tuple_id = 2;
+ tnode.limit = -1;
+ // Grouping expression: a single SLOT_REF to slot 0 of tuple 0.
+ auto& grouping_expr = tnode.agg_node.grouping_exprs.emplace_back();
+ auto& expr_node = grouping_expr.nodes.emplace_back();
+ expr_node.node_type = TExprNodeType::SLOT_REF;
+ // Scalar INT type node; it is reused for the function's child slot reference as well.
+ TTypeNode type_node;
+ type_node.type = TTypeNodeType::SCALAR;
+ type_node.scalar_type.type = TPrimitiveType::INT;
+ type_node.__isset.scalar_type = true;
+
+ expr_node.type.types.emplace_back(type_node);
+ expr_node.__set_is_nullable(false);
+ expr_node.num_children = 0;
+ expr_node.slot_ref.slot_id = 0;
+ expr_node.slot_ref.tuple_id = 0;
+ // Aggregate expression: a FUNCTION_CALL node with exactly one child.
+ auto& agg_function = tnode.agg_node.aggregate_functions.emplace_back();
+ auto& fn_node = agg_function.nodes.emplace_back();
+ fn_node.node_type = TExprNodeType::FUNCTION_CALL;
+ fn_node.__set_is_nullable(false);
+ fn_node.num_children = 1;
+
+ TFunctionName fn_name;
+ fn_name.function_name = "sum";
+
+ fn_node.fn.__set_name(fn_name);
+ // Return type: scalar BIGINT, marked non-nullable.
+ TTypeDesc ret_type;
+ auto& ret_type_node = ret_type.types.emplace_back();
+ ret_type_node.scalar_type.type = TPrimitiveType::BIGINT;
+ ret_type_node.__isset.scalar_type = true;
+ ret_type_node.type = TTypeNodeType::SCALAR;
+ ret_type.__set_is_nullable(false);
+ // Argument type: scalar INT; used for both fn.arg_types and agg_expr.param_types.
+ TTypeDesc arg_type;
+ auto& arg_type_node = arg_type.types.emplace_back();
+ arg_type_node.scalar_type.type = TPrimitiveType::INT;
+ arg_type_node.__isset.scalar_type = true;
+ arg_type_node.type = TTypeNodeType::SCALAR;
+
+ fn_node.fn.__set_ret_type(ret_type);
+
+ fn_node.fn.__set_arg_types({arg_type});
+ fn_node.agg_expr.__set_param_types({arg_type});
+ // Function argument: SLOT_REF to slot 1 of tuple 0. fn_node is not used after this emplace_back.
+ auto& fn_child_node = agg_function.nodes.emplace_back();
+ fn_child_node.node_type = TExprNodeType::SLOT_REF;
+ fn_child_node.__set_is_nullable(false);
+ fn_child_node.num_children = 0;
+ fn_child_node.slot_ref.slot_id = 1;
+ fn_child_node.slot_ref.tuple_id = 0;
+ fn_child_node.type.types.emplace_back(type_node);
+ // The node's row consists of tuple 0 only, non-nullable.
+ tnode.row_tuples.push_back(0);
+ tnode.nullable_tuples.push_back(false);
+
+ return tnode;
+}
+
+TDescriptorTable
PartitionedAggregationTestHelper::create_test_table_descriptor(
+ bool nullable = false) { // Builds a descriptor table holding three tuples of two slots each.
+ TTupleDescriptorBuilder tuple_builder;
+ tuple_builder
+ .add_slot(TSlotDescriptorBuilder()
+ .type(PrimitiveType::TYPE_INT)
+ .column_name("col1")
+ .column_pos(0)
+ .nullable(nullable)
+ .build())
+ .add_slot(TSlotDescriptorBuilder()
+ .type(PrimitiveType::TYPE_INT)
+ .column_name("col2")
+ .column_pos(0)
+ .nullable(nullable)
+ .build());
+ // NOTE(review): every slot in this function uses column_pos(0); confirm the position is unused.
+ TDescriptorTableBuilder builder;
+
+ tuple_builder.build(&builder);
+ // Second tuple: INT col3 (nullability from `nullable`) and an always-nullable BIGINT col4.
+ TTupleDescriptorBuilder()
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_INT)
+ .column_name("col3")
+ .column_pos(0)
+ .nullable(nullable)
+ .build())
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_BIGINT)
+ .column_name("col4")
+ .column_pos(0)
+ .nullable(true)
+ .build())
+ .build(&builder);
+ // Third tuple: INT col5 (nullability from `nullable`) and an always-nullable BIGINT col6.
+ TTupleDescriptorBuilder()
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_INT)
+ .column_name("col5")
+ .column_pos(0)
+ .nullable(nullable)
+ .build())
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_BIGINT)
+ .column_name("col6")
+ .column_pos(0)
+ .nullable(true)
+ .build())
+ .build(&builder);
+
+ return builder.desc_tbl();
+}
+
+// Creates the partitioned agg source/sink operator pair (both with operator id 0), links them
+// through a generated pipeline with mock child/sink operators, and installs a PipelineTask.
+std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>,
+           std::shared_ptr<PartitionedAggSinkOperatorX>>
+PartitionedAggregationTestHelper::create_operators() {
+    TPlanNode tnode = create_test_plan_node();
+    auto desc_tbl = runtime_state->desc_tbl();
+    EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 3);
+
+    auto source_operator =
+            std::make_shared<PartitionedAggSourceOperatorX>(obj_pool.get(), tnode, 0, desc_tbl);
+    auto sink_operator = std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode,
+                                                                       desc_tbl, false);
+
+    auto child_operator = std::make_shared<MockChildOperator>();
+    auto source_side_sink_operator = std::make_shared<MockSinkOperator>();
+    auto [source_pipeline, _] = generate_agg_pipeline(source_operator, source_side_sink_operator,
+                                                      sink_operator, child_operator);
+
+    RowDescriptor row_desc(runtime_state->desc_tbl(), {0}, {false});
+    child_operator->_row_descriptor = row_desc;
+
+    EXPECT_TRUE(sink_operator->set_child(child_operator));
+
+    // Setup task and state
+    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
+            le_state_map;
+    pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr,
+                                                   nullptr, le_state_map, 0);
+    runtime_state->set_task(pipeline_task.get());
+    return {std::move(source_operator), std::move(sink_operator)};
+}
+
+// Builds a mock source local state wired to a fresh shared state (returned through
+// `shared_state`, marked as spilled) and registers it in `state`, which takes ownership.
+PartitionedAggLocalState* PartitionedAggregationTestHelper::create_source_local_state(
+        RuntimeState* state, PartitionedAggSourceOperatorX* source_operator,
+        std::shared_ptr<MockPartitionedAggSharedState>& shared_state) {
+    auto local_state_uptr = std::make_unique<MockPartitionedAggLocalState>(state, source_operator);
+    auto* local_state = local_state_uptr.get();
+    shared_state = std::make_shared<MockPartitionedAggSharedState>();
+    local_state->_shared_state = shared_state.get();
+    shared_state->is_spilled = true;
+
+    ADD_TIMER(local_state->profile(), "ExecTime");
+    local_state->profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
+    local_state->init_spill_read_counters();
+    local_state->init_spill_write_counters();
+    local_state->_copy_shared_spill_profile = false;
+    local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
+    local_state->_spill_dependency =
+            Dependency::create_shared(0, 0, "PartitionedAggSourceOperatorTestSpillDep", true);
+    state->emplace_local_state(source_operator->operator_id(), std::move(local_state_uptr));
+    return local_state;
+}
+
+// Builds a mock sink local state plus a fresh shared state (returned through `shared_state`),
+// gives it sink and spill dependencies, and registers it in `state`, which takes ownership.
+PartitionedAggSinkLocalState* PartitionedAggregationTestHelper::create_sink_local_state(
+        RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator,
+        std::shared_ptr<MockPartitionedAggSharedState>& shared_state) {
+    auto local_state_uptr = MockPartitionedAggSinkLocalState::create_unique(sink_operator, state);
+    auto* local_state = local_state_uptr.get();
+    shared_state = std::make_shared<MockPartitionedAggSharedState>();
+    local_state->init_spill_counters();
+
+    ADD_TIMER(local_state->profile(), "ExecTime");
+    local_state->profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
+    local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
+    // NOTE(review): local_state->_shared_state is not assigned in this function; confirm.
+    local_state->_dependency = shared_state->create_sink_dependency(
+            sink_operator->dests_id().front(), sink_operator->operator_id(),
+            "PartitionedAggTestDep");
+    local_state->_spill_dependency =
+            Dependency::create_shared(0, 0, "PartitionedAggSinkOperatorTestSpillDep", true);
+    shared_state->setup_shared_profile(local_state->profile());
+    state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr));
+    return local_state;
+}
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
new file mode 100644
index 00000000000..cc87236d6d4
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/config.h"
+#include "common/factory_creator.h"
+#include "common/object_pool.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/aggregation_source_operator.h"
+#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
+#include "pipeline/exec/partitioned_aggregation_source_operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "spillable_operator_test_helper.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+class MockAggSharedState : public AggSharedState { // Empty subclass; adds no members or overrides.
+public:
+};
+
+class MockPartitionedAggSharedState : public PartitionedAggSharedState { // Shared state used by the helper's mock local states.
+ ENABLE_FACTORY_CREATOR(MockPartitionedAggSharedState);
+ // The constructor explicitly sets is_spilled to false.
+public:
+ MockPartitionedAggSharedState() { is_spilled = false; }
+};
+
+class MockPartitionedAggSinkLocalState : public PartitionedAggSinkLocalState { // Sink local state with stubbed lifecycle.
+ ENABLE_FACTORY_CREATOR(MockPartitionedAggSinkLocalState)
+public:
+ MockPartitionedAggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
+ : PartitionedAggSinkLocalState(parent, state) {
+ _runtime_profile = std::make_unique<RuntimeProfile>("test"); // profile owned by this mock
+ _profile = _runtime_profile.get();
+ _memory_used_counter =
+ _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES,
"", 1);
+ }
+ // init/open/close ignore their arguments and always return OK.
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
return Status::OK(); }
+ Status open(RuntimeState* state) override { return Status::OK(); }
+ Status close(RuntimeState* state, Status status) override { return
Status::OK(); }
+
+private:
+ std::unique_ptr<RuntimeProfile> _runtime_profile;
+};
+
+class MockPartitionedAggSinkOperatorX : public PartitionedAggSinkOperatorX { // Sink operator with stubbed prepare/sink.
+public:
+ MockPartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int
dest_id,
+ const TPlanNode& tnode, const
DescriptorTbl& descs)
+ : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode,
descs, false) {}
+ ~MockPartitionedAggSinkOperatorX() override = default;
+ // prepare() does nothing and returns OK.
+ Status prepare(RuntimeState* state) override { return Status::OK(); }
+ // Registers a MockPartitionedAggSinkLocalState under _operator_id; `info` is not used.
+ Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override {
+ state->emplace_sink_local_state(
+ _operator_id,
MockPartitionedAggSinkLocalState::create_unique(this, state));
+ return Status::OK();
+ }
+ // sink() ignores the block and the eos flag and returns OK.
+ Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override {
+ return Status::OK();
+ }
+};
+
+class MockPartitionedAggLocalState : public PartitionedAggLocalState { // Source local state whose open() is a no-op.
+ ENABLE_FACTORY_CREATOR(MockPartitionedAggLocalState);
+ // The constructor assigns a new RuntimeProfile named "test" to _runtime_profile.
+public:
+ MockPartitionedAggLocalState(RuntimeState* state, OperatorXBase* parent)
+ : PartitionedAggLocalState(state, parent) {
+ _runtime_profile = std::make_unique<RuntimeProfile>("test");
+ }
+ // open() ignores its argument and returns OK.
+ Status open(RuntimeState* state) override { return Status::OK(); }
+};
+
+class MockAggLocalState : public AggLocalState { // AggLocalState subclass that only adds factory creation.
+ ENABLE_FACTORY_CREATOR(MockAggLocalState);
+ // The constructor forwards both arguments to AggLocalState unchanged.
+public:
+ MockAggLocalState(RuntimeState* state, OperatorXBase* parent) :
AggLocalState(state, parent) {};
+};
+
+class MockAggSourceOperatorX : public AggSourceOperatorX { // Source operator that installs MockAggLocalState.
+public:
+ MockAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
+ const DescriptorTbl& descs)
+ : AggSourceOperatorX(pool, tnode, operator_id, descs) {}
+ ~MockAggSourceOperatorX() override = default;
+ // Registers a MockAggLocalState under _operator_id; `info` is not used.
+ Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
+ state->emplace_local_state(_operator_id,
MockAggLocalState::create_unique(state, this));
+ return Status::OK();
+ }
+ // need_more_input_data() returns the public need_more_data flag, which defaults to true.
+ bool need_more_input_data(RuntimeState* state) const override { return
need_more_data; }
+ bool need_more_data = true;
+ // Public fields that nothing in this class reads or writes.
+ vectorized::Block block;
+ bool eos = false;
+};
+
+class MockAggSinkOperatorX : public AggSinkOperatorX { // Default-constructible AggSinkOperatorX subclass; no overrides.
+public:
+ MockAggSinkOperatorX() = default;
+ ~MockAggSinkOperatorX() override = default;
+};
+
+class PartitionedAggregationTestHelper : public SpillableOperatorTestHelper { // Test helper for the partitioned agg operators.
+public:
+ ~PartitionedAggregationTestHelper() override = default;
+ TPlanNode create_test_plan_node() override;
+ TDescriptorTable create_test_table_descriptor(bool nullable) override;
+ // Returns a source local state; `shared_state` is a non-const reference the helper may assign.
+ PartitionedAggLocalState* create_source_local_state(
+ RuntimeState* state, PartitionedAggSourceOperatorX*
source_operator,
+ std::shared_ptr<MockPartitionedAggSharedState>& shared_state);
+ // Returns a sink local state; `shared_state` is a non-const reference the helper may assign.
+ PartitionedAggSinkLocalState* create_sink_local_state(
+ RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator,
+ std::shared_ptr<MockPartitionedAggSharedState>& shared_state);
+ // Returns the (source, sink) operator pair as shared pointers.
+ std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>,
+ std::shared_ptr<PartitionedAggSinkOperatorX>>
+ create_operators();
+};
+} // namespace doris::pipeline
\ No newline at end of file
diff --git
a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index 67378fe66a8..9f2c3b48790 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -22,11 +22,9 @@
#include <memory>
#include "common/config.h"
-#include "olap/olap_define.h"
#include "partitioned_hash_join_test_helper.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -34,7 +32,6 @@
#include "testutil/creators.h"
#include "testutil/mock/mock_operators.h"
#include "testutil/mock/mock_runtime_state.h"
-#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_number.h"
@@ -426,13 +423,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverProbeBlocksFromDiskError) {
ASSERT_TRUE(spilling_stream->spill_eof().ok());
}
- Status spill_status;
- ExecEnv::GetInstance()->_fragment_mgr =
- new MockFragmentManager(spill_status, ExecEnv::GetInstance());
-
- const auto enable_debug_points = config::enable_debug_points;
- config::enable_debug_points = true;
-
DebugPoints::instance()->add("fault_inject::spill_stream::read_next_block");
+ SpillableDebugPointHelper
dp_helper("fault_inject::spill_stream::read_next_block");
bool has_data = false;
ASSERT_TRUE(local_state
->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
@@ -446,12 +437,10 @@ TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverProbeBlocksFromDiskError) {
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream);
spilling_stream.reset();
- config::enable_debug_points = enable_debug_points;
-
- ASSERT_FALSE(spill_status.ok());
- ASSERT_TRUE(spill_status.to_string().find("fault_inject spill_stream
read_next_block") !=
- std::string::npos)
- << "unexpected error: " << spill_status.to_string();
+ ASSERT_FALSE(dp_helper.get_spill_status().ok());
+ ASSERT_TRUE(dp_helper.get_spill_status().to_string().find(
+ "fault_inject spill_stream read_next_block") !=
std::string::npos)
+ << "unexpected error: " <<
dp_helper.get_spill_status().to_string();
}
TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDisk) {
@@ -784,14 +773,9 @@ TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskError) {
ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
- Status spill_status;
- ExecEnv::GetInstance()->_fragment_mgr =
- new MockFragmentManager(spill_status, ExecEnv::GetInstance());
-
- const auto enable_debug_points = config::enable_debug_points;
- config::enable_debug_points = true;
// Test error handling with fault injection
-
DebugPoints::instance()->add("fault_inject::partitioned_hash_join_probe::recover_build_blocks");
+ SpillableDebugPointHelper dp_helper(
+ "fault_inject::partitioned_hash_join_probe::recover_build_blocks");
bool has_data = false;
auto status =
local_state->recover_build_blocks_from_disk(_helper.runtime_state.get(),
test_partition,
has_data);
@@ -801,7 +785,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest,
RecoverBuildBlocksFromDiskError) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- config::enable_debug_points = enable_debug_points;
+ auto spill_status = dp_helper.get_spill_status();
ASSERT_FALSE(spill_status.ok());
ASSERT_TRUE(spill_status.to_string().find("fault_inject
partitioned_hash_join_probe "
"recover_build_blocks failed")
!= std::string::npos)
diff --git
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index 0514d481d3a..29cb355b0a0 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -26,33 +26,25 @@
#include <gtest/gtest.h>
#include <memory>
-#include <sstream>
#include <vector>
#include "common/config.h"
-#include "common/exception.h"
#include "partitioned_hash_join_test_helper.h"
#include "pipeline/common/data_gen_functions/vnumbers_tvf.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
#include "pipeline/pipeline_task.h"
-#include "runtime/define_primitive_type.h"
-#include "runtime/descriptor_helper.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "testutil/column_helper.h"
-#include "testutil/mock/mock_data_stream_sender.h"
-#include "testutil/mock/mock_descriptors.h"
#include "testutil/mock/mock_operators.h"
#include "testutil/mock/mock_runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
-#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vexpr_fwd.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
index 03f9b3f8ae0..001bcd8e224 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
@@ -26,57 +26,19 @@
#include <gtest/gtest.h>
#include <memory>
-#include <sstream>
#include <vector>
-namespace doris::pipeline {
-void PartitionedHashJoinTestHelper::SetUp() {
- runtime_state = std::make_unique<MockRuntimeState>();
- obj_pool = std::make_unique<ObjectPool>();
-
- runtime_profile = std::make_shared<RuntimeProfile>("test");
-
- query_ctx = generate_one_query();
-
- runtime_state->_query_ctx = query_ctx.get();
- runtime_state->_query_id = query_ctx->query_id();
- runtime_state->resize_op_id_to_local_state(-100);
-
- ADD_TIMER(runtime_profile.get(), "ExecTime");
- runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "",
0);
-
- auto desc_table = create_test_table_descriptor();
- auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
- DCHECK(!desc_table.slotDescriptors.empty());
- EXPECT_TRUE(st.ok()) << "create descriptor table failed: " <<
st.to_string();
- runtime_state->set_desc_tbl(desc_tbl);
-
- auto spill_data_dir =
std::make_unique<vectorized::SpillDataDir>("/tmp/partitioned_join_test",
- 1024L *
1024 * 4);
- st =
io::global_local_filesystem()->create_directory(spill_data_dir->path(), false);
- EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path()
- << " failed: " << st.to_string();
- std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>
data_map;
- data_map.emplace("test", std::move(spill_data_dir));
- auto* spill_stream_manager = new
vectorized::SpillStreamManager(std::move(data_map));
- ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager;
- st = spill_stream_manager->init();
- EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " <<
st.to_string();
-}
-
-void PartitionedHashJoinTestHelper::TearDown() {
-
ExecEnv::GetInstance()->spill_stream_mgr()->async_cleanup_query(runtime_state->query_id());
-
doris::ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->wait();
- doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop();
- SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr);
-}
+#include "testutil/creators.h"
+#include "testutil/mock/mock_operators.h"
+namespace doris::pipeline {
TPlanNode PartitionedHashJoinTestHelper::create_test_plan_node() {
TPlanNode tnode;
tnode.node_id = 0;
tnode.node_type = TPlanNodeType::HASH_JOIN_NODE;
tnode.num_children = 2;
tnode.hash_join_node.join_op = TJoinOp::INNER_JOIN;
+ tnode.limit = -1;
TEqJoinCondition eq_cond;
eq_cond.left = TExpr();
@@ -118,6 +80,32 @@ TPlanNode PartitionedHashJoinTestHelper::create_test_plan_node() {
return tnode;
}
+TDescriptorTable PartitionedHashJoinTestHelper::create_test_table_descriptor(
+ bool nullable = false) {
+ TTupleDescriptorBuilder tuple_builder;
+ tuple_builder.add_slot(TSlotDescriptorBuilder()
+ .type(PrimitiveType::TYPE_INT)
+ .column_name("col1")
+ .column_pos(0)
+ .nullable(nullable)
+ .build());
+
+ TDescriptorTableBuilder builder;
+
+ tuple_builder.build(&builder);
+
+ TTupleDescriptorBuilder()
+ .add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_INT)
+ .column_name("col2")
+ .column_pos(0)
+ .nullable(nullable)
+ .build())
+ .build(&builder);
+
+ return builder.desc_tbl();
+}
+
std::tuple<std::shared_ptr<PartitionedHashJoinProbeOperatorX>,
std::shared_ptr<PartitionedHashJoinSinkOperatorX>>
PartitionedHashJoinTestHelper::create_operators() {
@@ -134,8 +122,8 @@ PartitionedHashJoinTestHelper::create_operators() {
auto child_operator = std::make_shared<MockChildOperator>();
auto probe_side_source_operator = std::make_shared<MockChildOperator>();
auto probe_side_sink_operator = std::make_shared<MockSinkOperator>();
- auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator,
child_operator,
-
probe_side_sink_operator, sink_operator);
+ auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator,
probe_side_sink_operator,
+ sink_operator,
child_operator);
RowDescriptor row_desc(runtime_state->desc_tbl(), {1}, {false});
child_operator->_row_descriptor = row_desc;
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
index 95865aea21e..0628a81688d 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
@@ -24,75 +24,22 @@
#include <gtest/gtest.h>
#include <memory>
-#include <sstream>
#include <vector>
#include "common/config.h"
#include "common/object_pool.h"
-#include "olap/olap_define.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
-#include "testutil/column_helper.h"
-#include "testutil/creators.h"
-#include "testutil/mock/mock_operators.h"
+#include "spillable_operator_test_helper.h"
#include "testutil/mock/mock_runtime_state.h"
-#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
-#include "vec/data_types/data_type_number.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
-
-class MockPartitioner : public vectorized::PartitionerBase {
-public:
- MockPartitioner(size_t partition_count) : PartitionerBase(partition_count)
{}
- Status init(const std::vector<TExpr>& texprs) override { return
Status::OK(); }
-
- Status prepare(RuntimeState* state, const RowDescriptor& row_desc)
override {
- return Status::OK();
- }
-
- Status open(RuntimeState* state) override { return Status::OK(); }
-
- Status close(RuntimeState* state) override { return Status::OK(); }
-
- Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool
eos,
- bool* already_sent) const override {
- if (already_sent) {
- *already_sent = false;
- }
- return Status::OK();
- }
-
- Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override {
- partitioner = std::make_unique<MockPartitioner>(_partition_count);
- return Status::OK();
- }
-
- vectorized::ChannelField get_channel_ids() const override { return {}; }
-};
-
-class MockExpr : public vectorized::VExpr {
-public:
- Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
- vectorized::VExprContext* context) override {
- return Status::OK();
- }
-
- Status open(RuntimeState* state, vectorized::VExprContext* context,
- FunctionContext::FunctionStateScope scope) override {
- return Status::OK();
- }
-};
-
-class MockHashJoinBuildSharedState : public HashJoinSharedState {
-public:
-};
-
class MockPartitionedHashJoinSharedState : public
PartitionedHashJoinSharedState {
public:
MockPartitionedHashJoinSharedState() {
@@ -164,16 +111,6 @@ public:
std::string get_memory_usage_debug_str(RuntimeState* state) const override
{ return "mock"; }
};
-class MockFragmentManager : public FragmentMgr {
-public:
- MockFragmentManager(Status& status_, ExecEnv* exec_env)
- : FragmentMgr(exec_env), status(status_) {}
- void cancel_query(const TUniqueId query_id, const Status reason) override
{ status = reason; }
-
-private:
- Status& status;
-};
-
class MockHashJoinProbeLocalState : public HashJoinProbeLocalState {
ENABLE_FACTORY_CREATOR(MockHashJoinProbeLocalState);
@@ -262,12 +199,12 @@ public:
void update_profile_from_inner() override {}
};
-class PartitionedHashJoinTestHelper {
+class PartitionedHashJoinTestHelper : public SpillableOperatorTestHelper {
public:
- void SetUp();
- void TearDown();
+ ~PartitionedHashJoinTestHelper() override = default;
+ TPlanNode create_test_plan_node() override;
- TPlanNode create_test_plan_node();
+ TDescriptorTable create_test_table_descriptor(bool nullable) override;
PartitionedHashJoinProbeLocalState* create_probe_local_state(
RuntimeState* state, PartitionedHashJoinProbeOperatorX*
probe_operator,
@@ -280,13 +217,5 @@ public:
std::tuple<std::shared_ptr<PartitionedHashJoinProbeOperatorX>,
std::shared_ptr<PartitionedHashJoinSinkOperatorX>>
create_operators();
-
- std::unique_ptr<MockRuntimeState> runtime_state;
- std::unique_ptr<ObjectPool> obj_pool;
- std::shared_ptr<QueryContext> query_ctx;
- std::shared_ptr<RuntimeProfile> runtime_profile;
- std::shared_ptr<PipelineTask> pipeline_task;
- DescriptorTbl* desc_tbl;
- static constexpr uint32_t TEST_PARTITION_COUNT = 8;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.cpp b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
new file mode 100644
index 00000000000..b3ebba37aa1
--- /dev/null
+++ b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "spillable_operator_test_helper.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "testutil/creators.h"
+
+namespace doris::pipeline {
+void SpillableOperatorTestHelper::SetUp() {
+ runtime_state = std::make_unique<MockRuntimeState>();
+ obj_pool = std::make_unique<ObjectPool>();
+
+ runtime_profile = std::make_shared<RuntimeProfile>("test");
+
+ query_ctx = generate_one_query();
+
+ runtime_state->_query_ctx = query_ctx.get();
+ runtime_state->_query_id = query_ctx->query_id();
+ runtime_state->resize_op_id_to_local_state(-100);
+ runtime_state->set_max_operator_id(-100);
+
+ ADD_TIMER(runtime_profile.get(), "ExecTime");
+ runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "",
0);
+
+ auto desc_table = create_test_table_descriptor(false);
+ auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+ DCHECK(!desc_table.slotDescriptors.empty());
+ EXPECT_TRUE(st.ok()) << "create descriptor table failed: " <<
st.to_string();
+ runtime_state->set_desc_tbl(desc_tbl);
+
+ auto spill_data_dir =
std::make_unique<vectorized::SpillDataDir>("/tmp/partitioned_join_test",
+ 1024L *
1024 * 4);
+ st =
io::global_local_filesystem()->create_directory(spill_data_dir->path(), false);
+ EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path()
+ << " failed: " << st.to_string();
+ std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>
data_map;
+ data_map.emplace("test", std::move(spill_data_dir));
+ auto* spill_stream_manager = new
vectorized::SpillStreamManager(std::move(data_map));
+ ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager;
+ st = spill_stream_manager->init();
+ EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " <<
st.to_string();
+}
+
+void SpillableOperatorTestHelper::TearDown() {
+
ExecEnv::GetInstance()->spill_stream_mgr()->async_cleanup_query(runtime_state->query_id());
+
doris::ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->wait();
+ doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop();
+ SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.h b/be/test/pipeline/operator/spillable_operator_test_helper.h
new file mode 100644
index 00000000000..2067412ed3c
--- /dev/null
+++ b/be/test/pipeline/operator/spillable_operator_test_helper.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+
+class MockPartitioner : public vectorized::PartitionerBase {
+public:
+ MockPartitioner(size_t partition_count) : PartitionerBase(partition_count)
{}
+ Status init(const std::vector<TExpr>& texprs) override { return
Status::OK(); }
+
+ Status prepare(RuntimeState* state, const RowDescriptor& row_desc)
override {
+ return Status::OK();
+ }
+
+ Status open(RuntimeState* state) override { return Status::OK(); }
+
+ Status close(RuntimeState* state) override { return Status::OK(); }
+
+ Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool
eos,
+ bool* already_sent) const override {
+ if (already_sent) {
+ *already_sent = false;
+ }
+ return Status::OK();
+ }
+
+ Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>&
partitioner) override {
+ partitioner = std::make_unique<MockPartitioner>(_partition_count);
+ return Status::OK();
+ }
+
+ vectorized::ChannelField get_channel_ids() const override { return {}; }
+};
+
+class MockExpr : public vectorized::VExpr {
+public:
+ Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
+ vectorized::VExprContext* context) override {
+ return Status::OK();
+ }
+
+ Status open(RuntimeState* state, vectorized::VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override {
+ return Status::OK();
+ }
+};
+
+class MockFragmentManager : public FragmentMgr {
+public:
+ MockFragmentManager(Status& status_, ExecEnv* exec_env)
+ : FragmentMgr(exec_env), status(status_) {}
+ void cancel_query(const TUniqueId query_id, const Status reason) override
{ status = reason; }
+
+private:
+ Status& status;
+};
+
+class SpillableDebugPointHelper {
+public:
+ SpillableDebugPointHelper(const std::string name)
+ : _enable_debug_points(config::enable_debug_points),
+ _fragment_mgr(ExecEnv::GetInstance()->_fragment_mgr) {
+ config::enable_debug_points = true;
+ ExecEnv::GetInstance()->_fragment_mgr =
+ new MockFragmentManager(_spill_status, ExecEnv::GetInstance());
+ DebugPoints::instance()->add(name);
+ }
+
+ ~SpillableDebugPointHelper() {
+ config::enable_debug_points = _enable_debug_points;
+ ExecEnv::GetInstance()->_fragment_mgr->stop();
+ SAFE_DELETE(ExecEnv::GetInstance()->_fragment_mgr);
+ ExecEnv::GetInstance()->_fragment_mgr = _fragment_mgr;
+ }
+
+ const Status& get_spill_status() const { return _spill_status; }
+
+private:
+ Status _spill_status;
+ const bool _enable_debug_points;
+ FragmentMgr* const _fragment_mgr;
+};
+
+class SpillableOperatorTestHelper {
+public:
+ virtual ~SpillableOperatorTestHelper() = default;
+ void SetUp();
+ void TearDown();
+
+ virtual TPlanNode create_test_plan_node() = 0;
+ virtual TDescriptorTable create_test_table_descriptor(bool nullable) = 0;
+
+ std::unique_ptr<MockRuntimeState> runtime_state;
+ std::unique_ptr<ObjectPool> obj_pool;
+ std::shared_ptr<QueryContext> query_ctx;
+ std::shared_ptr<RuntimeProfile> runtime_profile;
+ std::shared_ptr<PipelineTask> pipeline_task;
+ DescriptorTbl* desc_tbl;
+ static constexpr uint32_t TEST_PARTITION_COUNT = 8;
+};
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/testutil/creators.h b/be/test/testutil/creators.h
index 91064ade29e..db0ee465b60 100644
--- a/be/test/testutil/creators.h
+++ b/be/test/testutil/creators.h
@@ -55,35 +55,10 @@ inline std::shared_ptr<QueryContext> generate_one_query() {
return generate_one_query(query_options);
}
-inline TDescriptorTable create_test_table_descriptor(bool nullable = false) {
- TTupleDescriptorBuilder tuple_builder;
- tuple_builder.add_slot(TSlotDescriptorBuilder()
- .type(PrimitiveType::TYPE_INT)
- .column_name("col1")
- .column_pos(0)
- .nullable(nullable)
- .build());
-
- TDescriptorTableBuilder builder;
-
- tuple_builder.build(&builder);
-
- TTupleDescriptorBuilder()
- .add_slot(TSlotDescriptorBuilder()
- .type(TYPE_INT)
- .column_name("col2")
- .column_pos(0)
- .nullable(nullable)
- .build())
- .build(&builder);
-
- return builder.desc_tbl();
-}
-
inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr>
generate_hash_join_pipeline(
std::shared_ptr<OperatorXBase> probe_operator,
- std::shared_ptr<OperatorXBase> build_side_source,
- pipeline::DataSinkOperatorPtr probe_side_sink_operator,
DataSinkOperatorPtr sink_operator) {
+ pipeline::DataSinkOperatorPtr probe_side_sink_operator,
DataSinkOperatorPtr sink_operator,
+ std::shared_ptr<OperatorXBase> build_side_source) {
auto probe_pipeline = std::make_shared<pipeline::Pipeline>(0, 1, 1);
auto build_pipeline = std::make_shared<pipeline::Pipeline>(1, 1, 1);
@@ -95,6 +70,21 @@ inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr> generate_hash_joi
return {probe_pipeline, build_pipeline};
}
+inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr>
generate_agg_pipeline(
+ std::shared_ptr<OperatorXBase> source_operator,
+ pipeline::DataSinkOperatorPtr source_side_sink_operator,
DataSinkOperatorPtr sink_operator,
+ std::shared_ptr<OperatorXBase> sink_side_source) {
+ auto source_pipeline = std::make_shared<pipeline::Pipeline>(0, 1, 1);
+ auto sink_pipeline = std::make_shared<pipeline::Pipeline>(1, 1, 1);
+
+ static_cast<void>(source_pipeline->add_operator(source_operator, 1));
+ static_cast<void>(source_pipeline->set_sink(source_side_sink_operator));
+ static_cast<void>(sink_pipeline->add_operator(sink_side_source, 1));
+ static_cast<void>(sink_pipeline->set_sink(sink_operator));
+
+ return {source_pipeline, sink_pipeline};
+}
+
inline std::unique_ptr<SpillPartitionerType> create_spill_partitioner(
RuntimeState* state, const int32_t partition_count, const
std::vector<TExpr>& exprs,
const RowDescriptor& row_desc) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]