This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f883d1acfaf [enhancement](regression) fault injection for segcompaction test (#25709)
f883d1acfaf is described below
commit f883d1acfaf569c93df5c5cfe34d0125a6248827
Author: zhengyu <[email protected]>
AuthorDate: Mon Oct 30 17:36:17 2023 +0800
[enhancement](regression) fault injection for segcompaction test (#25709)
    1. Generalize the debug point facilities from the docker suites for fault-injection/stubbing cases
    2. Add segcompaction fault-injection cases for demonstration
    3. Add a -238 TOO_MANY_SEGMENTS fault-injection case for good measure
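
    The heart of the change is a DebugPoint helper exposed to every suite via GetDebugPoint();
    it toggles named debug points on all BEs through the /api/debug_point REST interface. A
    minimal usage sketch (the debug point name below is one of the real ones added in this
    commit; the surrounding suite body is illustrative only, and it assumes a BE started with
    enable_debug_points=true in be.conf):

        suite("example_fault_injection") {
            try {
                // Activate the named debug point on every BE in the cluster. Optional
                // parameters, e.g. ["segnum":2000], are passed as a query string and
                // read on the BE side via dp->param("segnum", <default>).
                GetDebugPoint().enableDebugPointForAllBEs(
                        "SegcompactionWorker._check_correctness_wrong_sum_src_row")
                // ... run a load that triggers segcompaction; the injected fault should
                // make the correctness check fail and the load end up CANCELLED ...
            } finally {
                GetDebugPoint().disableDebugPointForAllBEs(
                        "SegcompactionWorker._check_correctness_wrong_sum_src_row")
            }
        }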
---
be/src/olap/rowset/beta_rowset_writer.cpp | 3 +
be/src/olap/rowset/segcompaction.cpp | 5 +
.../test_segcompaction_fault_injection.out | 7 +
.../test_too_many_segments_fault_injection.out | 3 +
.../test_segcompaction_fault_injection.out | 3 +
.../org/apache/doris/regression/suite/Suite.groovy | 21 ++-
.../doris/regression/suite/SuiteCluster.groovy | 40 ++----
.../apache/doris/regression/util/DebugPoint.groovy | 139 ++++++++++++++++++++
regression-test/pipeline/p0/conf/be.conf | 9 +-
.../pipeline/p0/conf/regression-conf.groovy | 2 +-
.../test_segcompaction_fault_injection.groovy | 143 +++++++++++++++++++++
.../test_too_many_segments_fault_injection.groovy | 124 ++++++++++++++++++
.../test_min_load_replica_num_complicate.groovy | 2 +-
13 files changed, 458 insertions(+), 43 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index c4a8dbb568a..5ef14743492 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -49,6 +49,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
+#include "util/debug_points.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/columns/column.h"
@@ -677,6 +678,8 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
Status BetaRowsetWriter::_check_segment_number_limit() {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
+    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                    { total_segment_num = dp->param("segnum", 1024); });
if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
return Status::Error<TOO_MANY_SEGMENTS>(
"too many segments in rowset. tablet_id:{}, rowset_id:{},
max:{}, _num_segment:{}, "
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index a1136385603..eabf0e830d3 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -58,6 +58,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/thread_context.h"
+#include "util/debug_points.h"
#include "util/mem_info.h"
#include "util/time.h"
#include "vec/olap/vertical_block_reader.h"
@@ -167,6 +168,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
}
}
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
if (raw_rows_read != sum_src_row) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction read row num does not match source. expect read
row:{}, actual read "
@@ -174,12 +176,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
sum_src_row, raw_rows_read);
}
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
if ((output_rows + merged_rows) != raw_rows_read) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction total row num does not match after merge.
expect total row:{}, "
"actual total row:{}, (output_rows:{},merged_rows:{})",
                 raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
}
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
+                    { filtered_rows++; });
if (filtered_rows != 0) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction should not have filtered rows but actual
filtered rows:{}",
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000..e25bd2c3f92
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
+-- !select_default --
+
+-- !select_default --
+
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
new file mode 100644
index 00000000000..afeab4c41d0
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000..afeab4c41d0
--- /dev/null
+++ b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 5065f0e3e39..11e41e9bcf8 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -38,6 +38,7 @@ import org.apache.doris.regression.action.HttpCliAction
import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.apache.doris.regression.util.SuiteUtils
+import org.apache.doris.regression.util.DebugPoint
import org.junit.jupiter.api.Assertions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -69,12 +70,14 @@ class Suite implements GroovyInterceptable {
final List<Future> lazyCheckFutures = new Vector<>()
SuiteCluster cluster
+ DebugPoint debugPoint
Suite(String name, String group, SuiteContext context) {
this.name = name
this.group = group
this.context = context
this.cluster = null
+ this.debugPoint = new DebugPoint(this)
}
String getConf(String key, String defaultValue = null) {
@@ -476,7 +479,7 @@ class Suite implements GroovyInterceptable {
String s3Url = "http://${s3BucketName}.${s3Endpoint}"
return s3Url
}
-
+
    void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
String cmd = "scp -r ${username}@${host}:${files} ${filePath}"
if (!fromDst) {
@@ -487,7 +490,7 @@ class Suite implements GroovyInterceptable {
def code = process.waitFor()
Assert.assertEquals(0, code)
}
-
+
void sshExec(String username, String host, String cmd) {
String command = "ssh ${username}@${host} '${cmd}'"
def cmds = ["/bin/bash", "-c", command]
@@ -499,7 +502,7 @@ class Suite implements GroovyInterceptable {
assert errMsg.length() == 0: "error occurred!" + errMsg
assert p.exitValue() == 0
}
-
+
    void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP, Map<String, String> backendId_to_backendHttpPort) {
List<List<Object>> backends = sql("show backends");
@@ -509,7 +512,7 @@ class Suite implements GroovyInterceptable {
            backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
}
return;
- }
+ }
int getTotalLine(String filePath) {
def file = new File(filePath)
@@ -693,14 +696,14 @@ class Suite implements GroovyInterceptable {
String cleanedSqlStr = sql.replaceAll("\\s*;\\s*\$", "")
sql = cleanedSqlStr
}
- quickRunTest(tag, sql, isOrder)
+ quickRunTest(tag, sql, isOrder)
}
void quickExecute(String tag, PreparedStatement stmt) {
logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
- quickRunTest(tag, stmt)
+ quickRunTest(tag, stmt)
}
-
+
@Override
Object invokeMethod(String name, Object args) {
// qt: quick test
@@ -761,5 +764,9 @@ class Suite implements GroovyInterceptable {
// set server side prepared statement url
return "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + database +
"?&useServerPrepStmts=true"
}
+
+ DebugPoint GetDebugPoint() {
+ return debugPoint
+ }
}
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index f6f5002365f..29935caf256 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -18,6 +18,8 @@ package org.apache.doris.regression.suite
import org.apache.doris.regression.Config
import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
import com.google.common.collect.Maps
import org.slf4j.Logger
@@ -62,13 +64,6 @@ class ListHeader {
}
-enum NodeType {
-
- FE,
- BE,
-
-}
-
class ServerNode {
int index
@@ -83,38 +78,23 @@ class ServerNode {
node.alive = fields.get(header.indexOf('alive')) == 'true'
}
- String getHttpAddress() {
- return 'http://' + host + ':' + httpPort
+ def getHttpAddress() {
+ return [host, httpPort]
}
void enableDebugPoint(String name, Map<String, String> params = null) {
- def url = getHttpAddress() + '/api/debug_point/add/' + name
- if (params != null && params.size() > 0) {
- url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
- }
- def result = Http.http_post(url, null, true)
- checkHttpResult(result)
+ def (host, port) = getHttpAddress()
+ DebugPoint.enableDebugPoint(host, port, getNodeType(), name, params)
}
void disableDebugPoint(String name) {
- def url = getHttpAddress() + '/api/debug_point/remove/' + name
- def result = Http.http_post(url, null, true)
- checkHttpResult(result)
+ def (host, port) = getHttpAddress()
+ DebugPoint.disableDebugPoint(host, port, getNodeType(), name)
}
void clearDebugPoints() {
- def url = getHttpAddress() + '/api/debug_point/clear'
- def result = Http.http_post(url, null, true)
- checkHttpResult(result)
- }
-
- private void checkHttpResult(Object result) {
- def type = getNodeType()
- if (type == NodeType.FE) {
- assert result.code == 0 : result.toString()
- } else if (type == NodeType.BE) {
- assert result.status == 'OK' : result.toString()
- }
+ def (host, port) = getHttpAddress()
+ DebugPoint.clearDebugPoints(host, port, getNodeType())
}
NodeType getNodeType() {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
new file mode 100644
index 00000000000..c30f7fbbcbe
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.regression.util
+import org.apache.doris.regression.util.Http
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.suite.Suite
+
+enum NodeType {
+ FE,
+ BE,
+}
+
+class DebugPoint {
+ Suite suite
+
+ DebugPoint(Suite suite) {
+ this.suite = suite
+ }
+
+ /* Enable debug point in regression
+ * Note: set BE config::enable_debug_points = true to take effect
+ * Parameters:
+ * host: hostname or ip of target node
+ * httpPort: http port of target node
+ * type: NodeType.BE or NodeType.FE
+ * name: debug point name
+ * params: timeout, execute, or other customized input params
+ */
+    static def enableDebugPoint(String host, String httpPort, NodeType type, String name, Map<String, String> params = null) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/add/' + name
+ if (params != null && params.size() > 0) {
+ url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
+ }
+ def result = Http.http_post(url, null, true)
+ checkHttpResult(result, type)
+ }
+
+ /* Disable debug point in regression
+ * Parameters:
+ * host: hostname or ip of target node
+ * httpPort: http port of target node
+ * type: NodeType.BE or NodeType.FE
+ * name: debug point name
+ */
+    static def disableDebugPoint(String host, String httpPort, NodeType type, String name) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/remove/' + name
+ def result = Http.http_post(url, null, true)
+ checkHttpResult(result, type)
+ }
+
+ /* Disable all debug points in regression
+ * Parameters:
+ * host: hostname or ip of target node
+ * httpPort: http port of target node
+ * type: NodeType.BE or NodeType.FE
+ */
+ static def clearDebugPoints(String host, String httpPort, NodeType type) {
+ def url = 'http://' + host + ':' + httpPort + '/api/debug_point/clear'
+ def result = Http.http_post(url, null, true)
+ checkHttpResult(result, type)
+ }
+
+ def operateDebugPointForAllBEs(Closure closure) {
+ def ipList = [:]
+ def portList = [:]
+ (ipList, portList) = getBEHostAndHTTPPort()
+ ipList.each { beid, ip ->
+ closure.call(ip, portList[beid])
+ }
+ }
+
+ /* Enable specific debug point for all BE node in cluster */
+    def enableDebugPointForAllBEs(String name, Map<String, String> params = null) {
+ operateDebugPointForAllBEs({ host, port ->
+ println "enable debug point $name for BE $host:$port"
+ enableDebugPoint(host, port, NodeType.BE, name, params)
+ })
+ }
+
+ /* Disable specific debug point for all BE node in cluster */
+ def disableDebugPointForAllBEs(String name) {
+ operateDebugPointForAllBEs { host, port ->
+ disableDebugPoint(host, port, NodeType.BE, name)
+ }
+ }
+
+ /* Disable all debug points for all BE node in cluster */
+ def clearDebugPointsForAllBEs() {
+ operateDebugPointForAllBEs { host, port ->
+ clearDebugPoints(host, port, NodeType.BE)
+ }
+ }
+
+ def getBEHostAndHTTPPort() {
+ def ipList = [:]
+ def portList = [:]
+ suite.getBackendIpHttpPort(ipList, portList)
+ return [ipList, portList]
+ }
+
+ def getFEHostAndHTTPPort() {
+ assert false : 'not implemented yet'
+ }
+
+    def enableDebugPointForAllFEs(String name, Map<String, String> params = null) {
+ assert false : 'not implemented yet'
+ }
+
+ def disableDebugPointForAllFEs(String name) {
+ assert false : 'not implemented yet'
+ }
+
+ def clearDebugPointsForAllFEs() {
+ assert false : 'not implemented yet'
+ }
+
+ static void checkHttpResult(Object result, NodeType type) {
+ if (type == NodeType.FE) {
+ assert result.code == 0 : result.toString()
+ } else if (type == NodeType.BE) {
+ assert result.status == 'OK' : result.toString()
+ }
+ }
+}
+
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index f022ba2dec8..df9b08157f4 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -20,7 +20,7 @@ PPROF_TMPDIR="$DORIS_HOME/log/"
# INFO, WARNING, ERROR, FATAL
sys_log_level = INFO
-# ports for admin, web, heartbeat service
+# ports for admin, web, heartbeat service
be_port = 9161
webserver_port = 8141
heartbeat_service_port = 9151
@@ -36,7 +36,7 @@ buffer_pool_limit = 2%
storage_page_cache_limit = 0%
disable_storage_page_cache = true
chunk_reserved_bytes_limit = 134217728
-# Choose one if there are more than one ip except loopback address.
+# Choose one if there are more than one ip except loopback address.
# Note that there should at most one ip match this list.
# If no ip match this rule, will choose one randomly.
# use CIDR format, e.g. 10.10.10.0/24
@@ -48,7 +48,7 @@ chunk_reserved_bytes_limit = 134217728
# you can add capacity limit at the end of each root path, seperate by ','
# eg:
# /home/disk2/doris, capacity limit is disk capacity, HDD(default)
-#
+#
# you also can specify the properties by setting '<property>:<value>', seperate by ','
# property 'medium' has a higher priority than the extension of path
#
@@ -61,7 +61,7 @@ chunk_reserved_bytes_limit = 134217728
# sys_log_verbose_modules = *
log_buffer_level = -1
enable_stream_load_record = true
-# palo_cgroups
+# palo_cgroups
#storage_root_path=/mnt/hdd01/doris.SSD/NON_VEC_RELEASE;/mnt/hdd01/doris.HDD/NON_VEC_RELEASE;/mnt/hdd02/doris.SSD/NON_VEC_RELEASE;/mnt/hdd02/doris.HDD/NON_VEC_RELEASE;/mnt/hdd03/doris.SSD/NON_VEC_RELEASE;/mnt/hdd03/doris.HDD/NON_VEC_RELEASE;/mnt/hdd04/doris.SSD/NON_VEC_RELEASE;/mnt/hdd04/doris.HDD/NON_VEC_RELEASE;/mnt/hdd05/doris.SSD/NON_VEC_RELEASE;/mnt/hdd05/doris.HDD/NON_VEC_RELEASE;/mnt/hdd06/doris.SSD/NON_VEC_RELEASE;/mnt/hdd06/doris.HDD/NON_VEC_RELEASE;
storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1
@@ -75,3 +75,4 @@ enable_set_in_bitmap_value=true
enable_feature_binlog=true
max_sys_mem_available_low_water_mark_bytes=69206016
user_files_secure_path=/
+enable_debug_points=true
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy
index fecda4db1fb..e0e7be6ed4c 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -58,7 +58,7 @@ excludeGroups = ""
excludeSuites = "test_sql_block_rule,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter"
// this directories will not be executed
-excludeDirectories = "workload_manager_p1"
+excludeDirectories = "workload_manager_p1,fault_injection_p0"
customConf1 = "test_custom_conf_value"
diff --git a/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
new file mode 100644
index 00000000000..2f601f13d11
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "segcompaction_correctness_test"
+def create_table_sql = """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+        `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+        `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+        `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+        `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+        `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+        `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+        `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+        `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+        `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+        `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+ )
+ DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1" );
+ """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_segcompaction_correctness") {
+ def runLoadWithSegcompaction = {
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = getS3BucketName()
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ String backend_id;
+ try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+ backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ boolean disableAutoCompaction = true
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+ }
+ }
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql "${create_table_sql}"
+
+ def uuid = UUID.randomUUID().toString().replace("-", "0")
+ String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+ sql """
+ LOAD LABEL $uuid (
+                DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+ INTO TABLE ${tableName}
+ FORMAT AS "ORC"
+ $columns_str
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "$endpoint",
+ "AWS_REGION" = "$region"
+ )
+ properties(
+ "use_new_load_scan_node" = "true"
+ )
+ """
+
+ def max_try_milli_secs = 3600000
+ String [][] result = ''
+ while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + " $uuid")
+ break;
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ logger.info("Load CANCELLED " + " $uuid")
+ break;
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $uuid")
+ }
+ }
+ assertTrue(result[0][2].equals("CANCELLED"))
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+ tablets = sql """ show tablets from ${tableName}; """
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${tableName}")
+ }
+ }
+
+ // wrong_sum_src_row
+ try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+ runLoadWithSegcompaction()
+ } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+ }
+
+ // wrong_merged_rows
+ try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+ runLoadWithSegcompaction()
+ } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+ }
+
+ // wrong_filtered_rows
+ try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+ runLoadWithSegcompaction()
+ } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+ }
+}
+
diff --git a/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy
new file mode 100644
index 00000000000..b68e324ee98
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "too_many_segments_test"
+def create_table_sql = """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+        `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+        `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+        `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+        `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+        `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+        `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+        `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+        `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+        `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+        `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+ )
+ DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+ PROPERTIES ( "replication_num" = "1" );
+ """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_too_many_segments") { // the epic -238 case
+ def runLoadWithTooManySegments = {
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = getS3BucketName()
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ String backend_id;
+ try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+ backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ boolean disableAutoCompaction = true
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+ }
+ }
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql "${create_table_sql}"
+
+ def uuid = UUID.randomUUID().toString().replace("-", "0")
+ String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+ sql """
+ LOAD LABEL $uuid (
+                DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+ INTO TABLE ${tableName}
+ FORMAT AS "ORC"
+ $columns_str
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "$endpoint",
+ "AWS_REGION" = "$region"
+ )
+ properties(
+ "use_new_load_scan_node" = "true"
+ )
+ """
+
+ Thread.sleep(2000)
+            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                                                      ["segnum":2000])
+
+ def max_try_milli_secs = 3600000
+ String [][] result = ''
+ while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + " $uuid")
+ break;
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ logger.info("Load CANCELLED " + " $uuid")
+ break;
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $uuid")
+ }
+ }
+ assertTrue(result[0][7].contains("-238")) // EPIC!
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+ tablets = sql """ show tablets from ${tableName}; """
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${tableName}")
+            GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+ }
+ }
+ runLoadWithTooManySegments()
+}
diff --git a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
index 81fd2f2661f..8c3efa5aa59 100644
--- a/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
+++ b/regression-test/suites/load/insert/test_min_load_replica_num_complicate.groovy
@@ -16,7 +16,7 @@
// under the License.
import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.doris.regression.suite.NodeType
+import org.apache.doris.regression.util.NodeType
import org.apache.doris.regression.suite.SuiteCluster
class InjectCase {
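
Note for reviewers: the suites drive the debug points through the framework, but the static
helpers in DebugPoint.groovy can also be exercised directly for manual testing. A sketch with
placeholder host/port (assumes a BE webserver reachable on 127.0.0.1:8141 and started with
enable_debug_points=true; both values are illustrative):

    import org.apache.doris.regression.util.DebugPoint
    import org.apache.doris.regression.util.NodeType

    // POSTs /api/debug_point/add/<name>?segnum=2000 to the BE webserver
    DebugPoint.enableDebugPoint('127.0.0.1', '8141', NodeType.BE,
            'BetaRowsetWriter._check_segment_number_limit_too_many_segments',
            ['segnum': '2000'])

    // POSTs /api/debug_point/clear, removing every active debug point on that BE
    DebugPoint.clearDebugPoints('127.0.0.1', '8141', NodeType.BE)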
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]