This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a6f0a4616d6 [fix](group_commit)Fix bound checking problem when reading wal block (#31112)
a6f0a4616d6 is described below

commit a6f0a4616d656ed60bf796cdf712bbcec5125874
Author: huanghaibin <[email protected]>
AuthorDate: Thu Feb 22 09:09:35 2024 +0800

    [fix](group_commit)Fix bound checking problem when reading wal block (#31112)
---
 be/src/vec/exec/format/wal/wal_reader.cpp |  21 ++--
 be/test/vec/exec/vwal_scanner_test.cpp    | 155 ++++++++++++++++++------------
 2 files changed, 106 insertions(+), 70 deletions(-)

diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp 
b/be/src/vec/exec/format/wal/wal_reader.cpp
index cb89dd9bcf0..981e740d05c 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -18,6 +18,7 @@
 #include "wal_reader.h"
 
 #include "common/logging.h"
+#include "common/sync_point.h"
 #include "gutil/strings/split.h"
 #include "olap/wal/wal_manager.h"
 #include "runtime/runtime_state.h"
@@ -61,11 +62,17 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
     //convert to dst block
     vectorized::Block dst_block;
     int index = 0;
-    auto columns = block->get_columns_with_type_and_name();
-    if (_column_id_count != columns.size() || columns.size() != 
_tuple_descriptor->slots().size()) {
+    auto output_block_columns = block->get_columns_with_type_and_name();
+    size_t output_block_column_size = output_block_columns.size();
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", 
&_column_id_count);
+    TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", 
&output_block_column_size);
+    if (_column_id_count != src_block.columns() ||
+        output_block_column_size != _tuple_descriptor->slots().size()) {
         return Status::InternalError(
-                "not equal _column_id_count={} vs columns size={} vs 
tuple_descriptor size={}",
-                std::to_string(_column_id_count), 
std::to_string(columns.size()),
+                "not equal wal _column_id_count={} vs wal block columns 
size={}, "
+                "output block columns size={} vs tuple_descriptor size={}",
+                std::to_string(_column_id_count), 
std::to_string(src_block.columns()),
+                std::to_string(output_block_column_size),
                 std::to_string(_tuple_descriptor->slots().size()));
     }
     for (auto slot_desc : _tuple_descriptor->slots()) {
@@ -78,9 +85,9 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
         if (column_ptr != nullptr && slot_desc->is_nullable()) {
             column_ptr = make_nullable(column_ptr);
         }
-        dst_block.insert(
-                index, 
vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type,
-                                                         columns[index].name));
+        dst_block.insert(index, vectorized::ColumnWithTypeAndName(
+                                        std::move(column_ptr), 
output_block_columns[index].type,
+                                        output_block_columns[index].name));
         index++;
     }
     block->swap(dst_block);
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp 
b/be/test/vec/exec/vwal_scanner_test.cpp
index 8d4fe6ad75f..af292640368 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "common/object_pool.h"
+#include "common/sync_point.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/fs/local_file_system.h"
@@ -40,12 +41,16 @@ public:
         init();
         _profile = _runtime_state.runtime_profile();
         _runtime_state.init_mem_trackers();
-        static_cast<void>(_runtime_state.init(unique_id, query_options, 
query_globals, _env));
+        WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options, 
_query_globals, _env),
+                      "fail to init _runtime_state");
     }
     void init();
+    void generate_scanner(std::shared_ptr<VFileScanner>& scanner);
 
     void TearDown() override {
-        
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
+        WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close 
scan_node")
+        
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
+                      fmt::format("fail to delete dir={}", _wal_dir));
         SAFE_STOP(_env->_wal_manager);
     }
 
@@ -53,16 +58,16 @@ protected:
     virtual void SetUp() override {}
 
 private:
-    void init_desc_table();
+    void _init_desc_table();
 
     ExecEnv* _env = nullptr;
-    std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
-    int64_t db_id = 1;
-    int64_t tb_id = 2;
-    int64_t txn_id = 789;
-    int64_t version = 0;
-    int64_t backend_id = 1001;
-    std::string label = "test";
+    std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
+    int64_t _db_id = 1;
+    int64_t _tb_id = 2;
+    int64_t _txn_id = 789;
+    int64_t _version = 0;
+    int64_t _backend_id = 1001;
+    std::string _label = "test";
 
     TupleId _dst_tuple_id = 0;
     RuntimeState _runtime_state;
