This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a90c1cd57c [improve](move-memtable)add multi replica fault injection (#29348)
9a90c1cd57c is described below

commit 9a90c1cd57c7073b4a597beec5d1dcc0e7cf8acb
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun Dec 31 16:30:27 2023 +0800

    [improve](move-memtable)add multi replica fault injection (#29348)
---
 be/src/io/fs/stream_sink_file_writer.cpp           |  28 ++++--
 .../test_multi_replica_fault_injection.groovy      | 100 +++++++++++++++++++++
 2 files changed, 120 insertions(+), 8 deletions(-)

diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
index 74f34b44e6d..b7cd4ed0bbf 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -53,25 +53,37 @@ Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
     std::span<const Slice> slices {data, data_cnt};
     size_t stream_index = 0;
     bool ok = false;
+    bool skip_stream = false;
+    Status st;
     for (auto& stream : _streams) {
-        auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
-                                      _bytes_appended, slices);
+        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
+            if (stream_index >= 2) {
+                skip_stream = true;
+            }
+        });
+        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
+            if (stream_index >= 1) {
+                skip_stream = true;
+            }
+        });
+        if (!skip_stream) {
+            st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
+                                     _bytes_appended, slices);
+        }
         DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
             if (stream_index >= 2) {
                 st = Status::InternalError("stream sink file writer append data failed");
-            } else {
-                stream_index++;
             }
+            stream_index++;
+            skip_stream = false;
         });
-
         DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
             if (stream_index >= 1) {
                 st = Status::InternalError("stream sink file writer append data failed");
-            } else {
-                stream_index++;
             }
+            stream_index++;
+            skip_stream = false;
         });
-
         DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", {
             st = Status::InternalError("stream sink file writer append data failed");
         });
diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
new file mode 100644
index 00000000000..aff1318a52f
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_multi_replica_fault_injection", "nonConcurrent") {
+    def res = sql"show backends;"
+    logger.info(res.toString())
+    def beNums = 0;
+    res.each { item ->
+        beNums++;
+        logger.info(item.toString())
+    }
+    if (beNums == 3){
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "3")
+            """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "3")
+            """
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }
+
+        def load_with_injection = { injection, error_msg->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql "insert into test select * from baseall where k1 <= 3"
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg))
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
+        }
+
+        // StreamSinkFileWriter appendv write segment failed one replica
+        // success
+        load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", "sucess")
+        // StreamSinkFileWriter appendv write segment failed two replica
+        load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", "replica num 1 < load required replica num 2")
+        // StreamSinkFileWriter appendv write segment failed all replica
+        load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to write any replicas")
+
+        sql """ set enable_memtable_on_sink_node=false """
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to