This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f883d1acfaf [enhancement](regression) fault injection for segcompaction test (#25709)
f883d1acfaf is described below

commit f883d1acfaf569c93df5c5cfe34d0125a6248827
Author: zhengyu <[email protected]>
AuthorDate: Mon Oct 30 17:36:17 2023 +0800

    [enhancement](regression) fault injection for segcompaction test (#25709)
    
    1. generalize the debug point facilities from the docker suites for
       fault-injection/stubbing cases
    2. add segcompaction fault-injection cases for demonstration
    3. add a -238 TOO_MANY_SEGMENTS fault-injection case for good measure
---
 be/src/olap/rowset/beta_rowset_writer.cpp          |   3 +
 be/src/olap/rowset/segcompaction.cpp               |   5 +
 .../test_segcompaction_fault_injection.out         |   7 +
 .../test_too_many_segments_fault_injection.out     |   3 +
 .../test_segcompaction_fault_injection.out         |   3 +
 .../org/apache/doris/regression/suite/Suite.groovy |  21 ++-
 .../doris/regression/suite/SuiteCluster.groovy     |  40 ++----
 .../apache/doris/regression/util/DebugPoint.groovy | 139 ++++++++++++++++++++
 regression-test/pipeline/p0/conf/be.conf           |   9 +-
 .../pipeline/p0/conf/regression-conf.groovy        |   2 +-
 .../test_segcompaction_fault_injection.groovy      | 143 +++++++++++++++++++++
 .../test_too_many_segments_fault_injection.groovy  | 124 ++++++++++++++++++
 .../test_min_load_replica_num_complicate.groovy    |   2 +-
 13 files changed, 458 insertions(+), 43 deletions(-)
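
For orientation before the diff: the generalized facility lets a regression
suite toggle named debug points on every BE around a workload. A minimal,
hedged sketch of the intended usage (the suite name and the load step are
placeholders; GetDebugPoint(), enableDebugPointForAllBEs() and
disableDebugPointForAllBEs() are the APIs added by this patch, and the BEs
must run with enable_debug_points=true):

    // Hypothetical suite; only the debug-point calls come from this patch.
    suite("debug_point_usage_sketch") {
        try {
            // Every BE will now inflate sum_src_row in the segcompaction
            // correctness check, forcing a CHECK_LINES_ERROR.
            GetDebugPoint().enableDebugPointForAllBEs(
                    "SegcompactionWorker._check_correctness_wrong_sum_src_row")
            // ... run a load large enough to trigger segcompaction ...
        } finally {
            GetDebugPoint().disableDebugPointForAllBEs(
                    "SegcompactionWorker._check_correctness_wrong_sum_src_row")
        }
    }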

diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index c4a8dbb568a..5ef14743492 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -49,6 +49,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
 #include "runtime/thread_context.h"
+#include "util/debug_points.h"
 #include "util/slice.h"
 #include "util/time.h"
 #include "vec/columns/column.h"
@@ -677,6 +678,8 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
 
 Status BetaRowsetWriter::_check_segment_number_limit() {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
+    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                    { total_segment_num = dp->param("segnum", 1024); });
     if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
         return Status::Error<TOO_MANY_SEGMENTS>(
                 "too many segments in rowset. tablet_id:{}, rowset_id:{}, 
max:{}, _num_segment:{}, "
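
Note how the injected block above is parameterized: dp->param("segnum", 1024)
replaces total_segment_num with the "segnum" value supplied when the debug
point is enabled (1024 if none is given), so a suite can trip the
TOO_MANY_SEGMENTS (-238) branch without writing thousands of real segments.
A sketch of driving it from Groovy, mirroring the suite added later in this
patch ("segnum" must exceed config::max_segment_num_per_rowset on the BE):

    GetDebugPoint().enableDebugPointForAllBEs(
            "BetaRowsetWriter._check_segment_number_limit_too_many_segments",
            ["segnum": 2000])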
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index a1136385603..eabf0e830d3 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -58,6 +58,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
 #include "runtime/thread_context.h"
+#include "util/debug_points.h"
 #include "util/mem_info.h"
 #include "util/time.h"
 #include "vec/olap/vertical_block_reader.h"
@@ -167,6 +168,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
         }
     }
 
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
     if (raw_rows_read != sum_src_row) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction read row num does not match source. expect read 
row:{}, actual read "
@@ -174,12 +176,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
                 sum_src_row, raw_rows_read);
     }
 
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
     if ((output_rows + merged_rows) != raw_rows_read) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction total row num does not match after merge. 
expect total row:{},  "
                 "actual total row:{}, (output_rows:{},merged_rows:{})",
                raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
     }
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
+                    { filtered_rows++; });
     if (filtered_rows != 0) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction should not have filtered rows but actual 
filtered rows:{}",
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000..e25bd2c3f92
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
+-- !select_default --
+
+-- !select_default --
+
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
new file mode 100644
index 00000000000..afeab4c41d0
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000..afeab4c41d0
--- /dev/null
+++ b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 5065f0e3e39..11e41e9bcf8 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -38,6 +38,7 @@ import org.apache.doris.regression.action.HttpCliAction
 import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.Hdfs
 import org.apache.doris.regression.util.SuiteUtils
+import org.apache.doris.regression.util.DebugPoint
 import org.junit.jupiter.api.Assertions
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -69,12 +70,14 @@ class Suite implements GroovyInterceptable {
     final List<Future> lazyCheckFutures = new Vector<>()
 
     SuiteCluster cluster
+    DebugPoint debugPoint
 
     Suite(String name, String group, SuiteContext context) {
         this.name = name
         this.group = group
         this.context = context
         this.cluster = null
+        this.debugPoint = new DebugPoint(this)
     }
 
     String getConf(String key, String defaultValue = null) {
@@ -476,7 +479,7 @@ class Suite implements GroovyInterceptable {
         String s3Url = "http://${s3BucketName}.${s3Endpoint}";
         return s3Url
     }
-    
+
     void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
         String cmd = "scp -r ${username}@${host}:${files} ${filePath}"
         if (!fromDst) {
@@ -487,7 +490,7 @@ class Suite implements GroovyInterceptable {
         def code = process.waitFor()
         Assert.assertEquals(0, code)
     }
-    
+
     void sshExec(String username, String host, String cmd) {
         String command = "ssh ${username}@${host} '${cmd}'"
         def cmds = ["/bin/bash", "-c", command]
@@ -499,7 +502,7 @@ class Suite implements GroovyInterceptable {
         assert errMsg.length() == 0: "error occurred!" + errMsg
         assert p.exitValue() == 0
     }
-    
+
 
     void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
         List<List<Object>> backends = sql("show backends");
@@ -509,7 +512,7 @@ class Suite implements GroovyInterceptable {
             backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
         }
         return;
-    } 
+    }
 
     int getTotalLine(String filePath) {
         def file = new File(filePath)
@@ -693,14 +696,14 @@ class Suite implements GroovyInterceptable {
             String cleanedSqlStr = sql.replaceAll("\\s*;\\s*\$", "")
             sql = cleanedSqlStr
         }
-        quickRunTest(tag, sql, isOrder) 
+        quickRunTest(tag, sql, isOrder)
     }
 
     void quickExecute(String tag, PreparedStatement stmt) {
         logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
-        quickRunTest(tag, stmt) 
+        quickRunTest(tag, stmt)
     }
-    
+
     @Override
     Object invokeMethod(String name, Object args) {
         // qt: quick test
@@ -761,5 +764,9 @@ class Suite implements GroovyInterceptable {
         // set server side prepared statement url
         return "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + database + 
"?&useServerPrepStmts=true"
     }
+
+    DebugPoint GetDebugPoint() {
+        return debugPoint
+    }
 }
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index f6f5002365f..29935caf256 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -18,6 +18,8 @@ package org.apache.doris.regression.suite
 
 import org.apache.doris.regression.Config
 import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
 import org.slf4j.Logger
@@ -62,13 +64,6 @@ class ListHeader {
 
 }
 
-enum NodeType {
-
-    FE,
-    BE,
-
-}
-
 class ServerNode {
 
     int index
@@ -83,38 +78,23 @@ class ServerNode {
         node.alive = fields.get(header.indexOf('alive')) == 'true'
     }
 
-    String getHttpAddress() {
-        return 'http://' + host + ':' + httpPort
+    def getHttpAddress() {
+        return [host, httpPort]
     }
 
     void enableDebugPoint(String name, Map<String, String> params = null) {
-        def url = getHttpAddress() + '/api/debug_point/add/' + name
-        if (params != null && params.size() > 0) {
-            url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
-        }
-        def result = Http.http_post(url, null, true)
-        checkHttpResult(result)
+        def (host, port) = getHttpAddress()
+        DebugPoint.enableDebugPoint(host, port, getNodeType(), name, params)
     }
 
     void disableDebugPoint(String name) {
-        def url = getHttpAddress() + '/api/debug_point/remove/' + name
-        def result = Http.http_post(url, null, true)
-        checkHttpResult(result)
+        def (host, port) = getHttpAddress()
+        DebugPoint.disableDebugPoint(host, port, getNodeType(), name)
     }
 
     void clearDebugPoints() {
-        def url = getHttpAddress() + '/api/debug_point/clear'
-        def result = Http.http_post(url, null, true)
-        checkHttpResult(result)
-    }
-
-    private void checkHttpResult(Object result) {
-        def type = getNodeType()
-        if (type == NodeType.FE) {
-            assert result.code == 0 : result.toString()
-        } else if (type == NodeType.BE) {
-            assert result.status == 'OK' : result.toString()
-        }
+        def (host, port) = getHttpAddress()
+        DebugPoint.clearDebugPoints(host, port, getNodeType())
     }
 
     NodeType getNodeType() {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
new file mode 100644
index 00000000000..c30f7fbbcbe
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.regression.util
+import org.apache.doris.regression.util.Http
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.suite.Suite
+
+enum NodeType {
+    FE,
+    BE,
+}
+
+class DebugPoint {
+    Suite suite
+
+    DebugPoint(Suite suite) {
+        this.suite = suite
+    }
+
+    /* Enable debug point in regression
+     * Note: set BE config::enable_debug_points = true to take effect
+     * Parameters:
+     *    host:        hostname or ip of target node
+     *    httpPort:    http port of target node
+     *    type:        NodeType.BE or NodeType.FE
+     *    name:        debug point name
+     *    params:      timeout, execute, or other customized input params
+     */
+    static def enableDebugPoint(String host, String httpPort, NodeType type, String name, Map<String, String> params = null) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/add/' + name
+        if (params != null && params.size() > 0) {
+            url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
+        }
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    /* Disable debug point in regression
+     * Parameters:
+     *    host:        hostname or ip of target node
+     *    httpPort:    http port of target node
+     *    type:        NodeType.BE or NodeType.FE
+     *    name:        debug point name
+     */
+    static def disableDebugPoint(String host, String httpPort, NodeType type, String name) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/remove/' + name
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    /* Disable all debug points in regression
+     * Parameters:
+     *    host:        hostname or ip of target node
+     *    httpPort:    http port of target node
+     *    type:        NodeType.BE or NodeType.FE
+     */
+    static def clearDebugPoints(String host, String httpPort, NodeType type) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/clear'
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    def operateDebugPointForAllBEs(Closure closure) {
+        def ipList = [:]
+        def portList = [:]
+        (ipList, portList) = getBEHostAndHTTPPort()
+        ipList.each { beid, ip ->
+            closure.call(ip, portList[beid])
+        }
+    }
+
+    /* Enable a specific debug point on all BE nodes in the cluster */
+    def enableDebugPointForAllBEs(String name, Map<String, String> params = null) {
+        operateDebugPointForAllBEs({ host, port ->
+            println "enable debug point $name for BE $host:$port"
+            enableDebugPoint(host, port, NodeType.BE, name, params)
+        })
+    }
+
+    /* Disable a specific debug point on all BE nodes in the cluster */
+    def disableDebugPointForAllBEs(String name) {
+        operateDebugPointForAllBEs { host, port ->
+            disableDebugPoint(host, port, NodeType.BE, name)
+        }
+    }
+
+    /* Disable all debug points on all BE nodes in the cluster */
+    def clearDebugPointsForAllBEs() {
+        operateDebugPointForAllBEs { host, port ->
+            clearDebugPoints(host, port, NodeType.BE)
+        }
+    }
+
+    def getBEHostAndHTTPPort() {
+        def ipList = [:]
+        def portList = [:]
+        suite.getBackendIpHttpPort(ipList, portList)
+        return [ipList, portList]
+    }
+
+    def getFEHostAndHTTPPort() {
+        assert false : 'not implemented yet'
+    }
+
+    def enableDebugPointForAllFEs(String name, Map<String, String> params = null) {
+        assert false : 'not implemented yet'
+    }
+
+    def disableDebugPointForAllFEs(String name) {
+        assert false : 'not implemented yet'
+    }
+
+    def clearDebugPointsForAllFEs() {
+        assert false : 'not implemented yet'
+    }
+
+    static void checkHttpResult(Object result, NodeType type) {
+        if (type == NodeType.FE) {
+            assert result.code == 0 : result.toString()
+        } else if (type == NodeType.BE) {
+            assert result.status == 'OK' : result.toString()
+        }
+    }
+}
+
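
Usage note: the instance methods fan out to every BE the suite can discover
via getBackendIpHttpPort(), and optional params are appended to the add URL
as a query string (?k=v&...). A hedged sketch; "some.debug.point" is a
placeholder name, and "timeout"/"execute" are the generic knobs mentioned in
the enableDebugPoint() comment above:

    // Inside a suite body; placeholder debug point name.
    GetDebugPoint().enableDebugPointForAllBEs("some.debug.point",
            ["timeout": "10", "execute": "1"])
    // ... exercise the code path under test ...
    GetDebugPoint().clearDebugPointsForAllBEs()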
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index f022ba2dec8..df9b08157f4 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -20,7 +20,7 @@ PPROF_TMPDIR="$DORIS_HOME/log/"
 # INFO, WARNING, ERROR, FATAL
 sys_log_level = INFO
 
-# ports for admin, web, heartbeat service 
+# ports for admin, web, heartbeat service
 be_port = 9161
 webserver_port = 8141
 heartbeat_service_port = 9151
@@ -36,7 +36,7 @@ buffer_pool_limit = 2%
 storage_page_cache_limit = 0%
 disable_storage_page_cache = true
 chunk_reserved_bytes_limit = 134217728
-# Choose one if there are more than one ip except loopback address. 
+# Choose one if there are more than one ip except loopback address.
 # Note that there should at most one ip match this list.
 # If no ip match this rule, will choose one randomly.
 # use CIDR format, e.g. 10.10.10.0/24
@@ -48,7 +48,7 @@ chunk_reserved_bytes_limit = 134217728
 # you can add capacity limit at the end of each root path, seperate by ','
 # eg:
 # /home/disk2/doris, capacity limit is disk capacity, HDD(default)
-# 
+#
 # you also can specify the properties by setting '<property>:<value>', seperate by ','
 # property 'medium' has a higher priority than the extension of path
 #
@@ -61,7 +61,7 @@ chunk_reserved_bytes_limit = 134217728
 # sys_log_verbose_modules = *
 log_buffer_level = -1
 enable_stream_load_record = true
-# palo_cgroups 
+# palo_cgroups
 
#storage_root_path=/mnt/hdd01/doris.SSD/NON_VEC_RELEASE;/mnt/hdd01/doris.HDD/NON_VEC_RELEASE;/mnt/hdd02/doris.SSD/NON_VEC_RELEASE;/mnt/hdd02/doris.HDD/NON_VEC_RELEASE;/mnt/hdd03/doris.SSD/NON_VEC_RELEASE;/mnt/hdd03/doris.HDD/NON_VEC_RELEASE;/mnt/hdd04/doris.SSD/NON_VEC_RELEASE;/mnt/hdd04/doris.HDD/NON_VEC_RELEASE;/mnt/hdd05/doris.SSD/NON_VEC_RELEASE;/mnt/hdd05/doris.HDD/NON_VEC_RELEASE;/mnt/hdd06/doris.SSD/NON_VEC_RELEASE;/mnt/hdd06/doris.HDD/NON_VEC_RELEASE;
 
 storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1
@@ -75,3 +75,4 @@ enable_set_in_bitmap_value=true
 enable_feature_binlog=true
 max_sys_mem_available_low_water_mark_bytes=69206016
 user_files_secure_path=/
+enable_debug_points=true
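
Without enable_debug_points=true, the DBUG_EXECUTE_IF sites compiled into the
BE stay inert, so the p0 pipeline must opt in here. For completeness, the
underlying HTTP surface (paths taken from DebugPoint.groovy above; host and
port illustrative) can also be hit directly with the framework's Http helper:

    // Hedged sketch: clear all debug points on one BE, exactly as
    // DebugPoint.clearDebugPoints() does internally.
    def result = Http.http_post('http://127.0.0.1:8141/api/debug_point/clear', null, true)
    assert result.status == 'OK' : result.toString()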
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy
index fecda4db1fb..e0e7be6ed4c 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -58,7 +58,7 @@ excludeGroups = ""
 excludeSuites = "test_sql_block_rule,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter"
 
 // this directories will not be executed
-excludeDirectories = "workload_manager_p1"
+excludeDirectories = "workload_manager_p1,fault_injection_p0"
 
 customConf1 = "test_custom_conf_value"
 
diff --git a/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
new file mode 100644
index 00000000000..2f601f13d11
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "segcompaction_correctness_test"
+def create_table_sql = """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+                )
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+        """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_segcompaction_correctness") {
+    def runLoadWithSegcompaction = {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        String backend_id;
+        try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+            backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+
+            boolean disableAutoCompaction = true
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+                }
+            }
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql "${create_table_sql}"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+            sql """
+            LOAD LABEL $uuid (
+                DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "ORC"
+                $columns_str
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "$endpoint",
+                "AWS_REGION" = "$region"
+            )
+            properties(
+                "use_new_load_scan_node" = "true"
+            )
+            """
+
+            def max_try_milli_secs = 3600000
+            String [][] result = ''
+            while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+                if (result[0][2].equals("FINISHED")) {
+                    logger.info("Load FINISHED " + " $uuid")
+                    break;
+                }
+                if (result[0][2].equals("CANCELLED")) {
+                    logger.info("Load CANCELLED " + " $uuid")
+                    break;
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if(max_try_milli_secs <= 0) {
+                    assertTrue(1 == 2, "load Timeout: $uuid")
+                }
+            }
+            assertTrue(result[0][2].equals("CANCELLED"))
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+            tablets = sql """ show tablets from ${tableName}; """
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${tableName}")
+        }
+    }
+
+    // wrong_sum_src_row
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+    }
+
+    // wrong_merged_rows
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+    }
+
+    // wrong_filtered_rows
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+    }
+}
+
diff --git a/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy
new file mode 100644
index 00000000000..b68e324ee98
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "too_many_segments_test"
+def create_table_sql = """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+                )
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+        """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_too_many_segments") { // the epic -238 case
+    def runLoadWithTooManySegments = {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        String backend_id;
+        try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+            backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+
+            boolean disableAutoCompaction = true
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+                }
+            }
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql "${create_table_sql}"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+            sql """
+            LOAD LABEL $uuid (
+                DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "ORC"
+                $columns_str
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "$endpoint",
+                "AWS_REGION" = "$region"
+            )
+            properties(
+                "use_new_load_scan_node" = "true"
+            )
+            """
+
+            Thread.sleep(2000)
+            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                                                      ["segnum":2000])
+
+            def max_try_milli_secs = 3600000
+            String [][] result = ''
+            while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+                if (result[0][2].equals("FINISHED")) {
+                    logger.info("Load FINISHED " + " $uuid")
+                    break;
+                }
+                if (result[0][2].equals("CANCELLED")) {
+                    logger.info("Load CANCELLED " + " $uuid")
+                    break;
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if(max_try_milli_secs <= 0) {
+                    assertTrue(1 == 2, "load Timeout: $uuid")
+                }
+            }
+            assertTrue(result[0][7].contains("-238")) // EPIC!
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+            tablets = sql """ show tablets from ${tableName}; """
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${tableName}")
+            GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+        }
+    }
+    runLoadWithTooManySegments()
+}
diff --git a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
index 81fd2f2661f..8c3efa5aa59 100644
--- a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
+++ b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.doris.regression.suite.NodeType
+import org.apache.doris.regression.util.NodeType
 import org.apache.doris.regression.suite.SuiteCluster
 
 class InjectCase {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

