This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b836518488b [fix](group commit)Fix wal manager ut (#48789)
b836518488b is described below
commit b836518488bc748118cc454eaf3655a66b6a4ad4
Author: huanghaibin <[email protected]>
AuthorDate: Mon Mar 10 15:13:58 2025 +0800
[fix](group commit)Fix wal manager ut (#48789)
### What problem does this PR solve?
1. Remove the invalid unit test WalManagerTest.recovery_normal. It is a fake test
because it does not actually recover any WAL; a regression test now performs
real recovery instead.
2. Merge the two unit-test files into one.
3. Fix a heap-use-after-free problem.
---
be/test/olap/wal/wal_manager_test.cpp | 455 +++++++++++++++++++++++++--------
be/test/vec/exec/vwal_scanner_test.cpp | 380 ---------------------------
2 files changed, 343 insertions(+), 492 deletions(-)
diff --git a/be/test/olap/wal/wal_manager_test.cpp
b/be/test/olap/wal/wal_manager_test.cpp
index 5a6ce49067b..459a61d4b76 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -18,150 +18,379 @@
#include <gtest/gtest.h>
-#include <cstddef>
-#include <filesystem>
-#include <map>
#include <string>
#include <vector>
-#include "common/config.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/internal_service.pb.h"
+#include "common/object_pool.h"
+#include "cpp/sync_point.h"
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PlanNodes_types.h"
#include "io/fs/local_file_system.h"
-#include "olap/options.h"
-#include "runtime/decimalv2_value.h"
-#include "runtime/exec_env.h"
-#include "runtime/result_queue_mgr.h"
+#include "pipeline/exec/file_scan_operator.h"
+#include "runtime/cluster_info.h"
+#include "runtime/descriptors.h"
+#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
-#include "runtime/stream_load/new_load_stream_mgr.h"
-#include "runtime/types.h"
-#include "service/brpc.h"
-#include "util/brpc_client_cache.h"
-#include "util/cpu_info.h"
-#include "util/debug/leakcheck_disabler.h"
-#include "util/proto_util.h"
+#include "runtime/user_function_cache.h"
+#include "vec/exec/scan/file_scanner.h"
namespace doris {
-extern TLoadTxnBeginResult k_stream_load_begin_result;
+namespace vectorized {
-ExecEnv* _env = nullptr;
-std::filesystem::path wal_dir = std::filesystem::current_path().string() +
"/wal_test";
+class TestSplitSourceConnector : public SplitSourceConnector {
+private:
+ std::mutex _range_lock;
+ TFileScanRange _scan_range;
+ int _range_index = 0;
+
+public:
+ TestSplitSourceConnector(const TFileScanRange& scan_range) :
_scan_range(scan_range) {}
+
+ Status get_next(bool* has_next, TFileRangeDesc* range) override {
+ std::lock_guard<std::mutex> l(_range_lock);
+ if (_range_index < _scan_range.ranges.size()) {
+ *has_next = true;
+ *range = _scan_range.ranges[_range_index++];
+ } else {
+ *has_next = false;
+ }
+ return Status::OK();
+ }
+
+ int num_scan_ranges() override { return _scan_range.ranges.size(); }
+
+ TFileScanRangeParams* get_params() override { return &_scan_range.params; }
+};
class WalManagerTest : public testing::Test {
public:
- WalManagerTest() {}
- virtual ~WalManagerTest() {}
- void SetUp() override {
- prepare();
- _env = ExecEnv::GetInstance();
- _env->_cluster_info = new ClusterInfo();
- _env->_cluster_info->master_fe_addr.hostname = "host name";
- _env->_cluster_info->master_fe_addr.port = 1234;
- _env->_cluster_info->backend_id = 1001;
- _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
- _env->_internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
- _env->_function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
- _env->_stream_load_executor = StreamLoadExecutor::create_unique(_env);
- _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
- _env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string()));
- k_stream_load_begin_result = TLoadTxnBeginResult();
+ WalManagerTest() : _runtime_state(TQueryGlobals()),
_global_profile("<global profile>") {
+ _runtime_state.resize_op_id_to_local_state(-1);
+ init();
+ _profile = _runtime_state.runtime_profile();
+ WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options,
_query_globals, _env),
+ "fail to init _runtime_state");
}
+ void init();
+ void generate_scanner(std::shared_ptr<FileScanner>& scanner);
+
void TearDown() override {
- Status st = io::global_local_filesystem()->delete_directory(wal_dir);
+ Status st = io::global_local_filesystem()->delete_directory(_wal_dir);
if (!st.ok()) {
- LOG(WARNING) << "fail to delete " << wal_dir.string();
+ LOG(WARNING) << "fail to delete " << _wal_dir;
} else {
- LOG(INFO) << "delete " << wal_dir.string();
+ LOG(INFO) << "delete " << _wal_dir;
}
+ WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close
scan_node")
+
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
+ fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
- SAFE_DELETE(_env->_function_client_cache);
- SAFE_DELETE(_env->_internal_client_cache);
- SAFE_DELETE(_env->_cluster_info);
- _env->clear_new_load_stream_mgr();
- _env->clear_stream_load_executor();
- //_env->clear_wal_mgr();
+ delete _env->cluster_info();
+ _env->clear_wal_mgr();
}
- void prepare() {
- Status st = io::global_local_filesystem()->create_directory(wal_dir);
- if (!st.ok()) {
- LOG(WARNING) << "fail to create dir " << wal_dir.string();
- }
+protected:
+ virtual void SetUp() override {}
+
+private:
+ void _prepare();
+ void _init_desc_table();
+
+ ExecEnv* _env = nullptr;
+ std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
+ int64_t _db_id = 1;
+ int64_t _tb_id = 2;
+ int64_t _txn_id_1 = 123;
+ int64_t _txn_id_2 = 456;
+ uint32_t _version_0 = 0;
+ uint32_t _version_1 = 1;
+ int64_t _backend_id = 1001;
+ std::string _label_1 = "test1";
+ std::string _label_2 = "test2";
+
+ TupleId _dst_tuple_id = 0;
+ RuntimeState _runtime_state;
+ RuntimeProfile _global_profile;
+ RuntimeProfile* _profile;
+ ObjectPool _obj_pool;
+ DescriptorTbl* _desc_tbl;
+ std::vector<TNetworkAddress> _addresses;
+ ScannerCounter _counter;
+ std::vector<TExpr> _pre_filter;
+ TPlanNode _tnode;
+ TUniqueId _unique_id;
+ TQueryOptions _query_options;
+ TQueryGlobals _query_globals;
+ std::shared_ptr<pipeline::FileScanOperatorX> _scan_node = nullptr;
+ std::vector<TFileRangeDesc> _ranges;
+ TFileRangeDesc _range_desc;
+ TFileScanRange _scan_range;
+ std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
+ std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
+};
+
+void WalManagerTest::_prepare() {
+ Status st = io::global_local_filesystem()->create_directory(_wal_dir);
+ if (!st.ok()) {
+ LOG(WARNING) << "fail to create dir " << _wal_dir;
}
+}
+void WalManagerTest::_init_desc_table() {
+ TDescriptorTable t_desc_table;
- void createWal(const std::string& wal_path) {
- auto wal_writer = WalWriter(wal_path);
- Status st = wal_writer.init();
- if (!st.ok()) {
- LOG(WARNING) << "fail to int wal reader on path " << wal_path;
- }
- st = wal_writer.finalize();
- if (!st.ok()) {
- LOG(WARNING) << "fail to finalize wal reader on path " << wal_path;
+ // table descriptors
+ TTableDescriptor t_table_desc;
+
+ t_table_desc.id = 0;
+ t_table_desc.tableType = TTableType::OLAP_TABLE;
+ t_table_desc.numCols = 0;
+ t_table_desc.numClusteringCols = 0;
+ t_desc_table.tableDescriptors.push_back(t_table_desc);
+ t_desc_table.__isset.tableDescriptors = true;
+
+ int next_slot_id = 1;
+ // TSlotDescriptor
+ // int offset = 1;
+ // int i = 0;
+ // c1
+ {
+ TSlotDescriptor slot_desc;
+
+ slot_desc.id = next_slot_id++;
+ slot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::VARCHAR);
+ scalar_type.__set_len(32);
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
}
+ slot_desc.slotType = type;
+ slot_desc.columnPos = 0;
+ slot_desc.byteOffset = 0;
+ slot_desc.nullIndicatorByte = 0;
+ slot_desc.nullIndicatorBit = -1;
+ slot_desc.colName = "c1";
+ slot_desc.slotIdx = 1;
+ slot_desc.col_unique_id = 0;
+ slot_desc.isMaterialized = true;
+
+ t_desc_table.slotDescriptors.push_back(slot_desc);
}
-};
+ // c2
+ {
+ TSlotDescriptor slot_desc;
-TEST_F(WalManagerTest, recovery_normal) {
- _env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
-
- std::string db_id = "1";
- int64_t tb_1_id = 1;
- std::string wal_file_1 = "0_1001_1_group_commit_label1";
- std::string wal_file_2 = "0_1001_2_group_commit_label2";
- int64_t tb_2_id = 2;
- std::string wal_file_3 = "0_1001_3_group_commit_label3";
- std::string wal_file_4 = "0_1001_4_group_commit_label4";
-
- bool res = std::filesystem::create_directory(wal_dir.string() + "/" +
db_id);
- ASSERT_TRUE(res);
- res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id +
"/" +
- std::to_string(tb_1_id));
- ASSERT_TRUE(res);
- std::string wal_1 =
- wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) +
"/" + wal_file_1;
- std::string wal_2 =
- wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_1_id) +
"/" + wal_file_2;
- createWal(wal_1);
- createWal(wal_2);
-
- res = std::filesystem::create_directory(wal_dir.string() + "/" + db_id +
"/" +
- std::to_string(tb_2_id));
- ASSERT_TRUE(res);
- std::string wal_3 =
- wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) +
"/" + wal_file_3;
- std::string wal_4 =
- wal_dir.string() + "/" + db_id + "/" + std::to_string(tb_2_id) +
"/" + wal_file_4;
- createWal(wal_3);
- createWal(wal_4);
- Status st = _env->wal_mgr()->init();
- if (!st.ok()) {
- LOG(WARNING) << "fail to int wal manager ";
+ slot_desc.id = next_slot_id++;
+ slot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::VARCHAR);
+ scalar_type.__set_len(32);
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
+ }
+ slot_desc.slotType = type;
+ slot_desc.columnPos = 1;
+ slot_desc.byteOffset = 4;
+ slot_desc.nullIndicatorByte = 0;
+ slot_desc.nullIndicatorBit = -1;
+ slot_desc.colName = "c2";
+ slot_desc.slotIdx = 2;
+ slot_desc.col_unique_id = 1;
+ slot_desc.isMaterialized = true;
+
+ t_desc_table.slotDescriptors.push_back(slot_desc);
}
+ // c3
+ {
+ TSlotDescriptor slot_desc;
- auto count = 0;
- while (std::filesystem::exists(wal_1) || std::filesystem::exists(wal_2) ||
- std::filesystem::exists(wal_3) || std::filesystem::exists(wal_4)) {
- if (count > 30) {
- LOG(WARNING) << "wait time out";
- break;
+ slot_desc.id = next_slot_id++;
+ slot_desc.parent = 0;
+ TTypeDesc type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::VARCHAR);
+ scalar_type.__set_len(32);
+ node.__set_scalar_type(scalar_type);
+ type.types.push_back(node);
}
- sleep(1);
- count++;
- continue;
+ slot_desc.slotType = type;
+ slot_desc.columnPos = 2;
+ slot_desc.byteOffset = 8;
+ slot_desc.nullIndicatorByte = 0;
+ slot_desc.nullIndicatorBit = -1;
+ slot_desc.colName = "c3";
+ slot_desc.slotIdx = 3;
+ slot_desc.col_unique_id = 2;
+ slot_desc.isMaterialized = true;
+
+ t_desc_table.slotDescriptors.push_back(slot_desc);
}
- ASSERT_TRUE(!std::filesystem::exists(wal_1));
- ASSERT_TRUE(!std::filesystem::exists(wal_2));
- ASSERT_TRUE(!std::filesystem::exists(wal_3));
- ASSERT_TRUE(!std::filesystem::exists(wal_4));
+
+ t_desc_table.__isset.slotDescriptors = true;
+ {
+ // TTupleDescriptor dest
+ TTupleDescriptor t_tuple_desc;
+ t_tuple_desc.id = 0;
+ t_tuple_desc.byteSize = 12;
+ t_tuple_desc.numNullBytes = 0;
+ t_tuple_desc.tableId = 0;
+ t_tuple_desc.__isset.tableId = true;
+ t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+ }
+
+ auto st = DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
+
+ _runtime_state.set_desc_tbl(_desc_tbl);
}
-TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
- auto wal_mgr = WalManager::create_unique(_env,
config::group_commit_wal_path);
- static_cast<void>(wal_mgr->init());
- _env->set_wal_mgr(std::move(wal_mgr));
+void WalManagerTest::init() {
+ config::group_commit_wal_max_disk_limit = "100M";
+ _prepare();
+ _init_desc_table();
+ WARN_IF_ERROR(io::global_local_filesystem()->create_directory(
+ _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id)),
+ "fail to creat directory");
+
+ // Node Id
+ _tnode.node_id = 0;
+ _tnode.node_type = TPlanNodeType::FILE_SCAN_NODE;
+ _tnode.num_children = 0;
+ _tnode.limit = -1;
+ _tnode.row_tuples.push_back(0);
+ _tnode.nullable_tuples.push_back(false);
+ _tnode.file_scan_node.tuple_id = 0;
+ _tnode.__isset.file_scan_node = true;
+
+ _scan_node =
+ std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode,
0, *_desc_tbl, 1);
+ _scan_node->_output_tuple_desc =
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
+ WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init
scan_node");
+ WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare
scan_node");
+ auto local_state =
+ pipeline::FileScanLocalState::create_unique(&_runtime_state,
_scan_node.get());
+ std::vector<TScanRangeParams> scan_ranges;
+ std::map<int,
std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>,
+ std::shared_ptr<pipeline::Dependency>>>
+ le_state_map;
+ pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr,
le_state_map, 0};
+ WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init
local_state");
+ _runtime_state.emplace_local_state(_scan_node->operator_id(),
std::move(local_state));
+
+ _range_desc.start_offset = 0;
+ _range_desc.size = 1000;
+ _ranges.push_back(_range_desc);
+ _scan_range.ranges = _ranges;
+ _scan_range.__isset.params = true;
+ _scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
+ _kv_cache.reset(new ShardedKVCache(48));
+ _env = ExecEnv::GetInstance();
+ _env->set_cluster_info(new ClusterInfo());
+ _env->_cluster_info->master_fe_addr.hostname = "host name";
+ _env->_cluster_info->master_fe_addr.port = _backend_id;
+ _env->_cluster_info->backend_id = 1001;
+ _env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir));
+ std::string base_path;
+ auto st = _env->_wal_manager->_init_wal_dirs_info();
+ st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1,
_label_1, base_path,
+ _version_0);
+ std::string src = "./be/test/exec/test_data/wal_scanner/wal_version0";
+ std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
+ std::to_string(_version_0) + "_" +
std::to_string(_backend_id) + "_" +
+ std::to_string(_txn_id_1) + "_" + _label_1;
+ std::filesystem::copy(src, dst);
+ st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_2,
_label_2, base_path,
+ _version_1);
+ src = "./be/test/exec/test_data/wal_scanner/wal_version1";
+ dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
+ std::to_string(_version_1) + "_" + std::to_string(_backend_id) + "_"
+
+ std::to_string(_txn_id_2) + "_" + _label_2;
+ std::filesystem::copy(src, dst);
+}
+
+void WalManagerTest::generate_scanner(std::shared_ptr<FileScanner>& scanner) {
+ auto split_source =
std::make_shared<TestSplitSourceConnector>(_scan_range);
+ std::unordered_map<std::string, ColumnValueRangeType>
_colname_to_value_range;
+ std::unordered_map<std::string, int> _colname_to_slot_id;
+ scanner = std::make_shared<FileScanner>(
+ &_runtime_state,
+
&(_runtime_state.get_local_state(0)->cast<pipeline::FileScanLocalState>()), -1,
+ split_source, _profile, _kv_cache.get(), &_colname_to_value_range,
+ &_colname_to_slot_id);
+ scanner->_is_load = false;
+ vectorized::VExprContextSPtrs _conjuncts;
+ WARN_IF_ERROR(scanner->prepare(&_runtime_state, _conjuncts), "fail to
prepare scanner");
+}
+
+TEST_F(WalManagerTest, read_block_normal) {
+ // read wal file with wal_version=0
+ _runtime_state._wal_id = _txn_id_1;
+ std::shared_ptr<FileScanner> scanner = nullptr;
+ generate_scanner(scanner);
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
+ bool eof = false;
+ auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ EXPECT_EQ(3, block->rows());
+ block->clear();
+ st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ EXPECT_EQ(0, block->rows());
+ ASSERT_TRUE(eof);
+ WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+ // read wal file with wal_version=1
+ eof = false;
+ _runtime_state._wal_id = _txn_id_2;
+ generate_scanner(scanner);
+ st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ EXPECT_EQ(3, block->rows());
+ block->clear();
+ st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ EXPECT_EQ(0, block->rows());
+ ASSERT_TRUE(eof);
+ WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+}
+
+TEST_F(WalManagerTest, read_block_fail_with_not_equal) {
+ auto sp = SyncPoint::get_instance();
+ SyncPoint::CallbackGuard guard1;
+ sp->set_call_back(
+ "WalReader::set_column_id_count",
+ [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2; },
&guard1);
+ SyncPoint::CallbackGuard guard2;
+ sp->set_call_back(
+ "WalReader::set_out_block_column_size",
+ [](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2; }, &guard2);
+ sp->enable_processing();
+
+ _runtime_state._wal_id = _txn_id_1;
+ std::shared_ptr<FileScanner> scanner = nullptr;
+ generate_scanner(scanner);
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
+ bool eof = false;
+ auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_FALSE(st.ok());
+ auto msg = st.to_string();
+ auto pos = msg.find("not equal");
+ ASSERT_TRUE(pos != msg.npos);
+ WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+}
+
+TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
// 1T
size_t available_bytes = 1099511627776;
size_t wal_limit_bytes;
@@ -245,4 +474,6 @@ TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(),
Status::InternalError(""));
EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
}
+
+} // namespace vectorized
} // namespace doris
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp
b/be/test/vec/exec/vwal_scanner_test.cpp
deleted file mode 100644
index 69fa000ffcd..00000000000
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ /dev/null
@@ -1,380 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include <gtest/gtest.h>
-
-#include <string>
-#include <vector>
-
-#include "common/object_pool.h"
-#include "cpp/sync_point.h"
-#include "gen_cpp/Descriptors_types.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "io/fs/local_file_system.h"
-#include "olap/wal/wal_manager.h"
-#include "pipeline/exec/file_scan_operator.h"
-#include "runtime/cluster_info.h"
-#include "runtime/descriptors.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/runtime_state.h"
-#include "runtime/user_function_cache.h"
-#include "vec/exec/scan/file_scanner.h"
-
-namespace doris {
-
-namespace vectorized {
-
-class TestSplitSourceConnector : public SplitSourceConnector {
-private:
- std::mutex _range_lock;
- TFileScanRange _scan_range;
- int _range_index = 0;
-
-public:
- TestSplitSourceConnector(const TFileScanRange& scan_range) :
_scan_range(scan_range) {}
-
- Status get_next(bool* has_next, TFileRangeDesc* range) override {
- std::lock_guard<std::mutex> l(_range_lock);
- if (_range_index < _scan_range.ranges.size()) {
- *has_next = true;
- *range = _scan_range.ranges[_range_index++];
- } else {
- *has_next = false;
- }
- return Status::OK();
- }
-
- int num_scan_ranges() override { return _scan_range.ranges.size(); }
-
- TFileScanRangeParams* get_params() override { return &_scan_range.params; }
-};
-
-class VWalScannerTest : public testing::Test {
-public:
- VWalScannerTest() : _runtime_state(TQueryGlobals()),
_global_profile("<global profile>") {
- _runtime_state.resize_op_id_to_local_state(-1);
- init();
- _profile = _runtime_state.runtime_profile();
- WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options,
_query_globals, _env),
- "fail to init _runtime_state");
- }
- void init();
- void generate_scanner(std::shared_ptr<FileScanner>& scanner);
-
- void TearDown() override {
- WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close
scan_node")
-
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
- fmt::format("fail to delete dir={}", _wal_dir));
- SAFE_STOP(_env->_wal_manager);
- _env->clear_wal_mgr();
- }
-
-protected:
- virtual void SetUp() override {}
-
-private:
- void _init_desc_table();
-
- ExecEnv* _env = nullptr;
- std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
- int64_t _db_id = 1;
- int64_t _tb_id = 2;
- int64_t _txn_id_1 = 123;
- int64_t _txn_id_2 = 456;
- uint32_t _version_0 = 0;
- uint32_t _version_1 = 1;
- int64_t _backend_id = 1001;
- std::string _label_1 = "test1";
- std::string _label_2 = "test2";
-
- TupleId _dst_tuple_id = 0;
- RuntimeState _runtime_state;
- RuntimeProfile _global_profile;
- RuntimeProfile* _profile;
- ObjectPool _obj_pool;
- DescriptorTbl* _desc_tbl;
- std::vector<TNetworkAddress> _addresses;
- ScannerCounter _counter;
- std::vector<TExpr> _pre_filter;
- TPlanNode _tnode;
- TUniqueId _unique_id;
- TQueryOptions _query_options;
- TQueryGlobals _query_globals;
- std::shared_ptr<pipeline::FileScanOperatorX> _scan_node = nullptr;
- std::vector<TFileRangeDesc> _ranges;
- TFileRangeDesc _range_desc;
- TFileScanRange _scan_range;
- std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
- std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
-};
-
-void VWalScannerTest::_init_desc_table() {
- TDescriptorTable t_desc_table;
-
- // table descriptors
- TTableDescriptor t_table_desc;
-
- t_table_desc.id = 0;
- t_table_desc.tableType = TTableType::OLAP_TABLE;
- t_table_desc.numCols = 0;
- t_table_desc.numClusteringCols = 0;
- t_desc_table.tableDescriptors.push_back(t_table_desc);
- t_desc_table.__isset.tableDescriptors = true;
-
- int next_slot_id = 1;
- // TSlotDescriptor
- // int offset = 1;
- // int i = 0;
- // c1
- {
- TSlotDescriptor slot_desc;
-
- slot_desc.id = next_slot_id++;
- slot_desc.parent = 0;
- TTypeDesc type;
- {
- TTypeNode node;
- node.__set_type(TTypeNodeType::SCALAR);
- TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::VARCHAR);
- scalar_type.__set_len(32);
- node.__set_scalar_type(scalar_type);
- type.types.push_back(node);
- }
- slot_desc.slotType = type;
- slot_desc.columnPos = 0;
- slot_desc.byteOffset = 0;
- slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
- slot_desc.colName = "c1";
- slot_desc.slotIdx = 1;
- slot_desc.col_unique_id = 0;
- slot_desc.isMaterialized = true;
-
- t_desc_table.slotDescriptors.push_back(slot_desc);
- }
- // c2
- {
- TSlotDescriptor slot_desc;
-
- slot_desc.id = next_slot_id++;
- slot_desc.parent = 0;
- TTypeDesc type;
- {
- TTypeNode node;
- node.__set_type(TTypeNodeType::SCALAR);
- TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::VARCHAR);
- scalar_type.__set_len(32);
- node.__set_scalar_type(scalar_type);
- type.types.push_back(node);
- }
- slot_desc.slotType = type;
- slot_desc.columnPos = 1;
- slot_desc.byteOffset = 4;
- slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
- slot_desc.colName = "c2";
- slot_desc.slotIdx = 2;
- slot_desc.col_unique_id = 1;
- slot_desc.isMaterialized = true;
-
- t_desc_table.slotDescriptors.push_back(slot_desc);
- }
- // c3
- {
- TSlotDescriptor slot_desc;
-
- slot_desc.id = next_slot_id++;
- slot_desc.parent = 0;
- TTypeDesc type;
- {
- TTypeNode node;
- node.__set_type(TTypeNodeType::SCALAR);
- TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::VARCHAR);
- scalar_type.__set_len(32);
- node.__set_scalar_type(scalar_type);
- type.types.push_back(node);
- }
- slot_desc.slotType = type;
- slot_desc.columnPos = 2;
- slot_desc.byteOffset = 8;
- slot_desc.nullIndicatorByte = 0;
- slot_desc.nullIndicatorBit = -1;
- slot_desc.colName = "c3";
- slot_desc.slotIdx = 3;
- slot_desc.col_unique_id = 2;
- slot_desc.isMaterialized = true;
-
- t_desc_table.slotDescriptors.push_back(slot_desc);
- }
-
- t_desc_table.__isset.slotDescriptors = true;
- {
- // TTupleDescriptor dest
- TTupleDescriptor t_tuple_desc;
- t_tuple_desc.id = 0;
- t_tuple_desc.byteSize = 12;
- t_tuple_desc.numNullBytes = 0;
- t_tuple_desc.tableId = 0;
- t_tuple_desc.__isset.tableId = true;
- t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
- }
-
- auto st = DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
-
- _runtime_state.set_desc_tbl(_desc_tbl);
-}
-
-void VWalScannerTest::init() {
- config::group_commit_wal_max_disk_limit = "100M";
- _init_desc_table();
- WARN_IF_ERROR(io::global_local_filesystem()->create_directory(
- _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id)),
- "fail to creat directory");
-
- // Node Id
- _tnode.node_id = 0;
- _tnode.node_type = TPlanNodeType::FILE_SCAN_NODE;
- _tnode.num_children = 0;
- _tnode.limit = -1;
- _tnode.row_tuples.push_back(0);
- _tnode.nullable_tuples.push_back(false);
- _tnode.file_scan_node.tuple_id = 0;
- _tnode.__isset.file_scan_node = true;
-
- _scan_node =
- std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode,
0, *_desc_tbl, 1);
- _scan_node->_output_tuple_desc =
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
- WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init
scan_node");
- WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare
scan_node");
-
- auto local_state =
- pipeline::FileScanLocalState::create_unique(&_runtime_state,
_scan_node.get());
- std::vector<TScanRangeParams> scan_ranges;
- std::map<int,
std::pair<std::shared_ptr<pipeline::LocalExchangeSharedState>,
- std::shared_ptr<pipeline::Dependency>>>
- le_state_map;
- pipeline::LocalStateInfo info {&_global_profile, scan_ranges, nullptr,
le_state_map, 0};
- WARN_IF_ERROR(local_state->init(&_runtime_state, info), "fail to init
local_state");
- _runtime_state.emplace_local_state(_scan_node->operator_id(),
std::move(local_state));
-
- _range_desc.start_offset = 0;
- _range_desc.size = 1000;
- _ranges.push_back(_range_desc);
- _scan_range.ranges = _ranges;
- _scan_range.__isset.params = true;
- _scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
- _kv_cache.reset(new ShardedKVCache(48));
-
- _cluster_info.reset(new ClusterInfo());
- _env = ExecEnv::GetInstance();
- _env->_cluster_info = _cluster_info.get();
- _env->_cluster_info->master_fe_addr.hostname = "host name";
- _env->_cluster_info->master_fe_addr.port = _backend_id;
- _env->_cluster_info->backend_id = 1001;
- _env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir));
- std::string base_path;
- auto st = _env->_wal_manager->_init_wal_dirs_info();
- st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1,
_label_1, base_path,
- _version_0);
- std::string src = "./be/test/exec/test_data/wal_scanner/wal_version0";
- std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
- std::to_string(_version_0) + "_" +
std::to_string(_backend_id) + "_" +
- std::to_string(_txn_id_1) + "_" + _label_1;
- std::filesystem::copy(src, dst);
- st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_2,
_label_2, base_path,
- _version_1);
- src = "./be/test/exec/test_data/wal_scanner/wal_version1";
- dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
- std::to_string(_version_1) + "_" + std::to_string(_backend_id) + "_"
+
- std::to_string(_txn_id_2) + "_" + _label_2;
- std::filesystem::copy(src, dst);
-}
-
-void VWalScannerTest::generate_scanner(std::shared_ptr<FileScanner>& scanner) {
- auto split_source =
std::make_shared<TestSplitSourceConnector>(_scan_range);
- std::unordered_map<std::string, ColumnValueRangeType>
_colname_to_value_range;
- std::unordered_map<std::string, int> _colname_to_slot_id;
- scanner = std::make_shared<FileScanner>(
- &_runtime_state,
-
&(_runtime_state.get_local_state(0)->cast<pipeline::FileScanLocalState>()), -1,
- split_source, _profile, _kv_cache.get(), &_colname_to_value_range,
- &_colname_to_slot_id);
- scanner->_is_load = false;
- vectorized::VExprContextSPtrs _conjuncts;
- WARN_IF_ERROR(scanner->prepare(&_runtime_state, _conjuncts), "fail to
prepare scanner");
-}
-
-TEST_F(VWalScannerTest, normal) {
- // read wal file with wal_version=0
- _runtime_state._wal_id = _txn_id_1;
- std::shared_ptr<FileScanner> scanner = nullptr;
- generate_scanner(scanner);
- std::unique_ptr<vectorized::Block> block(new vectorized::Block());
- bool eof = false;
- auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
- ASSERT_TRUE(st.ok());
- EXPECT_EQ(3, block->rows());
- block->clear();
- st = scanner->get_block(&_runtime_state, block.get(), &eof);
- ASSERT_TRUE(st.ok());
- EXPECT_EQ(0, block->rows());
- ASSERT_TRUE(eof);
- WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
- // read wal file with wal_version=1
- eof = false;
- _runtime_state._wal_id = _txn_id_2;
- generate_scanner(scanner);
- st = scanner->get_block(&_runtime_state, block.get(), &eof);
- ASSERT_TRUE(st.ok());
- EXPECT_EQ(3, block->rows());
- block->clear();
- st = scanner->get_block(&_runtime_state, block.get(), &eof);
- ASSERT_TRUE(st.ok());
- EXPECT_EQ(0, block->rows());
- ASSERT_TRUE(eof);
- WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
-}
-
-TEST_F(VWalScannerTest, fail_with_not_equal) {
- auto sp = SyncPoint::get_instance();
- SyncPoint::CallbackGuard guard1;
- sp->set_call_back(
- "WalReader::set_column_id_count",
- [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2; },
&guard1);
- SyncPoint::CallbackGuard guard2;
- sp->set_call_back(
- "WalReader::set_out_block_column_size",
- [](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2; }, &guard2);
- sp->enable_processing();
-
- _runtime_state._wal_id = _txn_id_1;
- std::shared_ptr<FileScanner> scanner = nullptr;
- generate_scanner(scanner);
- std::unique_ptr<vectorized::Block> block(new vectorized::Block());
- bool eof = false;
- auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
- ASSERT_FALSE(st.ok());
- auto msg = st.to_string();
- auto pos = msg.find("not equal");
- ASSERT_TRUE(pos != msg.npos);
- WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
-}
-
-} // namespace vectorized
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]