github-actions[bot] commented on code in PR #23053: URL: https://github.com/apache/doris/pull/23053#discussion_r1319883637
########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; Review Comment: warning: variable 'wal_101' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_101 = 0 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; Review Comment: warning: variable 'wal_100' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_100 = 0 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, Review Comment: warning: variable 'st' is not initialized [cppcoreguidelines-init-variables] ```suggestion Status st = 0 = block.serialize(BeExecVersionManager::get_newest_version(), pblock, ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; + createWal(wal_100); + createWal(wal_101); + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_2_id); + std::string wal_200 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; + std::string wal_201 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_201_id; + createWal(wal_200); + createWal(wal_201); + _env->wal_mgr()->init(); + + while (_env->_wal_manager->get_wal_table_size(tb_1_id) > 0 || + _env->_wal_manager->get_wal_table_size(tb_2_id) > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); + ASSERT_TRUE(!std::filesystem::exists(wal_101)); + ASSERT_TRUE(!std::filesystem::exists(wal_200)); + ASSERT_TRUE(!std::filesystem::exists(wal_201)); +} + +TEST_F(WalManagerTest, not_need_recovery) { + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; Review Comment: warning: variable 'wal_100' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_100 = 0 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; + createWal(wal_100); + createWal(wal_101); + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_2_id); + std::string wal_200 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; + std::string wal_201 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_201_id; Review Comment: warning: variable 'wal_201' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_201 = 0 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_201_id; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector<int32_t>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; + generate_block(pblock, 0); + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock})); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + // add 2 block + { + PBlock pblock; + generate_block(pblock, 1024); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + PBlock pblock1; + generate_block(pblock1, 2048); + file_len += pblock1.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock, &pblock1})); + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + wal_writer.finalize(); + // read block + auto wal_reader = WalReader(file_name); + wal_reader.init(); + auto block_count = 0; + while (true) { + doris::PBlock pblock; Review Comment: warning: variable 'pblock' is not initialized [cppcoreguidelines-init-variables] ```suggestion doris::PBlock pblock = 0; ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; + createWal(wal_100); + createWal(wal_101); + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_2_id); + std::string wal_200 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; Review Comment: warning: variable 'wal_200' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_200 = 0 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector<int32_t>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; + generate_block(pblock, 0); + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock})); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + // add 2 block + { + PBlock pblock; + generate_block(pblock, 1024); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + PBlock pblock1; Review Comment: warning: variable 'pblock1' is not initialized [cppcoreguidelines-init-variables] ```suggestion PBlock pblock1 = 0; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector<int32_t>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; + generate_block(pblock, 0); + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock})); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + // add 2 block + { + PBlock pblock; + generate_block(pblock, 1024); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + PBlock pblock1; + generate_block(pblock1, 2048); + file_len += pblock1.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock, &pblock1})); + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + wal_writer.finalize(); + // read block + auto wal_reader = WalReader(file_name); + wal_reader.init(); + auto block_count = 0; + while (true) { + doris::PBlock pblock; + Status st = wal_reader.read_block(pblock); + EXPECT_TRUE(st.ok() || st.is<ErrorCode::END_OF_FILE>()); + if (st.ok()) { + ++block_count; + } else if (st.is<ErrorCode::END_OF_FILE>()) { + break; + } + vectorized::Block block; Review Comment: warning: variable 'block' is not initialized [cppcoreguidelines-init-variables] ```suggestion vectorized::Block block = 0; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector<int32_t>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; + generate_block(pblock, 0); + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock})); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + // add 2 block + { + PBlock pblock; Review Comment: warning: variable 'pblock' is not initialized [cppcoreguidelines-init-variables] ```suggestion PBlock pblock = 0; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector<int32_t>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; + generate_block(pblock, 0); + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock})); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + // add 2 block + { + PBlock pblock; + generate_block(pblock, 1024); + file_len += pblock.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + PBlock pblock1; + generate_block(pblock1, 2048); + file_len += pblock1.ByteSizeLong() + WalWriter::LENGTH_SIZE + WalWriter::CHECKSUM_SIZE; + + EXPECT_EQ(Status::OK(), wal_writer.append_blocks(std::vector<PBlock*> {&pblock, &pblock1})); + io::global_local_filesystem()->file_size(file_name, &file_size); + EXPECT_EQ(file_len, file_size); + } + wal_writer.finalize(); + // read block + auto wal_reader = WalReader(file_name); + wal_reader.init(); + auto block_count = 0; + while (true) { + doris::PBlock pblock; + Status st = wal_reader.read_block(pblock); Review Comment: warning: variable 'st' is not initialized [cppcoreguidelines-init-variables] ```suggestion Status st = 0 = wal_reader.read_block(pblock); ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; + createWal(wal_100); + createWal(wal_101); + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_2_id); + std::string wal_200 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; + std::string wal_201 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_201_id; + createWal(wal_200); + createWal(wal_201); + _env->wal_mgr()->init(); + + while (_env->_wal_manager->get_wal_table_size(tb_1_id) > 0 || + _env->_wal_manager->get_wal_table_size(tb_2_id) > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); + ASSERT_TRUE(!std::filesystem::exists(wal_101)); + ASSERT_TRUE(!std::filesystem::exists(wal_200)); + ASSERT_TRUE(!std::filesystem::exists(wal_201)); +} + +TEST_F(WalManagerTest, not_need_recovery) { + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; + createWal(wal_100); + + _env->wal_mgr()->init(); + + while (_env->_wal_manager->get_wal_table_size("1") > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); +} + +TEST_F(WalManagerTest, recover_fail) { + k_request_line = "\"Status\": \"Fail\",\n"; + k_check_wal_result.__set_need_recovery(true); + config::group_commit_replay_wal_retry_num = 3; + + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; Review Comment: warning: variable 'wal_100' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string wal_100 = 0 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; ``` ########## be/test/olap/wal_reader_writer_test.cpp: ########## @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <gtest/gtest.h> + +#include <filesystem> + +#include "agent/be_exec_version_manager.h" +#include "common/object_pool.h" +#include "gen_cpp/internal_service.pb.h" +#include "gmock/gmock.h" +#include "io/fs/local_file_system.h" +#include "olap/wal_reader.h" +#include "olap/wal_writer.h" +#include "runtime/exec_env.h" +#include "service/brpc.h" +#include "testutil/test_util.h" +#include "util/proto_util.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" +#include "vec/runtime/vdata_stream_recvr.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::SetArgPointee; +using std::string; + +namespace doris { + +class WalReaderWriterTest : public testing::Test { +public: + // create a mock cgroup folder + virtual void SetUp() { io::global_local_filesystem()->create_directory(_s_test_data_path); } + + // delete the mock cgroup folder + virtual void TearDown() { io::global_local_filesystem()->delete_directory(_s_test_data_path); } + + static std::string _s_test_data_path; +}; + +std::string WalReaderWriterTest::_s_test_data_path = "./log/wal_reader_writer_test"; +size_t block_rows = 1024; + +void covert_block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + Status st = block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, compression_type); + EXPECT_TRUE(st.ok()); + EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); + EXPECT_EQ(compressed_bytes, pblock->column_values().size()); + + const vectorized::ColumnWithTypeAndName& type_and_name = + block.get_columns_with_type_and_name()[0]; + EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name()); +} + +void generate_block(PBlock& pblock, int row_index) { + auto vec = vectorized::ColumnVector<int32_t>::create(); + auto& data = vec->get_data(); + for (int i = 0; i < block_rows; ++i) { + data.push_back(i + row_index); + } + vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); + vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); + vectorized::Block block({type_and_name}); + covert_block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY); +} + +TEST_F(WalReaderWriterTest, TestWriteAndRead1) { + std::string file_name = _s_test_data_path + "/abcd123.txt"; + auto wal_writer = WalWriter(file_name); + wal_writer.init(); + size_t file_len = 0; + int64_t file_size = -1; + // add 1 block + { + PBlock pblock; Review Comment: warning: variable 'pblock' is not initialized [cppcoreguidelines-init-variables] ```suggestion PBlock pblock = 0; ``` ########## be/test/olap/wal_manager_test.cpp: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/wal_manager.h" + +#include <gtest/gtest.h> + +#include <filesystem> +#include <map> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "io/fs/local_file_system.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/result_queue_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/types.h" +#include "service/brpc.h" +#include "util/brpc_client_cache.h" +#include "util/cpu_info.h" +#include "util/debug/leakcheck_disabler.h" +#include "util/proto_util.h" + +namespace doris { + +extern TCheckWalResult k_check_wal_result; +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern Status k_stream_load_plan_status; +extern std::string k_response; +extern std::string k_request_line; + +class WalManagerTest : public testing::Test { +public: + WalManagerTest() {} + virtual ~WalManagerTest() {} + void SetUp() override { + prepare(); + _env = ExecEnv::GetInstance(); + _env->_master_info = new TMasterInfo(); + _env->_master_info->network_address.hostname = "host name"; + _env->_master_info->network_address.port = 1234; + _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); + _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); + _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_wal_manager = WalManager::create_shared(_env, wal_dir); + k_check_wal_result = TCheckWalResult(); + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_plan_status = Status::OK(); + } + void TearDown() override { + io::global_local_filesystem()->delete_directory(wal_dir); + SAFE_DELETE(_env->_function_client_cache); + SAFE_DELETE(_env->_internal_client_cache); + SAFE_DELETE(_env->_master_info); + } + + void prepare() { io::global_local_filesystem()->create_directory(wal_dir); } + + void createWal(const std::string& wal_path) { + auto wal_writer = WalWriter(wal_path); + wal_writer.init(); + wal_writer.finalize(); + } + +private: + ExecEnv* _env = nullptr; + std::string wal_dir = "./wal_test"; + std::string tmp_dir = "./wal_test/tmp"; +}; + +TEST_F(WalManagerTest, recovery_normal) { + k_response = "OK"; + k_request_line = "\"Status\": \"Success\",\n"; + k_check_wal_result.__set_need_recovery(true); + + std::string db_id = "1"; + std::string tb_1_id = "1"; + std::string wal_100_id = "100"; + std::string wal_101_id = "101"; + std::string tb_2_id = "2"; + std::string wal_200_id = "200"; + std::string wal_201_id = "201"; + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_1_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_100_id; + std::string wal_101 = wal_dir + "/" + db_id + "/" + tb_1_id + "/" + wal_101_id; + createWal(wal_100); + createWal(wal_101); + + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_2_id); + std::string wal_200 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_200_id; + std::string wal_201 = wal_dir + "/" + db_id + "/" + tb_2_id + "/" + wal_201_id; + createWal(wal_200); + createWal(wal_201); + _env->wal_mgr()->init(); + + while (_env->_wal_manager->get_wal_table_size(tb_1_id) > 0 || + _env->_wal_manager->get_wal_table_size(tb_2_id) > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); + ASSERT_TRUE(!std::filesystem::exists(wal_101)); + ASSERT_TRUE(!std::filesystem::exists(wal_200)); + ASSERT_TRUE(!std::filesystem::exists(wal_201)); +} + +TEST_F(WalManagerTest, not_need_recovery) { + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; + createWal(wal_100); + + _env->wal_mgr()->init(); + + while (_env->_wal_manager->get_wal_table_size("1") > 0) { + sleep(1); + continue; + } + ASSERT_TRUE(!std::filesystem::exists(wal_100)); +} + +TEST_F(WalManagerTest, recover_fail) { + k_request_line = "\"Status\": \"Fail\",\n"; + k_check_wal_result.__set_need_recovery(true); + config::group_commit_replay_wal_retry_num = 3; + + std::string db_id = "1"; + std::string tb_id = "1"; + std::string wal_id = "100"; + std::filesystem::create_directory(wal_dir + "/" + db_id); + std::filesystem::create_directory(wal_dir + "/" + db_id + "/" + tb_id); + std::string wal_100 = wal_dir + "/" + db_id + "/" + tb_id + "/" + wal_id; + createWal(wal_100); + + _env->wal_mgr()->init(); + + while (_env->_wal_manager->get_wal_table_size("1") > 0) { + sleep(1); + continue; + } + std::string tmp_file = tmp_dir + "/" + db_id + "_" + tb_id + "_" + wal_id; Review Comment: warning: variable 'tmp_file' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::string tmp_file = 0 = tmp_dir + "/" + db_id + "_" + tb_id + "_" + wal_id; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
