This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9a90c1cd57c [improve](move-memtable)add multi replica fault injection
(#29348)
9a90c1cd57c is described below
commit 9a90c1cd57c7073b4a597beec5d1dcc0e7cf8acb
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun Dec 31 16:30:27 2023 +0800
[improve](move-memtable)add multi replica fault injection (#29348)
---
be/src/io/fs/stream_sink_file_writer.cpp | 28 ++++--
.../test_multi_replica_fault_injection.groovy | 100 +++++++++++++++++++++
2 files changed, 120 insertions(+), 8 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp
b/be/src/io/fs/stream_sink_file_writer.cpp
index 74f34b44e6d..b7cd4ed0bbf 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -53,25 +53,37 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
std::span<const Slice> slices {data, data_cnt};
size_t stream_index = 0;
bool ok = false;
+ bool skip_stream = false;
+ Status st;
for (auto& stream : _streams) {
- auto st = stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id,
- _bytes_appended, slices);
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
{
+ if (stream_index >= 2) {
+ skip_stream = true;
+ }
+ });
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
{
+ if (stream_index >= 1) {
+ skip_stream = true;
+ }
+ });
+ if (!skip_stream) {
+ st = stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id,
+ _bytes_appended, slices);
+ }
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
{
if (stream_index >= 2) {
st = Status::InternalError("stream sink file writer append
data failed");
- } else {
- stream_index++;
}
+ stream_index++;
+ skip_stream = false;
});
-
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
{
if (stream_index >= 1) {
st = Status::InternalError("stream sink file writer append
data failed");
- } else {
- stream_index++;
}
+ stream_index++;
+ skip_stream = false;
});
-
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{
st = Status::InternalError("stream sink file writer append data
failed");
});
diff --git
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
new file mode 100644
index 00000000000..aff1318a52f
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_multi_replica_fault_injection", "nonConcurrent") {
+ def res = sql"show backends;"
+ logger.info(res.toString())
+ def beNums = 0;
+ res.each { item ->
+ beNums++;
+ logger.info(item.toString())
+ }
+ if (beNums == 3){
+ sql """ set enable_memtable_on_sink_node=true """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"3")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"3")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ def load_with_injection = { injection, error_msg->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(injection)
+ sql "insert into test select * from baseall where k1 <= 3"
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains(error_msg))
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injection)
+ }
+ }
+
+ // StreamSinkFileWriter appendv write segment failed one replica
+ // success
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
"sucess")
+ // StreamSinkFileWriter appendv write segment failed two replica
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
"replica num 1 < load required replica num 2")
+ // StreamSinkFileWriter appendv write segment failed all replica
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
"failed to write any replicas")
+
+ sql """ set enable_memtable_on_sink_node=false """
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]