This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit f1111fd03066dbc702c8dcd686ca0c4bd4765c57 Author: AlexYue <[email protected]> AuthorDate: Mon Apr 22 19:54:09 2024 +0800 [case](Cloud) Add chaos storage vault cases (#33730) --- be/src/cloud/injection_point_action.cpp | 56 +++++++++- be/src/io/fs/hdfs_file_reader.cpp | 5 + be/src/io/fs/hdfs_file_writer.cpp | 18 +++- .../inject_hdfs_load_error.groovy | 116 +++++++++++++++++++++ .../inject_hdfs_select_error.groovy | 104 ++++++++++++++++++ 5 files changed, 293 insertions(+), 6 deletions(-) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index adcc6cad708..d99dcfd534d 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -45,11 +45,57 @@ void register_suites() { sp->set_call_back("new_cumulative_point", [](auto&& args) { auto output_rowset = try_any_cast<Rowset*>(args[0]); auto last_cumulative_point = try_any_cast<int64_t>(args[1]); - auto pair = try_any_cast<std::pair<int64_t, bool>*>(args.back()); - pair->first = output_rowset->start_version() == last_cumulative_point - ? output_rowset->end_version() + 1 - : last_cumulative_point; - pair->second = true; + auto& [ret_vault, should_ret] = *try_any_cast<std::pair<int64_t, bool>*>(args.back()); + ret_vault = output_rowset->start_version() == last_cumulative_point + ? output_rowset->end_version() + 1 + : last_cumulative_point; + should_ret = true; + }); + }); + suite_map.emplace("test_s3_file_writer", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache", [](auto&&) { + std::srand(static_cast<unsigned int>(std::time(nullptr))); + int random_sleep_time_second = std::rand() % 10 + 1; + std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second)); + }); + sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject", [](auto&& args) { + auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); + ret_status = + Status::IOError<false>("failed to write into file cache due to inject error"); + should_ret = true; + }); + }); + suite_map.emplace("test_storage_vault", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("HdfsFileWriter::append_hdfs_file_delay", [](auto&&) { + std::srand(static_cast<unsigned int>(std::time(nullptr))); + int random_sleep_time_second = std::rand() % 10 + 1; + std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second)); + }); + sp->set_call_back("HdfsFileWriter::append_hdfs_file_error", [](auto&& args) { + auto& [_, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); + should_ret = true; + }); + sp->set_call_back("HdfsFileWriter::hdfsFlush", [](auto&& args) { + auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); + ret_value = Status::InternalError("failed to flush hdfs file"); + should_ret = true; + }); + sp->set_call_back("HdfsFileWriter::hdfsCloseFile", [](auto&& args) { + auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); + ret_value = Status::InternalError("failed to flush hdfs file"); + should_ret = true; + }); + sp->set_call_back("HdfsFileWriter::hdfeSync", [](auto&& args) { + auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); + ret_value = Status::InternalError("failed to flush hdfs file"); + should_ret = true; + }); + sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) { + auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back()); + ret_status = Status::InternalError("read hdfs error"); + should_ret = true; }); }); } diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 45be6ffd60a..358663b65d0 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -26,6 +26,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" +#include "common/sync_point.h" #include "io/fs/err_utils.h" #include "io/hdfs_util.h" #include "service/backend_options.h" @@ -126,6 +127,10 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r while (has_read < bytes_req) { tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read, to + has_read, bytes_req - has_read); + { + [[maybe_unused]] Status error_ret; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret); + } if (loop_read < 0) { // invoker maybe just skip Status.NotFound and continue // so we need distinguish between it and other kinds of errors diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 6f9105a89e7..b2ea4dace94 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -27,6 +27,7 @@ #include "common/logging.h" #include "common/status.h" +#include "common/sync_point.h" #include "io/cache/block_file_cache.h" #include "io/cache/block_file_cache_factory.h" #include "io/cache/file_cache_common.h" @@ -96,6 +97,8 @@ Status HdfsFileWriter::close() { ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file); #endif } + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfeSync", + Status::InternalError("failed to sync hdfs file")); if (ret != 0) { return Status::InternalError( @@ -105,13 +108,16 @@ Status HdfsFileWriter::close() { } { - SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency); + SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for // the HDFS response, but won't guarantee the synchronization of data to HDFS. ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); } _hdfs_file = nullptr; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile", + Status::InternalError("failed to close hdfs file")); if (ret != 0) { + std::string err_msg = hdfs_error(); return Status::InternalError( "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, file_size={}", BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(), @@ -182,9 +188,17 @@ Status HdfsFileWriter::append_hdfs_file(std::string_view content) { while (!content.empty()) { int64_t written_bytes; { + TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay"); SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency); written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), content.size()); + { + [[maybe_unused]] Status error_ret = Status::InternalError( + "write hdfs failed. fs_name: {}, path: {}, error: size exceeds", _fs_name, + _path.native()); + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::append_hdfs_file_error", + error_ret); + } } if (written_bytes < 0) { return Status::InternalError( @@ -257,6 +271,8 @@ Status HdfsFileWriter::finalize() { // Flush buffered data to HDFS without waiting for HDFS response int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush", + Status::InternalError("failed to flush hdfs file")); if (ret != 0) { return Status::InternalError( "failed to flush hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name, diff --git a/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy new file mode 100755 index 00000000000..df77a6c69c1 --- /dev/null +++ b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("inject_hdfs_load_error") { + if (!enableStoragevault()) { + logger.info("skip create storgage vault case") + return + } + + sql """ + CREATE STORAGE VAULT IF NOT EXISTS inject_hdfs_load_error + PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="${getHdfsFs()}", + "path_prefix" = "inject_hdfs_load_error" + ); + """ + + try { + String backendId; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backendId = backendId_to_backendIP.keySet()[0] + def be_host = backendId_to_backendIP.get(backendId) + def be_http_port = backendId_to_backendHttpPort.get(backendId) + + def tableName = "inject_load_error_table" + + // op = {set, apply_suite, clear}, behavior = {sleep, return, return_ok, return_error} + // name = {point_name}, code = {return_error return code}, duration = {sleep duration} + // value is code for return_error, duration for sleep + // internal error is 6 + def triggerInject = { name, op, behavior, value -> + // trigger compactions for all tablets in ${tableName} + StringBuilder sb = new StringBuilder(); + // /api/injection_point/{op}/{name} + sb.append("curl -X POST http://${be_host}:${be_http_port}") + sb.append("/api/injection_point/${op}/${name}/${behavior}/${value}") + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + out = process.getText() + logger.info("enable inject, op ${op}, name ${name}, behavior ${behavior}, value ${value}") + assertEquals(code, 0) + return out + } + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME INTEGER NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", "storage_vault_name" = "inject_hdfs_load_error" + ) + """ + + def load_table = { cnt -> + for (int i = 0; i < cnt; i++) { + sql """ + insert into ${tableName} values(${i}, '${i}'); + """ + } + } + + for (int j = 1; j < 5; j++) { + triggerInject("HdfsFileWriter::hdfsFlush", "set", "return_error", 6) + expectExceptionLike({ + load_table(j) + }, "failed to flush hdfs file") + triggerInject("HdfsFileWriter::hdfsFlush", "clear", "valid", 0) + } + + for (int j = 1; j < 5; j++) { + triggerInject("HdfsFileWriter::hdfsCloseFile", "set", "return_error", 6) + expectExceptionLike({ + load_table(j) + }, "Write hdfs file failed") + triggerInject("HdfsFileWriter::hdfsCloseFile", "clear", "valid", 0) + } + + for (int j = 1; j < 5; j++) { + triggerInject("HdfsFileWriter::append_hdfs_file_error", "set", "return_error", 6) + expectExceptionLike({ + load_table(j) + }, "write hdfs file failed") + triggerInject("HdfsFileWriter::append_hdfs_file_error", "clear", "valid", 0) + } + + } finally { + } +} \ No newline at end of file diff --git a/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy new file mode 100755 index 00000000000..3b41e78834a --- /dev/null +++ b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("inject_hdfs_load_error") { + if (!enableStoragevault()) { + logger.info("skip create storgage vault case") + return + } + + sql """ + CREATE STORAGE VAULT IF NOT EXISTS inject_hdfs_select_error + PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="${getHdfsFs()}", + "path_prefix" = "inject_hdfs_select_error" + ); + """ + + try { + String backendId; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backendId = backendId_to_backendIP.keySet()[0] + def be_host = backendId_to_backendIP.get(backendId) + def be_http_port = backendId_to_backendHttpPort.get(backendId) + + def tableName = "inject_select_error_table" + + // op = {set, apply_suite, clear}, behavior = {sleep, return, return_ok, return_error} + // name = {point_name}, code = {return_error return code}, duration = {sleep duration} + // value is code for return_error, duration for sleep + // internal error is 6 + def triggerInject = { name, op, behavior, value -> + // trigger compactions for all tablets in ${tableName} + StringBuilder sb = new StringBuilder(); + // /api/injection_point/{op}/{name} + sb.append("curl -X POST http://${be_host}:${be_http_port}") + sb.append("/api/injection_point/${op}/${name}/${behavior}/${value}") + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + out = process.getText() + logger.info("enable inject, op ${op}, name ${name}, behavior ${behavior}, value ${value}") + assertEquals(code, 0) + return out + } + + sql """ DROP TABLE IF EXISTS ${tableName}; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME INTEGER NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", "storage_vault_name" = "inject_hdfs_select_error" + ) + """ + + def load_table = { cnt -> + for (int i = 0; i < cnt; i++) { + sql """ + insert into ${tableName} values(${i}, '${i}'); + """ + } + } + + for (int j = 1; j <10; j++) { + load_table(j) + } + + for (int j = 1; j < 5; j++) { + triggerInject("HdfsFileReader:read_error", "set", "return_error", 6) + expectExceptionLike({ + load_table(j) + }, "Read hdfs file failed") + triggerInject("HdfsFileReader:read_error", "clear", "valid", 0) + } + + } finally { + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