@@ -73,12 +78,18 @@ private:
     ScannerCounter _counter;
     std::vector<TExpr> _pre_filter;
     TPlanNode _tnode;
-    TUniqueId unique_id;
-    TQueryOptions query_options;
-    TQueryGlobals query_globals;
+    TUniqueId _unique_id;
+    TQueryOptions _query_options;
+    TQueryGlobals _query_globals;
+    std::shared_ptr<NewFileScanNode> _scan_node = nullptr;
+    std::vector<TFileRangeDesc> _ranges;
+    TFileRangeDesc _range_desc;
+    TFileScanRange _scan_range;
+    std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
+    std::unique_ptr<TMasterInfo> _master_info = nullptr;
 };
 
-void VWalScannerTest::init_desc_table() {
+void VWalScannerTest::_init_desc_table() {
     TDescriptorTable t_desc_table;
 
     // table descriptors
@@ -118,6 +129,7 @@ void VWalScannerTest::init_desc_table() {
         slot_desc.nullIndicatorBit = -1;
         slot_desc.colName = "c1";
         slot_desc.slotIdx = 1;
+        slot_desc.col_unique_id = 0;
         slot_desc.isMaterialized = true;
 
         t_desc_table.slotDescriptors.push_back(slot_desc);
@@ -145,6 +157,7 @@ void VWalScannerTest::init_desc_table() {
         slot_desc.nullIndicatorBit = -1;
         slot_desc.colName = "c2";
         slot_desc.slotIdx = 2;
+        slot_desc.col_unique_id = 1;
         slot_desc.isMaterialized = true;
 
         t_desc_table.slotDescriptors.push_back(slot_desc);
@@ -172,6 +185,7 @@ void VWalScannerTest::init_desc_table() {
         slot_desc.nullIndicatorBit = -1;
         slot_desc.colName = "c3";
         slot_desc.slotIdx = 3;
+        slot_desc.col_unique_id = 2;
         slot_desc.isMaterialized = true;
 
         t_desc_table.slotDescriptors.push_back(slot_desc);
@@ -196,9 +210,10 @@ void VWalScannerTest::init_desc_table() {
 
 void VWalScannerTest::init() {
     config::group_commit_wal_max_disk_limit = "100M";
-    init_desc_table();
-    static_cast<void>(io::global_local_filesystem()->create_directory(
-            wal_dir + "/" + std::to_string(db_id) + "/" + 
std::to_string(tb_id)));
+    _init_desc_table();
+    WARN_IF_ERROR(io::global_local_filesystem()->create_directory(
+                          _wal_dir + "/" + std::to_string(_db_id) + "/" + 
std::to_string(_tb_id)),
+                  "fail to creat directory");
 
     // Node Id
     _tnode.node_id = 0;
@@ -210,73 +225,87 @@ void VWalScannerTest::init() {
     _tnode.file_scan_node.tuple_id = 0;
     _tnode.__isset.file_scan_node = true;
 
+    _scan_node = std::make_shared<NewFileScanNode>(&_obj_pool, _tnode, 
*_desc_tbl);
+    _scan_node->_output_tuple_desc = 
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
+    WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init 
scan_node");
+    WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare 
scan_node");
+
+    _range_desc.start_offset = 0;
+    _range_desc.size = 1000;
+    _ranges.push_back(_range_desc);
+    _scan_range.ranges = _ranges;
+    _scan_range.__isset.params = true;
+    _scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
+    _kv_cache.reset(new ShardedKVCache(48));
+
+    _runtime_state._wal_id = _txn_id;
+
+    _master_info.reset(new TMasterInfo());
     _env = ExecEnv::GetInstance();
-    _env->_master_info = new TMasterInfo();
+    _env->_master_info = _master_info.get();
     _env->_master_info->network_address.hostname = "host name";
-    _env->_master_info->network_address.port = backend_id;
+    _env->_master_info->network_address.port = _backend_id;
     _env->_master_info->backend_id = 1001;
-    _env->_wal_manager = WalManager::create_shared(_env, wal_dir);
+    _env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
     std::string base_path;
     auto st = _env->_wal_manager->_init_wal_dirs_info();
-    st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, 
base_path);
+    st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label, 
base_path);
     std::string src = "./be/test/exec/test_data/wal_scanner/wal";
-    std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + 
std::to_string(tb_id) + "/" +
-                      std::to_string(version) + "_" + 
std::to_string(backend_id) + "_" +
-                      std::to_string(txn_id) + "_" + label;
+    std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + 
std::to_string(_tb_id) + "/" +
+                      std::to_string(_version) + "_" + 
std::to_string(_backend_id) + "_" +
+                      std::to_string(_txn_id) + "_" + _label;
     std::filesystem::copy(src, dst);
 }
 
-TEST_F(VWalScannerTest, normal) {
-    std::vector<size_t> index_vector;
-    index_vector.emplace_back(0);
-    index_vector.emplace_back(1);
-    index_vector.emplace_back(2);
-    //    config::group_commit_replay_wal_dir = wal_dir;
-    NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
-    scan_node._output_tuple_desc = 
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
-    static_cast<void>(scan_node.init(_tnode, &_runtime_state));
-    auto status = scan_node.prepare(&_runtime_state);
-    EXPECT_TRUE(status.ok());
-
-    std::vector<TFileRangeDesc> ranges;
-    TFileRangeDesc range_desc;
-    {
-        range_desc.start_offset = 0;
-        range_desc.size = 1000;
-    }
-    ranges.push_back(range_desc);
-    TFileScanRange scan_range;
-    scan_range.ranges = ranges;
-    scan_range.__isset.params = true;
-    scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
-    std::unique_ptr<ShardedKVCache> _kv_cache;
-    _kv_cache.reset(new ShardedKVCache(48));
-    _runtime_state._wal_id = txn_id;
-    VFileScanner scanner(&_runtime_state, &scan_node, -1, scan_range, 
_profile, _kv_cache.get());
-    scanner._is_load = false;
+void VWalScannerTest::generate_scanner(std::shared_ptr<VFileScanner>& scanner) 
{
+    scanner = std::make_shared<VFileScanner>(&_runtime_state, 
_scan_node.get(), -1, _scan_range,
+                                             _profile, _kv_cache.get());
+    scanner->_is_load = false;
     vectorized::VExprContextSPtrs _conjuncts;
     std::unordered_map<std::string, ColumnValueRangeType> 
_colname_to_value_range;
     std::unordered_map<std::string, int> _colname_to_slot_id;
-    static_cast<void>(scanner.prepare(_conjuncts, &_colname_to_value_range, 
&_colname_to_slot_id));
+    WARN_IF_ERROR(scanner->prepare(_conjuncts, &_colname_to_value_range, 
&_colname_to_slot_id),
+                  "fail to prepare scanner");
+}
 
+TEST_F(VWalScannerTest, normal) {
+    std::shared_ptr<VFileScanner> scanner = nullptr;
+    generate_scanner(scanner);
     std::unique_ptr<vectorized::Block> block(new vectorized::Block());
     bool eof = false;
-    auto st = scanner.get_block(&_runtime_state, block.get(), &eof);
-    EXPECT_EQ(3, block->rows());
+    auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
     ASSERT_TRUE(st.ok());
+    EXPECT_EQ(3, block->rows());
     block->clear();
-    st = scanner.get_block(&_runtime_state, block.get(), &eof);
+    st = scanner->get_block(&_runtime_state, block.get(), &eof);
     ASSERT_TRUE(st.ok());
     EXPECT_EQ(0, block->rows());
     ASSERT_TRUE(eof);
-    static_cast<void>(scanner.close(&_runtime_state));
-    static_cast<void>(scan_node.close(&_runtime_state));
+    WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
+}
 
-    {
-        std::stringstream ss;
-        scan_node.runtime_profile()->pretty_print(&ss);
-        LOG(INFO) << ss.str();
-    }
+TEST_F(VWalScannerTest, fail_with_not_equal) {
+    auto sp = SyncPoint::get_instance();
+    Defer defer {[sp] {
+        sp->clear_call_back("WalReader::set_column_id_count");
+        sp->clear_call_back("WalReader::set_out_block_column_size");
+    }};
+    sp->set_call_back("WalReader::set_column_id_count",
+                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2; 
});
+    sp->set_call_back("WalReader::set_out_block_column_size",
+                      [](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2; 
});
+    sp->enable_processing();
+
+    std::shared_ptr<VFileScanner> scanner = nullptr;
+    generate_scanner(scanner);
+    std::unique_ptr<vectorized::Block> block(new vectorized::Block());
+    bool eof = false;
+    auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
+    ASSERT_FALSE(st.ok());
+    auto msg = st.to_string();
+    auto pos = msg.find("not equal");
+    ASSERT_TRUE(pos != msg.npos);
+    WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
 }
 
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to