This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0f65fd0cd02 [case](Cloud) Add chaos storage vault cases (#33730)
0f65fd0cd02 is described below
commit 0f65fd0cd026328dd19f9c387890867cfde8577a
Author: AlexYue <[email protected]>
AuthorDate: Mon Apr 22 19:54:09 2024 +0800
[case](Cloud) Add chaos storage vault cases (#33730)
---
be/src/cloud/injection_point_action.cpp | 56 +++++++++-
be/src/io/fs/hdfs_file_reader.cpp | 5 +
be/src/io/fs/hdfs_file_writer.cpp | 18 +++-
.../inject_hdfs_load_error.groovy | 116 +++++++++++++++++++++
.../inject_hdfs_select_error.groovy | 104 ++++++++++++++++++
5 files changed, 293 insertions(+), 6 deletions(-)
diff --git a/be/src/cloud/injection_point_action.cpp
b/be/src/cloud/injection_point_action.cpp
index adcc6cad708..d99dcfd534d 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -45,11 +45,57 @@ void register_suites() {
sp->set_call_back("new_cumulative_point", [](auto&& args) {
auto output_rowset = try_any_cast<Rowset*>(args[0]);
auto last_cumulative_point = try_any_cast<int64_t>(args[1]);
- auto pair = try_any_cast<std::pair<int64_t, bool>*>(args.back());
- pair->first = output_rowset->start_version() ==
last_cumulative_point
- ? output_rowset->end_version() + 1
- : last_cumulative_point;
- pair->second = true;
+ auto& [ret_vault, should_ret] = *try_any_cast<std::pair<int64_t,
bool>*>(args.back());
+ ret_vault = output_rowset->start_version() == last_cumulative_point
+ ? output_rowset->end_version() + 1
+ : last_cumulative_point;
+ should_ret = true;
+ });
+ });
+ suite_map.emplace("test_s3_file_writer", [] {
+ auto* sp = SyncPoint::get_instance();
+ sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache",
[](auto&&) {
+ std::srand(static_cast<unsigned int>(std::time(nullptr)));
+ int random_sleep_time_second = std::rand() % 10 + 1;
+
std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
+ });
+
sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject",
[](auto&& args) {
+ auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status,
bool>*>(args.back());
+ ret_status =
+ Status::IOError<false>("failed to write into file cache
due to inject error");
+ should_ret = true;
+ });
+ });
+ suite_map.emplace("test_storage_vault", [] {
+ auto* sp = SyncPoint::get_instance();
+ sp->set_call_back("HdfsFileWriter::append_hdfs_file_delay", [](auto&&)
{
+ std::srand(static_cast<unsigned int>(std::time(nullptr)));
+ int random_sleep_time_second = std::rand() % 10 + 1;
+
std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
+ });
+ sp->set_call_back("HdfsFileWriter::append_hdfs_file_error", [](auto&&
args) {
+ auto& [_, should_ret] = *try_any_cast<std::pair<Status,
bool>*>(args.back());
+ should_ret = true;
+ });
+ sp->set_call_back("HdfsFileWriter::hdfsFlush", [](auto&& args) {
+ auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status,
bool>*>(args.back());
+ ret_value = Status::InternalError("failed to flush hdfs file");
+ should_ret = true;
+ });
+ sp->set_call_back("HdfsFileWriter::hdfsCloseFile", [](auto&& args) {
+ auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status,
bool>*>(args.back());
+ ret_value = Status::InternalError("failed to flush hdfs file");
+ should_ret = true;
+ });
+ sp->set_call_back("HdfsFileWriter::hdfeSync", [](auto&& args) {
+ auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status,
bool>*>(args.back());
+ ret_value = Status::InternalError("failed to flush hdfs file");
+ should_ret = true;
+ });
+ sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) {
+ auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status,
bool>*>(args.back());
+ ret_status = Status::InternalError("read hdfs error");
+ should_ret = true;
});
});
}
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index 45be6ffd60a..358663b65d0 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -26,6 +26,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
+#include "common/sync_point.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_util.h"
#include "service/backend_options.h"
@@ -126,6 +127,10 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
while (has_read < bytes_req) {
tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset +
has_read,
to + has_read, bytes_req - has_read);
+ {
+ [[maybe_unused]] Status error_ret;
+
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret);
+ }
if (loop_read < 0) {
// invoker maybe just skip Status.NotFound and continue
// so we need distinguish between it and other kinds of errors
diff --git a/be/src/io/fs/hdfs_file_writer.cpp
b/be/src/io/fs/hdfs_file_writer.cpp
index 6f9105a89e7..b2ea4dace94 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -27,6 +27,7 @@
#include "common/logging.h"
#include "common/status.h"
+#include "common/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
@@ -96,6 +97,8 @@ Status HdfsFileWriter::close() {
ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
#endif
}
+ TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfeSync",
+ Status::InternalError("failed
to sync hdfs file"));
if (ret != 0) {
return Status::InternalError(
@@ -105,13 +108,16 @@ Status HdfsFileWriter::close() {
}
{
- SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+ SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
// The underlying implementation will invoke `hdfsHFlush` to flush
buffered data and wait for
// the HDFS response, but won't guarantee the synchronization of data
to HDFS.
ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
}
_hdfs_file = nullptr;
+ TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
+ Status::InternalError("failed to
close hdfs file"));
if (ret != 0) {
+ std::string err_msg = hdfs_error();
return Status::InternalError(
"Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err:
{}, file_size={}",
BackendOptions::get_localhost(), _fs_name, _path.native(),
hdfs_error(),
@@ -182,9 +188,17 @@ Status HdfsFileWriter::append_hdfs_file(std::string_view
content) {
while (!content.empty()) {
int64_t written_bytes;
{
+
TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay");
SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
written_bytes =
hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file,
content.data(), content.size());
+ {
+ [[maybe_unused]] Status error_ret = Status::InternalError(
+ "write hdfs failed. fs_name: {}, path: {}, error: size
exceeds", _fs_name,
+ _path.native());
+
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::append_hdfs_file_error",
+ error_ret);
+ }
}
if (written_bytes < 0) {
return Status::InternalError(
@@ -257,6 +271,8 @@ Status HdfsFileWriter::finalize() {
// Flush buffered data to HDFS without waiting for HDFS response
int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+ TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush",
+ Status::InternalError("failed to
flush hdfs file"));
if (ret != 0) {
return Status::InternalError(
"failed to flush hdfs file. fs_name={} path={} : {},
file_size={}", _fs_name,
diff --git
a/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy
b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy
new file mode 100755
index 00000000000..df77a6c69c1
--- /dev/null
+++ b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("inject_hdfs_load_error") {
+ if (!enableStoragevault()) {
+        logger.info("skip create storage vault case")
+ return
+ }
+
+ sql """
+ CREATE STORAGE VAULT IF NOT EXISTS inject_hdfs_load_error
+ PROPERTIES (
+ "type"="hdfs",
+ "fs.defaultFS"="${getHdfsFs()}",
+ "path_prefix" = "inject_hdfs_load_error"
+ );
+ """
+
+ try {
+ String backendId;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backendId = backendId_to_backendIP.keySet()[0]
+ def be_host = backendId_to_backendIP.get(backendId)
+ def be_http_port = backendId_to_backendHttpPort.get(backendId)
+
+ def tableName = "inject_load_error_table"
+
+ // op = {set, apply_suite, clear}, behavior = {sleep, return,
return_ok, return_error}
+ // name = {point_name}, code = {return_error return code}, duration =
{sleep duration}
+ // value is code for return_error, duration for sleep
+ // internal error is 6
+ def triggerInject = { name, op, behavior, value ->
+ // trigger compactions for all tablets in ${tableName}
+ StringBuilder sb = new StringBuilder();
+ // /api/injection_point/{op}/{name}
+ sb.append("curl -X POST http://${be_host}:${be_http_port}")
+
sb.append("/api/injection_point/${op}/${name}/${behavior}/${value}")
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())));
+ out = process.getText()
+ logger.info("enable inject, op ${op}, name ${name}, behavior
${behavior}, value ${value}")
+ assertEquals(code, 0)
+ return out
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ C_CUSTKEY INTEGER NOT NULL,
+ C_NAME INTEGER NOT NULL
+ )
+ DUPLICATE KEY(C_CUSTKEY, C_NAME)
+ DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1", "storage_vault_name" =
"inject_hdfs_load_error"
+ )
+ """
+
+ def load_table = { cnt ->
+ for (int i = 0; i < cnt; i++) {
+ sql """
+ insert into ${tableName} values(${i}, '${i}');
+ """
+ }
+ }
+
+ for (int j = 1; j < 5; j++) {
+ triggerInject("HdfsFileWriter::hdfsFlush", "set", "return_error",
6)
+ expectExceptionLike({
+ load_table(j)
+ }, "failed to flush hdfs file")
+ triggerInject("HdfsFileWriter::hdfsFlush", "clear", "valid", 0)
+ }
+
+ for (int j = 1; j < 5; j++) {
+ triggerInject("HdfsFileWriter::hdfsCloseFile", "set",
"return_error", 6)
+ expectExceptionLike({
+ load_table(j)
+ }, "Write hdfs file failed")
+ triggerInject("HdfsFileWriter::hdfsCloseFile", "clear", "valid",
0)
+ }
+
+ for (int j = 1; j < 5; j++) {
+ triggerInject("HdfsFileWriter::append_hdfs_file_error", "set",
"return_error", 6)
+ expectExceptionLike({
+ load_table(j)
+ }, "write hdfs file failed")
+ triggerInject("HdfsFileWriter::append_hdfs_file_error", "clear",
"valid", 0)
+ }
+
+ } finally {
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy
b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy
new file mode 100755
index 00000000000..3b41e78834a
--- /dev/null
+++
b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("inject_hdfs_load_error") {
+ if (!enableStoragevault()) {
+        logger.info("skip create storage vault case")
+ return
+ }
+
+ sql """
+ CREATE STORAGE VAULT IF NOT EXISTS inject_hdfs_select_error
+ PROPERTIES (
+ "type"="hdfs",
+ "fs.defaultFS"="${getHdfsFs()}",
+ "path_prefix" = "inject_hdfs_select_error"
+ );
+ """
+
+ try {
+ String backendId;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backendId = backendId_to_backendIP.keySet()[0]
+ def be_host = backendId_to_backendIP.get(backendId)
+ def be_http_port = backendId_to_backendHttpPort.get(backendId)
+
+ def tableName = "inject_select_error_table"
+
+ // op = {set, apply_suite, clear}, behavior = {sleep, return,
return_ok, return_error}
+ // name = {point_name}, code = {return_error return code}, duration =
{sleep duration}
+ // value is code for return_error, duration for sleep
+ // internal error is 6
+ def triggerInject = { name, op, behavior, value ->
+ // trigger compactions for all tablets in ${tableName}
+ StringBuilder sb = new StringBuilder();
+ // /api/injection_point/{op}/{name}
+ sb.append("curl -X POST http://${be_host}:${be_http_port}")
+
sb.append("/api/injection_point/${op}/${name}/${behavior}/${value}")
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ err = IOGroovyMethods.getText(new BufferedReader(new
InputStreamReader(process.getErrorStream())));
+ out = process.getText()
+ logger.info("enable inject, op ${op}, name ${name}, behavior
${behavior}, value ${value}")
+ assertEquals(code, 0)
+ return out
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ C_CUSTKEY INTEGER NOT NULL,
+ C_NAME INTEGER NOT NULL
+ )
+ DUPLICATE KEY(C_CUSTKEY, C_NAME)
+ DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1", "storage_vault_name" =
"inject_hdfs_select_error"
+ )
+ """
+
+ def load_table = { cnt ->
+ for (int i = 0; i < cnt; i++) {
+ sql """
+ insert into ${tableName} values(${i}, '${i}');
+ """
+ }
+ }
+
+ for (int j = 1; j <10; j++) {
+ load_table(j)
+ }
+
+ for (int j = 1; j < 5; j++) {
+ triggerInject("HdfsFileReader:read_error", "set", "return_error",
6)
+ expectExceptionLike({
+ load_table(j)
+ }, "Read hdfs file failed")
+ triggerInject("HdfsFileReader:read_error", "clear", "valid", 0)
+ }
+
+ } finally {
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]