This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a6f0a4616d6 [fix](group_commit)Fix bound checking problem when reading wal block (#31112)
a6f0a4616d6 is described below
commit a6f0a4616d656ed60bf796cdf712bbcec5125874
Author: huanghaibin <[email protected]>
AuthorDate: Thu Feb 22 09:09:35 2024 +0800
[fix](group_commit)Fix bound checking problem when reading wal block (#31112)
---
be/src/vec/exec/format/wal/wal_reader.cpp | 21 ++--
be/test/vec/exec/vwal_scanner_test.cpp | 155 ++++++++++++++++++------------
2 files changed, 106 insertions(+), 70 deletions(-)
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp
index cb89dd9bcf0..981e740d05c 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -18,6 +18,7 @@
#include "wal_reader.h"
#include "common/logging.h"
+#include "common/sync_point.h"
#include "gutil/strings/split.h"
#include "olap/wal/wal_manager.h"
#include "runtime/runtime_state.h"
@@ -61,11 +62,17 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
//convert to dst block
vectorized::Block dst_block;
int index = 0;
- auto columns = block->get_columns_with_type_and_name();
- if (_column_id_count != columns.size() || columns.size() !=
_tuple_descriptor->slots().size()) {
+ auto output_block_columns = block->get_columns_with_type_and_name();
+ size_t output_block_column_size = output_block_columns.size();
+ TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count",
&_column_id_count);
+ TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size",
&output_block_column_size);
+ if (_column_id_count != src_block.columns() ||
+ output_block_column_size != _tuple_descriptor->slots().size()) {
return Status::InternalError(
- "not equal _column_id_count={} vs columns size={} vs
tuple_descriptor size={}",
- std::to_string(_column_id_count),
std::to_string(columns.size()),
+ "not equal wal _column_id_count={} vs wal block columns
size={}, "
+ "output block columns size={} vs tuple_descriptor size={}",
+ std::to_string(_column_id_count),
std::to_string(src_block.columns()),
+ std::to_string(output_block_column_size),
std::to_string(_tuple_descriptor->slots().size()));
}
for (auto slot_desc : _tuple_descriptor->slots()) {
@@ -78,9 +85,9 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (column_ptr != nullptr && slot_desc->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
- dst_block.insert(
- index,
vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type,
- columns[index].name));
+ dst_block.insert(index, vectorized::ColumnWithTypeAndName(
+ std::move(column_ptr),
output_block_columns[index].type,
+ output_block_columns[index].name));
index++;
}
block->swap(dst_block);
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp
index 8d4fe6ad75f..af292640368 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -20,6 +20,7 @@
#include <vector>
#include "common/object_pool.h"
+#include "common/sync_point.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "io/fs/local_file_system.h"
@@ -40,12 +41,16 @@ public:
init();
_profile = _runtime_state.runtime_profile();
_runtime_state.init_mem_trackers();
- static_cast<void>(_runtime_state.init(unique_id, query_options,
query_globals, _env));
+ WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options,
_query_globals, _env),
+ "fail to init _runtime_state");
}
void init();
+ void generate_scanner(std::shared_ptr<VFileScanner>& scanner);
void TearDown() override {
-
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
+ WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close
scan_node")
+
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
+ fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
}
@@ -53,16 +58,16 @@ protected:
virtual void SetUp() override {}
private:
- void init_desc_table();
+ void _init_desc_table();
ExecEnv* _env = nullptr;
- std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
- int64_t db_id = 1;
- int64_t tb_id = 2;
- int64_t txn_id = 789;
- int64_t version = 0;
- int64_t backend_id = 1001;
- std::string label = "test";
+ std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
+ int64_t _db_id = 1;
+ int64_t _tb_id = 2;
+ int64_t _txn_id = 789;
+ int64_t _version = 0;
+ int64_t _backend_id = 1001;
+ std::string _label = "test";
TupleId _dst_tuple_id = 0;
RuntimeState _runtime_state;
@@ -73,12 +78,18 @@ private:
ScannerCounter _counter;
std::vector<TExpr> _pre_filter;
TPlanNode _tnode;
- TUniqueId unique_id;
- TQueryOptions query_options;
- TQueryGlobals query_globals;
+ TUniqueId _unique_id;
+ TQueryOptions _query_options;
+ TQueryGlobals _query_globals;
+ std::shared_ptr<NewFileScanNode> _scan_node = nullptr;
+ std::vector<TFileRangeDesc> _ranges;
+ TFileRangeDesc _range_desc;
+ TFileScanRange _scan_range;
+ std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
+ std::unique_ptr<TMasterInfo> _master_info = nullptr;
};
-void VWalScannerTest::init_desc_table() {
+void VWalScannerTest::_init_desc_table() {
TDescriptorTable t_desc_table;
// table descriptors
@@ -118,6 +129,7 @@ void VWalScannerTest::init_desc_table() {
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "c1";
slot_desc.slotIdx = 1;
+ slot_desc.col_unique_id = 0;
slot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(slot_desc);
@@ -145,6 +157,7 @@ void VWalScannerTest::init_desc_table() {
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "c2";
slot_desc.slotIdx = 2;
+ slot_desc.col_unique_id = 1;
slot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(slot_desc);
@@ -172,6 +185,7 @@ void VWalScannerTest::init_desc_table() {
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "c3";
slot_desc.slotIdx = 3;
+ slot_desc.col_unique_id = 2;
slot_desc.isMaterialized = true;
t_desc_table.slotDescriptors.push_back(slot_desc);
@@ -196,9 +210,10 @@ void VWalScannerTest::init_desc_table() {
void VWalScannerTest::init() {
config::group_commit_wal_max_disk_limit = "100M";
- init_desc_table();
- static_cast<void>(io::global_local_filesystem()->create_directory(
- wal_dir + "/" + std::to_string(db_id) + "/" +
std::to_string(tb_id)));
+ _init_desc_table();
+ WARN_IF_ERROR(io::global_local_filesystem()->create_directory(
+ _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id)),
+ "fail to creat directory");
// Node Id
_tnode.node_id = 0;
@@ -210,73 +225,87 @@ void VWalScannerTest::init() {
_tnode.file_scan_node.tuple_id = 0;
_tnode.__isset.file_scan_node = true;
+ _scan_node = std::make_shared<NewFileScanNode>(&_obj_pool, _tnode,
*_desc_tbl);
+ _scan_node->_output_tuple_desc =
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
+ WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init
scan_node");
+ WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare
scan_node");
+
+ _range_desc.start_offset = 0;
+ _range_desc.size = 1000;
+ _ranges.push_back(_range_desc);
+ _scan_range.ranges = _ranges;
+ _scan_range.__isset.params = true;
+ _scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
+ _kv_cache.reset(new ShardedKVCache(48));
+
+ _runtime_state._wal_id = _txn_id;
+
+ _master_info.reset(new TMasterInfo());
_env = ExecEnv::GetInstance();
- _env->_master_info = new TMasterInfo();
+ _env->_master_info = _master_info.get();
_env->_master_info->network_address.hostname = "host name";
- _env->_master_info->network_address.port = backend_id;
+ _env->_master_info->network_address.port = _backend_id;
_env->_master_info->backend_id = 1001;
- _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
+ _env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
- st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label,
base_path);
+ st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label,
base_path);
std::string src = "./be/test/exec/test_data/wal_scanner/wal";
- std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" +
std::to_string(tb_id) + "/" +
- std::to_string(version) + "_" +
std::to_string(backend_id) + "_" +
- std::to_string(txn_id) + "_" + label;
+ std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" +
std::to_string(_tb_id) + "/" +
+ std::to_string(_version) + "_" +
std::to_string(_backend_id) + "_" +
+ std::to_string(_txn_id) + "_" + _label;
std::filesystem::copy(src, dst);
}
-TEST_F(VWalScannerTest, normal) {
- std::vector<size_t> index_vector;
- index_vector.emplace_back(0);
- index_vector.emplace_back(1);
- index_vector.emplace_back(2);
- // config::group_commit_replay_wal_dir = wal_dir;
- NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
- scan_node._output_tuple_desc =
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
- static_cast<void>(scan_node.init(_tnode, &_runtime_state));
- auto status = scan_node.prepare(&_runtime_state);
- EXPECT_TRUE(status.ok());
-
- std::vector<TFileRangeDesc> ranges;
- TFileRangeDesc range_desc;
- {
- range_desc.start_offset = 0;
- range_desc.size = 1000;
- }
- ranges.push_back(range_desc);
- TFileScanRange scan_range;
- scan_range.ranges = ranges;
- scan_range.__isset.params = true;
- scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
- std::unique_ptr<ShardedKVCache> _kv_cache;
- _kv_cache.reset(new ShardedKVCache(48));
- _runtime_state._wal_id = txn_id;
- VFileScanner scanner(&_runtime_state, &scan_node, -1, scan_range,
_profile, _kv_cache.get());
- scanner._is_load = false;
+void VWalScannerTest::generate_scanner(std::shared_ptr<VFileScanner>& scanner)
{
+ scanner = std::make_shared<VFileScanner>(&_runtime_state,
_scan_node.get(), -1, _scan_range,
+ _profile, _kv_cache.get());
+ scanner->_is_load = false;
vectorized::VExprContextSPtrs _conjuncts;
std::unordered_map<std::string, ColumnValueRangeType>
_colname_to_value_range;
std::unordered_map<std::string, int> _colname_to_slot_id;
- static_cast<void>(scanner.prepare(_conjuncts, &_colname_to_value_range,
&_colname_to_slot_id));
+ WARN_IF_ERROR(scanner->prepare(_conjuncts, &_colname_to_value_range,
&_colname_to_slot_id),
+ "fail to prepare scanner");
+}
+TEST_F(VWalScannerTest, normal) {
+ std::shared_ptr<VFileScanner> scanner = nullptr;
+ generate_scanner(scanner);
std::unique_ptr<vectorized::Block> block(new vectorized::Block());
bool eof = false;
- auto st = scanner.get_block(&_runtime_state, block.get(), &eof);
- EXPECT_EQ(3, block->rows());
+ auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
ASSERT_TRUE(st.ok());
+ EXPECT_EQ(3, block->rows());
block->clear();
- st = scanner.get_block(&_runtime_state, block.get(), &eof);
+ st = scanner->get_block(&_runtime_state, block.get(), &eof);
ASSERT_TRUE(st.ok());
EXPECT_EQ(0, block->rows());
ASSERT_TRUE(eof);
- static_cast<void>(scanner.close(&_runtime_state));
- static_cast<void>(scan_node.close(&_runtime_state));
+ WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+}
- {
- std::stringstream ss;
- scan_node.runtime_profile()->pretty_print(&ss);
- LOG(INFO) << ss.str();
- }
+TEST_F(VWalScannerTest, fail_with_not_equal) {
+ auto sp = SyncPoint::get_instance();
+ Defer defer {[sp] {
+ sp->clear_call_back("WalReader::set_column_id_count");
+ sp->clear_call_back("WalReader::set_out_block_column_size");
+ }};
+ sp->set_call_back("WalReader::set_column_id_count",
+ [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2;
});
+ sp->set_call_back("WalReader::set_out_block_column_size",
+ [](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2;
});
+ sp->enable_processing();
+
+ std::shared_ptr<VFileScanner> scanner = nullptr;
+ generate_scanner(scanner);
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
+ bool eof = false;
+ auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
+ ASSERT_FALSE(st.ok());
+ auto msg = st.to_string();
+ auto pos = msg.find("not equal");
+ ASSERT_TRUE(pos != msg.npos);
+ WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
}
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]