This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 41d907912a6 [improve](move-memtable) add fault injection in load
stream stub (#29105)
41d907912a6 is described below
commit 41d907912a668f3de934b52593ec41463e59da3b
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sat Dec 30 01:29:28 2023 +0800
[improve](move-memtable) add fault injection in load stream stub (#29105)
---
be/src/io/fs/stream_sink_file_writer.cpp | 20 ++++
be/src/vec/sink/load_stream_stub.cpp | 2 +
be/src/vec/sink/load_stream_stub_pool.cpp | 6 ++
.../test_load_stream_stub_failure_injection.groovy | 102 +++++++++++++++++++++
4 files changed, 130 insertions(+)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp
b/be/src/io/fs/stream_sink_file_writer.cpp
index 2fce7c6baa9..74f34b44e6d 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -51,10 +51,30 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
<< ", data_length: " << bytes_req;
std::span<const Slice> slices {data, data_cnt};
+ size_t stream_index = 0;
bool ok = false;
for (auto& stream : _streams) {
auto st = stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id,
_bytes_appended, slices);
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
{
+ if (stream_index >= 2) {
+ st = Status::InternalError("stream sink file writer append
data failed");
+ } else {
+ stream_index++;
+ }
+ });
+
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
{
+ if (stream_index >= 1) {
+ st = Status::InternalError("stream sink file writer append
data failed");
+ } else {
+ stream_index++;
+ }
+ });
+
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{
+ st = Status::InternalError("stream sink file writer append data
failed");
+ });
ok = ok || st.ok();
}
if (!ok) {
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 78358765967..4b557cae93d 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -21,6 +21,7 @@
#include "olap/rowset/rowset_writer.h"
#include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
@@ -330,6 +331,7 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
ret = brpc::StreamWrite(_stream_id, buf);
}
+ DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed",
{ ret = EPIPE; });
switch (ret) {
case 0:
return Status::OK();
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_stub_pool.cpp
index bc19bad532c..f7eeb96bb04 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -17,6 +17,7 @@
#include "vec/sink/load_stream_stub_pool.h"
+#include "util/debug_points.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
@@ -29,16 +30,21 @@ LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id,
int num_use, LoadStre
void LoadStreams::release() {
int num_use = --_use_cnt;
+ DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
if (num_use == 0) {
LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id="
<< _dst_id;
for (auto& stream : _streams) {
auto st = stream->close_stream();
+ DBUG_EXECUTE_IF("LoadStreams.release.close_stream_failed",
+ { st = Status::InternalError("stream close
failed"); });
if (!st.ok()) {
LOG(WARNING) << "close stream failed " << st;
}
}
for (auto& stream : _streams) {
auto st = stream->close_wait();
+ DBUG_EXECUTE_IF("LoadStreams.release.close_wait_failed",
+ { st = Status::InternalError("stream close wait
timeout"); });
if (!st.ok()) {
LOG(WARNING) << "close wait failed " << st;
}
diff --git
a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
new file mode 100644
index 00000000000..ed02d3e7ac1
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_stream_stub_fault_injection", "nonConcurrent") {
+ sql """ set enable_memtable_on_sink_node=true """
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ def load_with_injection = { injection, error_msg->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ sql "insert into test select * from baseall where k1 <= 3"
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains(error_msg))
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ // StreamSinkFileWriter appendv write segment failed one replica
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
"")
+ // StreamSinkFileWriter appendv write segment failed two replica
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
"")
+ // StreamSinkFileWriter appendv write segment failed all replica
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
"stream sink file writer append data failed")
+ // LoadStreams stream wait failed
+ load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed",
"StreamWrite failed, err=32")
+ // LoadStreams keeping stream when release
+ load_with_injection("LoadStreams.release.keeping_streams", "")
+ // LoadStreams close stream failed
+ load_with_injection("LoadStreams.release.close_stream_failed", "")
+ // LoadStreams close wait failed
+ load_with_injection("LoadStreams.release.close_wait_failed", "")
+
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]