This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f1111fd03066dbc702c8dcd686ca0c4bd4765c57
Author: AlexYue <[email protected]>
AuthorDate: Mon Apr 22 19:54:09 2024 +0800

    [case](Cloud) Add chaos storage vault cases (#33730)
---
 be/src/cloud/injection_point_action.cpp            |  56 +++++++++-
 be/src/io/fs/hdfs_file_reader.cpp                  |   5 +
 be/src/io/fs/hdfs_file_writer.cpp                  |  18 +++-
 .../inject_hdfs_load_error.groovy                  | 116 +++++++++++++++++++++
 .../inject_hdfs_select_error.groovy                | 104 ++++++++++++++++++
 5 files changed, 293 insertions(+), 6 deletions(-)
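
Note on the mechanism: the two new Groovy suites arm these faults over the BE's HTTP injection endpoint rather than invoking the sync points directly. A minimal sketch of the URL shape, inferred from the tests' triggerInject helper further below; the function name, host, and port here are placeholders, not part of this commit:

    // Sketch only: endpoint shape used by the regression tests below.
    // op is one of {set, apply_suite, clear}; behavior is one of
    // {sleep, return, return_ok, return_error}; value carries the error
    // code for return_error or the duration for sleep.
    #include <fmt/format.h>
    #include <string>

    std::string injection_url(const std::string& be_host, int be_http_port,
                              const std::string& op, const std::string& name,
                              const std::string& behavior, const std::string& value) {
        return fmt::format("http://{}:{}/api/injection_point/{}/{}/{}/{}",
                           be_host, be_http_port, op, name, behavior, value);
    }
    // e.g. injection_url("127.0.0.1", 8040, "set",
    //                    "HdfsFileWriter::hdfsFlush", "return_error", "6")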

diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp
index adcc6cad708..d99dcfd534d 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -45,11 +45,57 @@ void register_suites() {
         sp->set_call_back("new_cumulative_point", [](auto&& args) {
             auto output_rowset = try_any_cast<Rowset*>(args[0]);
             auto last_cumulative_point = try_any_cast<int64_t>(args[1]);
-            auto pair = try_any_cast<std::pair<int64_t, bool>*>(args.back());
-            pair->first = output_rowset->start_version() == last_cumulative_point
-                                  ? output_rowset->end_version() + 1
-                                  : last_cumulative_point;
-            pair->second = true;
+            auto& [ret_vault, should_ret] = *try_any_cast<std::pair<int64_t, bool>*>(args.back());
+            ret_vault = output_rowset->start_version() == last_cumulative_point
+                                ? output_rowset->end_version() + 1
+                                : last_cumulative_point;
+            should_ret = true;
+        });
+    });
+    suite_map.emplace("test_s3_file_writer", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache", 
[](auto&&) {
+            std::srand(static_cast<unsigned int>(std::time(nullptr)));
+            int random_sleep_time_second = std::rand() % 10 + 1;
+            std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
+        });
+        sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject", [](auto&& args) {
+            auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
+            ret_status =
+                    Status::IOError<false>("failed to write into file cache due to inject error");
+            should_ret = true;
+        });
+    });
+    suite_map.emplace("test_storage_vault", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("HdfsFileWriter::append_hdfs_file_delay", [](auto&&) 
{
+            std::srand(static_cast<unsigned int>(std::time(nullptr)));
+            int random_sleep_time_second = std::rand() % 10 + 1;
+            std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
+        });
+        sp->set_call_back("HdfsFileWriter::append_hdfs_file_error", [](auto&& 
args) {
+            auto& [_, should_ret] = *try_any_cast<std::pair<Status, 
bool>*>(args.back());
+            should_ret = true;
+        });
+        sp->set_call_back("HdfsFileWriter::hdfsFlush", [](auto&& args) {
+            auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
+            ret_value = Status::InternalError("failed to flush hdfs file");
+            should_ret = true;
+        });
+        sp->set_call_back("HdfsFileWriter::hdfsCloseFile", [](auto&& args) {
+            auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, 
bool>*>(args.back());
+            ret_value = Status::InternalError("failed to flush hdfs file");
+            should_ret = true;
+        });
+        sp->set_call_back("HdfsFileWriter::hdfeSync", [](auto&& args) {
+            auto& [ret_value, should_ret] = *try_any_cast<std::pair<Status, 
bool>*>(args.back());
+            ret_value = Status::InternalError("failed to flush hdfs file");
+            should_ret = true;
+        });
+        sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) {
+            auto& [ret_status, should_ret] = *try_any_cast<std::pair<Status, bool>*>(args.back());
+            ret_status = Status::InternalError("read hdfs error");
+            should_ret = true;
         });
     });
 }
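
Every callback above follows the same out-parameter convention: the sync point passes a std::pair<ReturnType, bool>* as its last argument, the callback fills .first with the value to inject and sets .second so the instrumented code returns early. A self-contained sketch of that convention, using std::any_cast in place of Doris's try_any_cast helper and an int return type purely for illustration:

    #include <any>
    #include <utility>
    #include <vector>

    using InjectionArgs = std::vector<std::any>;

    void sample_callback(InjectionArgs&& args) {
        // Last argument points at {value to return, should-return flag}.
        auto& [ret_value, should_ret] =
                *std::any_cast<std::pair<int, bool>*>(args.back());
        ret_value = -1;     // value the instrumented call site will return
        should_ret = true;  // short-circuit the caller with ret_value
    }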
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index 45be6ffd60a..358663b65d0 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -26,6 +26,7 @@
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/logging.h"
+#include "common/sync_point.h"
 #include "io/fs/err_utils.h"
 #include "io/hdfs_util.h"
 #include "service/backend_options.h"
@@ -126,6 +127,10 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
     while (has_read < bytes_req) {
         tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read,
                                     to + has_read, bytes_req - has_read);
+        {
+            [[maybe_unused]] Status error_ret;
+            TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", error_ret);
+        }
         if (loop_read < 0) {
             // invoker maybe just skip Status.NotFound and continue
             // so we need distinguish between it and other kinds of errors
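
The extra braces around error_ret keep the injected Status scoped to the check itself, so nothing lingers on the hot read path when injection is disabled. A simplified stand-in showing that shape; the macro body here is illustrative only, not Doris's actual TEST_INJECTION_POINT_RETURN_WITH_VALUE implementation:

    #include <atomic>

    std::atomic<bool> g_read_error_armed{false};

    // Illustrative guard: return `value` from the enclosing function
    // when the point is armed.
    #define INJECT_RETURN_WITH_VALUE(armed, value) \
        do {                                       \
            if ((armed).load()) return (value);    \
        } while (0)

    int read_step() {
        {
            [[maybe_unused]] int error_ret = -1;  // scoped to the check
            INJECT_RETURN_WITH_VALUE(g_read_error_armed, error_ret);
        }
        return 0;  // normal read path
    }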
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 6f9105a89e7..b2ea4dace94 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -27,6 +27,7 @@
 
 #include "common/logging.h"
 #include "common/status.h"
+#include "common/sync_point.h"
 #include "io/cache/block_file_cache.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_cache_common.h"
@@ -96,6 +97,8 @@ Status HdfsFileWriter::close() {
             ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #endif
         }
+        TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfeSync",
+                                               Status::InternalError("failed 
to sync hdfs file"));
 
         if (ret != 0) {
             return Status::InternalError(
@@ -105,13 +108,16 @@ Status HdfsFileWriter::close() {
     }
 
     {
-        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
         // the HDFS response, but won't guarantee the synchronization of data to HDFS.
         ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
     }
     _hdfs_file = nullptr;
+    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
+                                           Status::InternalError("failed to 
close hdfs file"));
     if (ret != 0) {
+        std::string err_msg = hdfs_error();
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}, file_size={}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error(),
@@ -182,9 +188,17 @@ Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
     while (!content.empty()) {
         int64_t written_bytes;
         {
+            TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay");
             SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
             written_bytes =
                     hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), content.size());
+            {
+                [[maybe_unused]] Status error_ret = Status::InternalError(
+                        "write hdfs failed. fs_name: {}, path: {}, error: size 
exceeds", _fs_name,
+                        _path.native());
+                TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::append_hdfs_file_error",
+                                                       error_ret);
+            }
         }
         if (written_bytes < 0) {
             return Status::InternalError(
@@ -257,6 +271,8 @@ Status HdfsFileWriter::finalize() {
 
     // Flush buffered data to HDFS without waiting for HDFS response
     int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush",
+                                           Status::InternalError("failed to flush hdfs file"));
     if (ret != 0) {
         return Status::InternalError(
                 "failed to flush hdfs file. fs_name={} path={} : {}, 
file_size={}", _fs_name,
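
For the delay points (append_hdfs_file_delay, upload_to_local_file_cache), the registered callbacks stall a random 1-10 seconds via std::srand/std::rand. An equivalent sketch with <random>, shown only as an illustration of the behavior, not the committed code:

    #include <chrono>
    #include <random>
    #include <thread>

    void inject_random_delay() {
        // One generator per thread; uniform 1-10 second stall.
        static thread_local std::mt19937 gen{std::random_device{}()};
        std::uniform_int_distribution<int> dist(1, 10);
        std::this_thread::sleep_for(std::chrono::seconds(dist(gen)));
    }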
diff --git a/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy
new file mode 100755
index 00000000000..df77a6c69c1
--- /dev/null
+++ b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_load_error.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("inject_hdfs_load_error") {
+    if (!enableStoragevault()) {
+        logger.info("skip create storgage vault case")
+        return
+    }
+
+    sql """
+        CREATE STORAGE VAULT IF NOT EXISTS inject_hdfs_load_error
+        PROPERTIES (
+        "type"="hdfs",
+        "fs.defaultFS"="${getHdfsFs()}",
+        "path_prefix" = "inject_hdfs_load_error"
+        );
+    """
+
+    try {
+        String backendId;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backendId = backendId_to_backendIP.keySet()[0]
+        def be_host = backendId_to_backendIP.get(backendId)
+        def be_http_port = backendId_to_backendHttpPort.get(backendId)
+
+        def tableName = "inject_load_error_table"
+
+        // op = {set, apply_suite, clear}, behavior = {sleep, return, return_ok, return_error}
+        // name = {point_name}, code = {return_error return code}, duration = {sleep duration}
+        // value is code for return_error, duration for sleep
+        // internal error is 6
+        def triggerInject = { name, op, behavior, value ->
+            // build and send the injection point request to the BE
+            StringBuilder sb = new StringBuilder();
+            // /api/injection_point/{op}/{name}
+            sb.append("curl -X POST http://${be_host}:${be_http_port}";)
+            
sb.append("/api/injection_point/${op}/${name}/${behavior}/${value}")
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+            out = process.getText()
+            logger.info("enable inject, op ${op}, name ${name}, behavior 
${behavior}, value ${value}")
+            assertEquals(0, code)
+            return out
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+            C_CUSTKEY     INTEGER NOT NULL,
+            C_NAME        INTEGER NOT NULL
+            )
+            DUPLICATE KEY(C_CUSTKEY, C_NAME)
+            DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+            PROPERTIES (
+            "replication_num" = "1", "storage_vault_name" = 
"inject_hdfs_load_error"
+            )
+        """
+
+        def load_table = { cnt ->
+            for (int i = 0; i < cnt; i++) {
+                sql """
+                    insert into ${tableName} values(${i}, '${i}');
+                """
+            }
+        }
+
+        for (int j = 1; j < 5; j++) {
+            triggerInject("HdfsFileWriter::hdfsFlush", "set", "return_error", 
6)
+            expectExceptionLike({
+                load_table(j)
+            }, "failed to flush hdfs file")
+            triggerInject("HdfsFileWriter::hdfsFlush", "clear", "valid", 0)
+        }
+
+        for (int j = 1; j < 5; j++) {
+            triggerInject("HdfsFileWriter::hdfsCloseFile", "set", 
"return_error", 6)
+            expectExceptionLike({
+                load_table(j)
+            }, "Write hdfs file failed")
+            triggerInject("HdfsFileWriter::hdfsCloseFile", "clear",  "valid", 
0)
+        }
+
+        for (int j = 1; j < 5; j++) {
+            triggerInject("HdfsFileWriter::append_hdfs_file_error", "set", 
"return_error", 6)
+            expectExceptionLike({
+                load_table(j)
+            }, "write hdfs file failed")
+            triggerInject("HdfsFileWriter::append_hdfs_file_error", "clear",  
"valid", 0)
+        }
+
+    } finally {
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy
new file mode 100755
index 00000000000..3b41e78834a
--- /dev/null
+++ b/regression-test/suites/inject_hdfs_vault_p0/inject_hdfs_select_error.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("inject_hdfs_load_error") {
+    if (!enableStoragevault()) {
+        logger.info("skip create storgage vault case")
+        return
+    }
+
+    sql """
+        CREATE STORAGE VAULT IF NOT EXISTS inject_hdfs_select_error
+        PROPERTIES (
+        "type"="hdfs",
+        "fs.defaultFS"="${getHdfsFs()}",
+        "path_prefix" = "inject_hdfs_select_error"
+        );
+    """
+
+    try {
+        String backendId;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backendId = backendId_to_backendIP.keySet()[0]
+        def be_host = backendId_to_backendIP.get(backendId)
+        def be_http_port = backendId_to_backendHttpPort.get(backendId)
+
+        def tableName = "inject_select_error_table"
+        
+        // op = {set, apply_suite, clear}, behavior = {sleep, return, return_ok, return_error}
+        // name = {point_name}, code = {return_error return code}, duration = {sleep duration}
+        // value is code for return_error, duration for sleep
+        // internal error is 6
+        def triggerInject = { name, op, behavior, value ->
+            // build and send the injection point request to the BE
+            StringBuilder sb = new StringBuilder();
+            // /api/injection_point/{op}/{name}
+            sb.append("curl -X POST http://${be_host}:${be_http_port}";)
+            
sb.append("/api/injection_point/${op}/${name}/${behavior}/${value}")
+
+            String command = sb.toString()
+            logger.info(command)
+            process = command.execute()
+            code = process.waitFor()
+            err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+            out = process.getText()
+            logger.info("enable inject, op ${op}, name ${name}, behavior 
${behavior}, value ${value}")
+            assertEquals(0, code)
+            return out
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName}; """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+            C_CUSTKEY     INTEGER NOT NULL,
+            C_NAME        INTEGER NOT NULL
+            )
+            DUPLICATE KEY(C_CUSTKEY, C_NAME)
+            DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
+            PROPERTIES (
+            "replication_num" = "1", "storage_vault_name" = 
"inject_hdfs_select_error"
+            )
+        """
+
+        def load_table = { cnt ->
+            for (int i = 0; i < cnt; i++) {
+                sql """
+                    insert into ${tableName} values(${i}, '${i}');
+                """
+            }
+        }
+
+        for (int j = 1; j < 10; j++) {
+            load_table(j)
+        }
+
+        for (int j = 1; j < 5; j++) {
+            triggerInject("HdfsFileReader:read_error", "set", "return_error", 
6)
+            expectExceptionLike({
+                load_table(j)
+            }, "Read hdfs file failed")
+            triggerInject("HdfsFileReader:read_error", "clear", "valid", 0)
+        }
+
+    } finally {
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
