This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b31494b18c8 [test](regression) add fault injection cases for 
LoadStream (#29101)
b31494b18c8 is described below

commit b31494b18c8dd0fff14e1841a14236db28ef6552
Author: zhengyu <[email protected]>
AuthorDate: Thu Dec 28 16:16:26 2023 +0800

    [test](regression) add fault injection cases for LoadStream (#29101)
    
    Signed-off-by: freemandealer <[email protected]>
---
 be/src/io/fs/file_writer.h                         |   6 +-
 be/src/io/fs/local_file_system.cpp                 |   5 +
 be/src/io/fs/local_file_writer.cpp                 |   6 ++
 be/src/io/fs/stream_sink_file_writer.cpp           |   1 +
 be/src/olap/rowset_builder.cpp                     |   2 +
 be/src/runtime/load_stream.cpp                     |  15 +++
 be/src/runtime/load_stream_writer.cpp              |  15 ++-
 be/src/runtime/load_stream_writer.h                |   2 -
 .../test_load_stream_fault_injection.groovy        | 119 +++++++++++++++++++++
 9 files changed, 165 insertions(+), 6 deletions(-)

diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index b8f44c2e339..51339afebcd 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -22,6 +22,7 @@
 #include "common/status.h"
 #include "gutil/macros.h"
 #include "io/fs/path.h"
+#include "util/debug_points.h"
 #include "util/slice.h"
 
 namespace doris {
@@ -63,7 +64,10 @@ public:
 
     const Path& path() const { return _path; }
 
-    size_t bytes_appended() const { return _bytes_appended; }
+    size_t bytes_appended() const { // returns _bytes_appended, or 0 while the debug point below is enabled
+        DBUG_EXECUTE_IF("FileWriter.bytes_appended.zero_bytes_appended", { 
return 0; });
+        return _bytes_appended; // normal path: no fault injected
+    }
 
     std::shared_ptr<FileSystem> fs() const { return _fs; }
 
diff --git a/be/src/io/fs/local_file_system.cpp 
b/be/src/io/fs/local_file_system.cpp
index a71708ac72c..d7e1372ffe9 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -40,6 +40,7 @@
 #include "io/fs/local_file_writer.h"
 #include "runtime/thread_context.h"
 #include "util/async_io.h" // IWYU pragma: keep
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 
 namespace doris {
@@ -57,6 +58,10 @@ LocalFileSystem::~LocalFileSystem() = default;
 Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* 
writer,
                                          const FileWriterOptions* opts) {
     int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 
0666);
+    DBUG_EXECUTE_IF("LocalFileSystem.create_file_impl.open_file_failed", {
+        ::close(fd);
+        fd = -1;
+    });
     if (-1 == fd) {
         return Status::IOError("failed to open {}: {}", file.native(), 
errno_to_str());
     }
diff --git a/be/src/io/fs/local_file_writer.cpp 
b/be/src/io/fs/local_file_writer.cpp
index af913d60a98..3915cf91b6d 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -38,6 +38,7 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 
 namespace doris {
@@ -203,6 +204,11 @@ Status LocalFileWriter::_close(bool sync) {
     if (0 != ::close(_fd)) {
         return Status::IOError("cannot close {}: {}", _path.native(), 
std::strerror(errno));
     }
+
+    DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
+        return Status::IOError("cannot close {}: {}", _path.native(), 
std::strerror(errno));
+    });
+
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp 
b/be/src/io/fs/stream_sink_file_writer.cpp
index 484be9e07e9..25a4f5b27d5 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -21,6 +21,7 @@
 
 #include "olap/olap_common.h"
 #include "olap/rowset/beta_rowset_writer.h"
+#include "util/debug_points.h"
 #include "util/uid_util.h"
 #include "vec/sink/load_stream_stub.h"
 
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 5fda708cf4e..e7fb520c0bf 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -149,6 +149,8 @@ Status RowsetBuilder::check_tablet_version_count() {
                      << st;
     }
     int version_count = tablet()->version_count();
+    
DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",
+                    { version_count = INT_MAX; });
     if (version_count > config::max_tablet_version_num) {
         return Status::Error<TOO_MANY_VERSION>(
                 "failed to init rowset builder. version count: {}, exceed 
limit: {}, "
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 836d4da147a..d2ea41f1c35 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -37,10 +37,13 @@
 #include "runtime/load_channel.h"
 #include "runtime/load_stream_mgr.h"
 #include "runtime/load_stream_writer.h"
+#include "util/debug_points.h"
 #include "util/runtime_profile.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 
+#define UNKNOWN_ID_FOR_TEST 0x7c00 // value the debug points in this file substitute for segment/index/load/src ids to simulate an unknown id
+
 namespace doris {
 
 bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
@@ -169,6 +172,7 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
     int64_t src_id = header.src_id();
     uint32_t segid = header.segment_id();
     uint32_t new_segid;
+    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = 
UNKNOWN_ID_FOR_TEST; });
     {
         std::lock_guard lock_guard(_lock);
         if (!_segids_mapping.contains(src_id)) {
@@ -440,6 +444,8 @@ Status LoadStream::_append_data(const PStreamHeader& 
header, butil::IOBuf* data)
     IndexStreamSharedPtr index_stream;
 
     int64_t index_id = header.index_id();
+    DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid",
+                    { index_id = UNKNOWN_ID_FOR_TEST; });
     auto it = _index_streams_map.find(index_id);
     if (it == _index_streams_map.end()) {
         return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id 
{}", index_id);
@@ -479,6 +485,15 @@ int LoadStream::on_received_messages(StreamId id, 
butil::IOBuf* const messages[]
 void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, 
butil::IOBuf* data) {
     VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << 
hdr.src_id()
                << " with tablet " << hdr.tablet_id();
+    DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
+        PUniqueId& load_id = const_cast<PUniqueId&>(hdr.load_id());
+        load_id.set_hi(UNKNOWN_ID_FOR_TEST);
+        load_id.set_lo(UNKNOWN_ID_FOR_TEST);
+    });
+    DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
+        PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
+        t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
+    });
     if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
         Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
                 "invalid load id {}, expected {}", print_id(hdr.load_id()), 
print_id(_load_id));
diff --git a/be/src/runtime/load_stream_writer.cpp 
b/be/src/runtime/load_stream_writer.cpp
index 0a339e854aa..27ca047d687 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -59,6 +59,7 @@
 #include "runtime/memory/mem_tracker.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
 #include "util/mem_info.h"
 #include "util/ref_count_closure.h"
 #include "util/stopwatch.hpp"
@@ -87,9 +88,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t 
offset, butil::IOB
     io::FileWriter* file_writer = nullptr;
     {
         std::lock_guard lock_guard(_lock);
-        if (!_is_init) {
-            RETURN_IF_ERROR(init());
-        }
+        DCHECK(_is_init);
         if (segid >= _segment_file_writers.size()) {
             for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
                 Status st;
@@ -106,6 +105,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, 
uint64_t offset, butil::IOB
         // TODO: IOBuf to Slice
         file_writer = _segment_file_writers[segid].get();
     }
+    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { 
file_writer = nullptr; });
     VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
     if (file_writer == nullptr) {
         return Status::Corruption("append_data failed, file writer {} is 
destoryed", segid);
@@ -122,14 +122,18 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
     io::FileWriter* file_writer = nullptr;
     {
         std::lock_guard lock_guard(_lock);
+        DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", { 
_is_init = false; });
         if (!_is_init) {
             return Status::Corruption("close_segment failed, LoadStreamWriter 
is not inited");
         }
+        DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid",
+                        { segid = _segment_file_writers.size(); });
         if (segid >= _segment_file_writers.size()) {
             return Status::Corruption("close_segment failed, segment {} is 
never opened", segid);
         }
         file_writer = _segment_file_writers[segid].get();
     }
+    DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", { 
file_writer = nullptr; });
     if (file_writer == nullptr) {
         return Status::Corruption("close_segment failed, file writer {} is 
destoryed", segid);
     }
@@ -151,14 +155,18 @@ Status LoadStreamWriter::add_segment(uint32_t segid, 
const SegmentStatistics& st
     io::FileWriter* file_writer = nullptr;
     {
         std::lock_guard lock_guard(_lock);
+        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", { 
_is_init = false; });
         if (!_is_init) {
             return Status::Corruption("add_segment failed, LoadStreamWriter is 
not inited");
         }
+        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
+                        { segid = _segment_file_writers.size(); });
         if (segid >= _segment_file_writers.size()) {
             return Status::Corruption("add_segment failed, segment {} is never 
opened", segid);
         }
         file_writer = _segment_file_writers[segid].get();
     }
+    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.null_file_writer", { 
file_writer = nullptr; });
     if (file_writer == nullptr) {
         return Status::Corruption("add_segment failed, file writer {} is 
destoryed", segid);
     }
@@ -177,6 +185,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const 
SegmentStatistics& st
 
 Status LoadStreamWriter::close() {
     std::lock_guard<std::mutex> l(_lock);
+    DBUG_EXECUTE_IF("LoadStreamWriter.close.uninited_writer", { _is_init = 
false; });
     if (!_is_init) {
         // if this delta writer is not initialized, but close() is called.
         // which means this tablet has no data loaded, but at least one tablet
diff --git a/be/src/runtime/load_stream_writer.h 
b/be/src/runtime/load_stream_writer.h
index ab6530bf60d..4f35950a93f 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -70,8 +70,6 @@ public:
     // wait for all memtables to be flushed.
     Status close();
 
-    int64_t tablet_id() const { return _req.tablet_id; }
-
 private:
     bool _is_init = false;
     bool _is_canceled = false;
diff --git 
a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
new file mode 100644
index 00000000000..09f271fead5
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("load_stream_fault_injection", "nonConcurrent") {
+    // init query case data: create tables `baseall` and `test` with the same columns; they differ only in the aggregation of `k12` (replace vs replace_if_not_null)
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs() // clear debug points on all BEs before the initial load
+    streamLoad { // load the comma-separated file baseall.txt into `baseall`, the table the injected inserts select from
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    // Runs one `insert into test select ...` load while a single BE debug point
+    // is enabled, and always disables that debug point afterwards.
+    //   injection:     name of the debug point to enable on all BEs
+    //   expect_errmsg: substring the failure message must contain; an empty
+    //                  string (or null) accepts any failure message
+    // A failing load is logged. A load that succeeds despite the injection is
+    // not treated as an error, matching the previous behaviour.
+    def load_with_injection = { injection, expect_errmsg ->
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(injection)
+            sql "insert into test select * from baseall where k1 <= 3"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            // the msg should contain the root cause; only checked when the
+            // caller supplied a non-empty expectation (Groovy truth: "" is false)
+            if (expect_errmsg) {
+                // String.valueOf guards against a null exception message
+                assertTrue(String.valueOf(e.getMessage()).contains(expect_errmsg))
+            }
+        } finally {
+            // runs on success, failure, and assertion error alike, so the
+            // debug point never leaks into the next case
+            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        }
+    }
+
+    // Each call below enables exactly one BE debug point, runs the load, and
+    // disables the debug point again (see load_with_injection).
+
+    // LoadStreamWriter create file failed
+    load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
+    // LoadStreamWriter append_data meet null file writer error
+    load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
+    // LoadStreamWriter append_data meet bytes_appended and real file size not match error
+    load_with_injection("FileWriter.bytes_appended.zero_bytes_appended", "")
+    // LoadStreamWriter close_segment meet not inited error
+    load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "")
+    // LoadStreamWriter close_segment meet bad segid error
+    load_with_injection("LoadStreamWriter.close_segment.bad_segid", "")
+    // LoadStreamWriter close_segment meet null file writer error
+    load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
+    // LoadStreamWriter close_segment meet file writer failed to close error
+    load_with_injection("LocalFileWriter.close.failed", "")
+    // LoadStreamWriter close_segment meet bytes_appended and real file size not match error
+    // NOTE(review): the BE changes in this commit only add the debug point
+    // "FileWriter.bytes_appended.zero_bytes_appended"; no debug point with the
+    // name used here is visible, so this case may run without any fault
+    // injected -- confirm the intended name.
+    load_with_injection("FileWriter.close_segment.zero_bytes_appended", "")
+    // LoadStreamWriter add_segment meet not inited error
+    load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "")
+    // LoadStreamWriter add_segment meet bad segid error
+    load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
+    // LoadStreamWriter add_segment meet null file writer error
+    load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "")
+    // LoadStreamWriter add_segment meet bytes_appended and real file size not match error
+    // NOTE(review): same concern as the close_segment case above -- no debug
+    // point with this name is visible in the BE changes of this commit; confirm.
+    load_with_injection("FileWriter.add_segment.zero_bytes_appended", "")
+    // LoadStreamWriter close meet not inited error
+    load_with_injection("LoadStreamWriter.close.uninited_writer", "")
+    // LoadStream init failed coz LoadStreamWriter init failed
+    load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
+    // LoadStream add_segment meet unknown segid in request header
+    load_with_injection("TabletStream.add_segment.unknown_segid", "")
+    // LoadStream append_data meet unknown index id in request header
+    // (name must match the BE debug point "TabletStream.add_segment.unknown_indexid")
+    load_with_injection("TabletStream.add_segment.unknown_indexid", "")
+    // LoadStream dispatch meet unknown load id
+    load_with_injection("LoadStream._dispatch.unknown_loadid", "")
+    // LoadStream dispatch meet unknown src id
+    load_with_injection("LoadStream._dispatch.unknown_srcid", "")
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to