This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b31494b18c8 [test](regression) add fault injection cases for
LoadStream (#29101)
b31494b18c8 is described below
commit b31494b18c8dd0fff14e1841a14236db28ef6552
Author: zhengyu <[email protected]>
AuthorDate: Thu Dec 28 16:16:26 2023 +0800
[test](regression) add fault injection cases for LoadStream (#29101)
Signed-off-by: freemandealer <[email protected]>
---
be/src/io/fs/file_writer.h | 6 +-
be/src/io/fs/local_file_system.cpp | 5 +
be/src/io/fs/local_file_writer.cpp | 6 ++
be/src/io/fs/stream_sink_file_writer.cpp | 1 +
be/src/olap/rowset_builder.cpp | 2 +
be/src/runtime/load_stream.cpp | 15 +++
be/src/runtime/load_stream_writer.cpp | 15 ++-
be/src/runtime/load_stream_writer.h | 2 -
.../test_load_stream_fault_injection.groovy | 119 +++++++++++++++++++++
9 files changed, 165 insertions(+), 6 deletions(-)
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index b8f44c2e339..51339afebcd 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -22,6 +22,7 @@
#include "common/status.h"
#include "gutil/macros.h"
#include "io/fs/path.h"
+#include "util/debug_points.h"
#include "util/slice.h"
namespace doris {
@@ -63,7 +64,10 @@ public:
const Path& path() const { return _path; }
- size_t bytes_appended() const { return _bytes_appended; }
+ size_t bytes_appended() const {
+ DBUG_EXECUTE_IF("FileWriter.bytes_appended.zero_bytes_appended", {
return 0; });
+ return _bytes_appended;
+ }
std::shared_ptr<FileSystem> fs() const { return _fs; }
diff --git a/be/src/io/fs/local_file_system.cpp
b/be/src/io/fs/local_file_system.cpp
index a71708ac72c..d7e1372ffe9 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -40,6 +40,7 @@
#include "io/fs/local_file_writer.h"
#include "runtime/thread_context.h"
#include "util/async_io.h" // IWYU pragma: keep
+#include "util/debug_points.h"
#include "util/defer_op.h"
namespace doris {
@@ -57,6 +58,10 @@ LocalFileSystem::~LocalFileSystem() = default;
Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer,
const FileWriterOptions* opts) {
int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC,
0666);
+ DBUG_EXECUTE_IF("LocalFileSystem.create_file_impl.open_file_failed", {
+ ::close(fd);
+ fd = -1;
+ });
if (-1 == fd) {
return Status::IOError("failed to open {}: {}", file.native(),
errno_to_str());
}
diff --git a/be/src/io/fs/local_file_writer.cpp
b/be/src/io/fs/local_file_writer.cpp
index af913d60a98..3915cf91b6d 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -38,6 +38,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
namespace doris {
@@ -203,6 +204,11 @@ Status LocalFileWriter::_close(bool sync) {
if (0 != ::close(_fd)) {
return Status::IOError("cannot close {}: {}", _path.native(),
std::strerror(errno));
}
+
+ DBUG_EXECUTE_IF("LocalFileWriter.close.failed", {
+ return Status::IOError("cannot close {}: {}", _path.native(),
std::strerror(errno));
+ });
+
return Status::OK();
}
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp
b/be/src/io/fs/stream_sink_file_writer.cpp
index 484be9e07e9..25a4f5b27d5 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -21,6 +21,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"
+#include "util/debug_points.h"
#include "util/uid_util.h"
#include "vec/sink/load_stream_stub.h"
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 5fda708cf4e..e7fb520c0bf 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -149,6 +149,8 @@ Status RowsetBuilder::check_tablet_version_count() {
<< st;
}
int version_count = tablet()->version_count();
+
DBUG_EXECUTE_IF("RowsetBuilder.check_tablet_version_count.too_many_version",
+ { version_count = INT_MAX; });
if (version_count > config::max_tablet_version_num) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. version count: {}, exceed
limit: {}, "
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 836d4da147a..d2ea41f1c35 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -37,10 +37,13 @@
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
+#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
+#define UNKNOWN_ID_FOR_TEST 0x7c00
+
namespace doris {
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
@@ -169,6 +172,7 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
uint32_t new_segid;
+ DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid =
UNKNOWN_ID_FOR_TEST; });
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
@@ -440,6 +444,8 @@ Status LoadStream::_append_data(const PStreamHeader&
header, butil::IOBuf* data)
IndexStreamSharedPtr index_stream;
int64_t index_id = header.index_id();
+ DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_indexid",
+ { index_id = UNKNOWN_ID_FOR_TEST; });
auto it = _index_streams_map.find(index_id);
if (it == _index_streams_map.end()) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("unknown index_id
{}", index_id);
@@ -479,6 +485,15 @@ int LoadStream::on_received_messages(StreamId id,
butil::IOBuf* const messages[]
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr,
butil::IOBuf* data) {
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " <<
hdr.src_id()
<< " with tablet " << hdr.tablet_id();
+ DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
+ PUniqueId& load_id = const_cast<PUniqueId&>(hdr.load_id());
+ load_id.set_hi(UNKNOWN_ID_FOR_TEST);
+ load_id.set_lo(UNKNOWN_ID_FOR_TEST);
+ });
+ DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
+ PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
+ t_hdr.set_src_id(UNKNOWN_ID_FOR_TEST);
+ });
if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid load id {}, expected {}", print_id(hdr.load_id()),
print_id(_load_id));
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index 0a339e854aa..27ca047d687 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -59,6 +59,7 @@
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/ref_count_closure.h"
#include "util/stopwatch.hpp"
@@ -87,9 +88,7 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t
offset, butil::IOB
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
- if (!_is_init) {
- RETURN_IF_ERROR(init());
- }
+ DCHECK(_is_init);
if (segid >= _segment_file_writers.size()) {
for (size_t i = _segment_file_writers.size(); i <= segid; i++) {
Status st;
@@ -106,6 +105,7 @@ Status LoadStreamWriter::append_data(uint32_t segid,
uint64_t offset, butil::IOB
// TODO: IOBuf to Slice
file_writer = _segment_file_writers[segid].get();
}
+ DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", {
file_writer = nullptr; });
VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
if (file_writer == nullptr) {
return Status::Corruption("append_data failed, file writer {} is
destoryed", segid);
@@ -122,14 +122,18 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
+ DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", {
_is_init = false; });
if (!_is_init) {
return Status::Corruption("close_segment failed, LoadStreamWriter
is not inited");
}
+ DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid",
+ { segid = _segment_file_writers.size(); });
if (segid >= _segment_file_writers.size()) {
return Status::Corruption("close_segment failed, segment {} is
never opened", segid);
}
file_writer = _segment_file_writers[segid].get();
}
+ DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", {
file_writer = nullptr; });
if (file_writer == nullptr) {
return Status::Corruption("close_segment failed, file writer {} is
destoryed", segid);
}
@@ -151,14 +155,18 @@ Status LoadStreamWriter::add_segment(uint32_t segid,
const SegmentStatistics& st
io::FileWriter* file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
+ DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.uninited_writer", {
_is_init = false; });
if (!_is_init) {
return Status::Corruption("add_segment failed, LoadStreamWriter is
not inited");
}
+ DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
+ { segid = _segment_file_writers.size(); });
if (segid >= _segment_file_writers.size()) {
return Status::Corruption("add_segment failed, segment {} is never
opened", segid);
}
file_writer = _segment_file_writers[segid].get();
}
+ DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.null_file_writer", {
file_writer = nullptr; });
if (file_writer == nullptr) {
return Status::Corruption("add_segment failed, file writer {} is
destoryed", segid);
}
@@ -177,6 +185,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const
SegmentStatistics& st
Status LoadStreamWriter::close() {
std::lock_guard<std::mutex> l(_lock);
+ DBUG_EXECUTE_IF("LoadStreamWriter.close.uninited_writer", { _is_init =
false; });
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index ab6530bf60d..4f35950a93f 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -70,8 +70,6 @@ public:
// wait for all memtables to be flushed.
Status close();
- int64_t tablet_id() const { return _req.tablet_id; }
-
private:
bool _is_init = false;
bool _is_canceled = false;
diff --git
a/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
new file mode 100644
index 00000000000..09f271fead5
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_load_stream_fault_injection.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("load_stream_fault_injection", "nonConcurrent") {
+    // init query case data
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    streamLoad {
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+    // Enable `injection` on all BEs, run an insert, log any error; if `expect_errmsg` is non-empty the error must contain it.
+    def load_with_injection = { injection, expect_errmsg ->
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(injection)
+            sql "insert into test select * from baseall where k1 <= 3"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            if (expect_errmsg) { assertTrue(e.getMessage().contains(expect_errmsg)) } // the msg should contain the root cause
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        }
+    }
+
+    // LoadStreamWriter create file failed
+    load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
+    // LoadStreamWriter append_data meets null file writer error
+    load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
+    // LoadStreamWriter append_data meets bytes_appended and real file size mismatch error
+    load_with_injection("FileWriter.bytes_appended.zero_bytes_appended", "")
+    // LoadStreamWriter close_segment meets not inited error
+    load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "")
+    // LoadStreamWriter close_segment meets bad segid error
+    load_with_injection("LoadStreamWriter.close_segment.bad_segid", "")
+    // LoadStreamWriter close_segment meets null file writer error
+    load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
+    // LoadStreamWriter close_segment meets file writer failed to close error
+    load_with_injection("LocalFileWriter.close.failed", "")
+    // LoadStreamWriter close_segment meets bytes_appended/real file size mismatch (BE has no close_segment-specific point; reuse bytes_appended one)
+    load_with_injection("FileWriter.bytes_appended.zero_bytes_appended", "")
+    // LoadStreamWriter add_segment meets not inited error
+    load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "")
+    // LoadStreamWriter add_segment meets bad segid error
+    load_with_injection("LoadStreamWriter.add_segment.bad_segid", "")
+    // LoadStreamWriter add_segment meets null file writer error
+    load_with_injection("LoadStreamWriter.add_segment.null_file_writer", "")
+    // LoadStreamWriter add_segment meets bytes_appended/real file size mismatch (BE has no add_segment-specific point; reuse bytes_appended one)
+    load_with_injection("FileWriter.bytes_appended.zero_bytes_appended", "")
+    // LoadStreamWriter close meets not inited error
+    load_with_injection("LoadStreamWriter.close.uninited_writer", "")
+    // LoadStreamWriter init fails because RowsetBuilder reports too many tablet versions
+    load_with_injection("RowsetBuilder.check_tablet_version_count.too_many_version", "")
+    // LoadStream add_segment meets unknown segid in request header
+    load_with_injection("TabletStream.add_segment.unknown_segid", "")
+    // LoadStream append_data meets unknown index id in request header
+    load_with_injection("TabletStream.add_segment.unknown_indexid", "")
+    // LoadStream dispatch meets unknown load id
+    load_with_injection("LoadStream._dispatch.unknown_loadid", "")
+    // LoadStream dispatch meets unknown src id
+    load_with_injection("LoadStream._dispatch.unknown_srcid", "")
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]