This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ce5425cc0ef branch-3.1: [improve](cloud-mow)Add more delete bitmap
lock case #49204 (#52006)
ce5425cc0ef is described below
commit ce5425cc0efbc3e213544277f22fa8e68feb0aad
Author: meiyi <[email protected]>
AuthorDate: Fri Jun 20 13:30:48 2025 +0800
branch-3.1: [improve](cloud-mow)Add more delete bitmap lock case #49204
(#52006)
pick from master #49204
Co-authored-by: huanghaibin <[email protected]>
---
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 6 +-
be/src/cloud/cloud_meta_mgr.cpp | 19 +-
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 5 +
be/src/cloud/config.cpp | 2 +
be/src/cloud/config.h | 2 +
... => test_cloud_mow_delete_bitmap_lock_case.out} | Bin 446 -> 461 bytes
.../test_cloud_mow_delete_bitmap_lock_case.groovy | 784 +++++++++++++++++++++
...t_cloud_mow_stream_load_with_commit_fail.groovy | 429 -----------
.../test_delete_bitmap_lock_with_restart.groovy | 314 +++++++++
9 files changed, 1129 insertions(+), 432 deletions(-)
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index dc8ecaf26b4..6880e9733f5 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -71,7 +71,11 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
std::unique_ptr<ThreadPoolToken> token =
_engine.calc_tablet_delete_bitmap_task_thread_pool().new_token(
ThreadPool::ExecutionMode::CONCURRENT);
- DBUG_EXECUTE_IF("CloudEngineCalcDeleteBitmapTask.execute.enable_wait", {
sleep(3); });
+ DBUG_EXECUTE_IF("CloudEngineCalcDeleteBitmapTask.execute.enable_wait", {
+ auto sleep_time =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "CloudEngineCalcDeleteBitmapTask.execute.enable_wait",
"sleep_time", 3);
+ sleep(sleep_time);
+ });
for (const auto& partition : _cal_delete_bitmap_req.partitions) {
int64_t version = partition.version;
bool has_compaction_stats = partition.__isset.base_compaction_cnts &&
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c5d70342bf0..92bc3770c27 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1304,9 +1304,12 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const
CloudTablet& tablet, in
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(500, 2000);
do {
+ bool test_conflict = false;
st = retry_rpc("get delete bitmap update lock", req, &res,
&MetaService_Stub::get_delete_bitmap_update_lock);
- if (res.status().code() != MetaServiceCode::LOCK_CONFLICT) {
+
DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict",
+ { test_conflict = true; });
+ if (!test_conflict && res.status().code() !=
MetaServiceCode::LOCK_CONFLICT) {
break;
}
@@ -1315,7 +1318,19 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const
CloudTablet& tablet, in
<< " retry_times=" << retry_times << " sleep=" <<
duration_ms
<< "ms : " << res.status().msg();
bthread_usleep(duration_ms * 1000);
- } while (++retry_times <= 100);
+ } while (++retry_times <= config::get_delete_bitmap_lock_max_retry_times);
+ DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep",
{
+ auto p = dp->param("percent", 0.01);
+ // 100s > Config.calculate_delete_bitmap_task_timeout_seconds = 60s
+ auto sleep_time = dp->param("sleep", 15);
+ std::mt19937 gen {std::random_device {}()};
+ std::bernoulli_distribution inject_fault {p};
+ if (inject_fault(gen)) {
+ LOG_INFO("injection sleep for {} seconds, tablet_id={}",
sleep_time,
+ tablet.tablet_id());
+ std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+ }
+ });
if (res.status().code() ==
MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"txn conflict when get delete bitmap update lock, table_id {},
lock_id {}, "
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index 681cbbc8bf0..a0f3b201429 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -60,6 +60,11 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
{
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey key(transaction_id, tablet_id);
+
DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache.get_tablet_txn_info.not_found", {
+ return Status::Error<ErrorCode::NOT_FOUND>(
+ "not found txn info for test, tablet_id={},
transaction_id={}", tablet_id,
+ transaction_id);
+ });
auto iter = _txn_map.find(key);
if (iter == _txn_map.end()) {
return Status::Error<ErrorCode::NOT_FOUND, false>(
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 7f26988a68b..bc5c90e6e94 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -70,6 +70,8 @@ DEFINE_mBool(enable_new_tablet_do_compaction, "true");
DEFINE_mInt32(delete_bitmap_lock_expiration_seconds, "10");
+DEFINE_mInt32(get_delete_bitmap_lock_max_retry_times, "100");
+
DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");
DEFINE_mInt32(remove_expired_tablet_txn_info_interval_seconds, "300");
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 9d2027b965b..9e724082c9f 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -105,6 +105,8 @@ DECLARE_mInt32(sync_load_for_tablets_thread);
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
+DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
+
// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.out
similarity index 93%
rename from
regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
rename to
regression-test/data/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.out
index 115d5523e12..e752a2fae45 100644
Binary files
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.out
differ
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.groovy
new file mode 100644
index 00000000000..37a46dc7814
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_delete_bitmap_lock_case.groovy
@@ -0,0 +1,784 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_cloud_mow_delete_bitmap_lock_case", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ GetDebugPoint().clearDebugPointsForAllFEs()
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string: [:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+ // for eache be node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def reset_be_param = { paramName ->
+ // for eache be node, reset paramName to default
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def original_value = backendId_to_params.get(id).get(paramName)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
original_value))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def get_be_param = { paramName ->
+ // for eache be node, get param value by default
+ def paramValue = ""
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
+ assertTrue(code == 0)
+ assertTrue(out.contains(paramName))
+ // parsing
+ def resultList = parseJson(out)[0]
+ assertTrue(resultList.size() == 4)
+ // get original value
+ paramValue = resultList[2]
+ backendId_to_params.get(id, [:]).put(paramName, paramValue)
+ }
+ }
+
+ def customFeConfig1 = [calculate_delete_bitmap_task_timeout_seconds: 2,
meta_service_rpc_retry_times: 5]
+ def customFeConfig2 = [delete_bitmap_lock_expiration_seconds: 2,
meta_service_rpc_retry_times: 5]
+ def customFeConfig3 = [mow_calculate_delete_bitmap_retry_times: 1]
+ def customFeConfig4 = [calculate_delete_bitmap_task_timeout_seconds: 2,
mow_calculate_delete_bitmap_retry_times: 1]
+ def customFeConfig5 = [meta_service_rpc_retry_times: 5]
+ def tableName = "tbl_basic"
+ String[][] backends = sql """ show backends """
+ assertTrue(backends.size() > 0)
+ String backendId;
+ def backendIdToBackendIP = [:]
+ def backendIdToBackendBrpcPort = [:]
+ for (String[] backend in backends) {
+ if (backend[9].equals("true")) {
+ backendIdToBackendIP.put(backend[0], backend[1])
+ backendIdToBackendBrpcPort.put(backend[0], backend[5])
+ }
+ }
+
+ backendId = backendIdToBackendIP.keySet()[0]
+ def getMetricsMethod = { check_func ->
+ httpTest {
+ endpoint backendIdToBackendIP.get(backendId) + ":" +
backendIdToBackendBrpcPort.get(backendId)
+ uri "/brpc_metrics"
+ op "get"
+ check check_func
+ }
+ }
+
+ int total_retry = 0;
+ int last_total_retry = -1;
+
+ def getTotalRetry = {
+ getMetricsMethod.call() { respCode, body ->
+ logger.info("get total retry resp Code {}",
"${respCode}".toString())
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ for (String line in strs) {
+ if (line.startsWith("stream_load_commit_retry_counter")) {
+ logger.info("find: {}", line)
+ total_retry =
line.replaceAll("stream_load_commit_retry_counter ", "").toInteger()
+ if (last_total_retry < 0) {
+ last_total_retry = total_retry
+ }
+ break
+ }
+ }
+ }
+ }
+
+ def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id ->
+ if (compact_type == "cumulative") {
+ def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 +
", err=" + err_1)
+ assertEquals(code_1, 0)
+ return out_1
+ } else if (compact_type == "full") {
+ def (code_2, out_2, err_2) = be_run_full_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 +
", err=" + err_2)
+ assertEquals(code_2, 0)
+ return out_2
+ } else {
+ assertFalse(True)
+ }
+ }
+
+ def getTabletStatus = { be_host, be_http_port, tablet_id ->
+ boolean running = true
+ Thread.sleep(1000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+
+ def waitForCompaction = { be_host, be_http_port, tablet_id ->
+ boolean running = true
+ do {
+ Thread.sleep(100)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run_status?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ out = process.getText()
+ logger.info("Get compaction status: code=" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ def do_stream_load = {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ }
+
+ def do_insert_into = {
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "Emily",
25),(2, "Benjamin", 35);"""
+ }
+
+ def getAlterTableState = { table_name ->
+ waitForSchemaChangeDone {
+ sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${table_name}'
ORDER BY createtime DESC LIMIT 1 """
+ time 600
+ }
+ return true
+ }
+
+ def waitForSC = {
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(100,
TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> {
+ def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+ assert res.size() == 1
+ if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") {
+ return true;
+ }
+ return false;
+ });
+ }
+
+ try {
+ GetDebugPoint().enableDebugPointForAllFEs('FE.mow.check.lock.release',
null)
+ getTotalRetry.call()
+ log.info("last_total_retry:" + last_total_retry)
+ // store the original value
+ get_be_param("mow_stream_load_commit_retry_times")
+ set_be_param("mow_stream_load_commit_retry_times", "2")
+ // create table
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(10) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ // 1.test normal load, lock is released normally, retry times is 0
+ // 1.1 first load success
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load0.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ }
+ qt_sql1 """ select * from ${tableName} order by id"""
+
+ getTotalRetry.call()
+ assertEquals(last_total_retry, total_retry)
+ // 1.2 second load success
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load1.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ qt_sql2 """ select * from ${tableName} order by id"""
+
+ getTotalRetry.call()
+ assertEquals(last_total_retry, total_retry)
+
+
+ //2. test commit fail, lock is released normally, will not retry
+ // 2.1 first load will fail on fe commit phase
+ GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception',
null)
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load2.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("FE.mow.commit.exception"))
+ }
+ }
+ qt_sql3 """ select * from ${tableName} order by id"""
+
+ // commit fail is not DELETE_BITMAP_LOCK_ERR will not retry
+ getTotalRetry.call()
+ assertEquals(last_total_retry, total_retry)
+
+ // 2.2 second load will success because of removing exception injection
+ GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load2.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ qt_sql4 """ select * from ${tableName} order by id"""
+ getTotalRetry.call()
+ assertEquals(last_total_retry, total_retry)
+
+ // 3. test update delete bitmap fail, lock is released normally, will
retry
+ setFeConfigTemporary(customFeConfig2) {
+ // 3.1 first load will fail on calculate delete bitmap timeout
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+
+ def now = System.currentTimeMillis()
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load3.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("update delete bitmap
failed"))
+ }
+ }
+ def time_cost = System.currentTimeMillis() - now
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 2, total_retry)
+ qt_sql5 """ select * from ${tableName} order by id"""
+
+ // 3.2 second load will success because of removing timeout
simulation
+
GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load3.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 2, total_retry)
+ qt_sql6 """ select * from ${tableName} order by id"""
+ }
+
+ //4. test wait fe lock timeout, will retry
+ setFeConfigTemporary(customFeConfig1) {
+ get_be_param("txn_commit_rpc_timeout_ms")
+ set_be_param("txn_commit_rpc_timeout_ms", "10000")
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout",
[sleep_time: 5])
+ // 4.1 first load will fail, because of waiting for fe lock timeout
+ def now = System.currentTimeMillis()
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load4.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("get table cloud commit
lock timeout"))
+ }
+ }
+ def time_cost = System.currentTimeMillis() - now
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 4, total_retry)
+ assertTrue(time_cost > 10000, "wait time should bigger than total
retry interval")
+ qt_sql7 """ select * from ${tableName} order by id"""
+
+ // 4.2 second load will success because of removing timeout
simulation
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load4.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 4, total_retry)
+ qt_sql8 """ select * from ${tableName} order by id"""
+ reset_be_param("txn_commit_rpc_timeout_ms")
+ }
+ //5. test wait delete bitmap lock timeout, lock is released normally,
will retry
+
GetDebugPoint().enableDebugPointForAllFEs("FE.mow.get_delete_bitmap_lock.fail")
+ // 5.1 first load will fail, because of waiting for delete bitmap lock
timeout
+ setFeConfigTemporary(customFeConfig1) {
+ def now = System.currentTimeMillis()
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load5.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("test
get_delete_bitmap_lock fail"))
+ }
+ }
+ def time_cost = System.currentTimeMillis() - now
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 6, total_retry)
+ qt_sql9 """ select * from ${tableName} order by id"""
+
+ // 5.2 second load will success because of removing timeout
simulation
+
GetDebugPoint().disableDebugPointForAllFEs("FE.mow.get_delete_bitmap_lock.fail")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load5.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 6, total_retry)
+ qt_sql10 """ select * from ${tableName} order by id"""
+ }
+
+ //6.test calculate delete bitmap task timeout, after retry, will
succeed
+ setFeConfigTemporary(customFeConfig1) {
+ // 6.1 first load will retry because of calculating delete bitmap
timeout, and finally succeed
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+
+ def now = System.currentTimeMillis()
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load6.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ def time_cost = System.currentTimeMillis() - now
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 7, total_retry)
+ assertTrue(time_cost > 2000, "wait time should bigger than total
retry interval")
+ qt_sql11 """ select * from ${tableName} order by id"""
+
+ // 6.2 second load will success and no need retry because of
removing timeout simulation
+
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ getTotalRetry.call()
+ assertEquals(last_total_retry + 7, total_retry)
+ qt_sql12 """ select * from ${tableName} order by id"""
+ }
+
+ //7. test parallel load
+ GetDebugPoint().disableDebugPointForAllFEs('FE.mow.check.lock.release')
+ setFeConfigTemporary(customFeConfig2) {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ def threads = []
+ def now = System.currentTimeMillis()
+ for (int k = 0; k <= 1; k++) {
+ logger.info("start load thread:" + k)
+ threads.add(Thread.startDaemon {
+ do_stream_load()
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 6000, "wait time should bigger than 6s")
+
+ threads = []
+ now = System.currentTimeMillis()
+ for (int k = 0; k <= 1; k++) {
+ logger.info("start insert into thread:" + k)
+ threads.add(Thread.startDaemon {
+ do_insert_into()
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+ time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 6000, "wait time should bigger than 6s")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+
+ }
+ //8. test insert into timeout config
+ setFeConfigTemporary(customFeConfig3) {
+ try {
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout",
[sleep_time: 15])
+ sql """ set global insert_visible_timeout_ms=15000; """
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"Emily", 25),(2, "Benjamin", 35);"""
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(e.getMessage().contains("test get table cloud
commit lock timeout"))
+ } finally {
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout")
+ sql """ set global insert_visible_timeout_ms=60000; """
+ }
+ }
+ setFeConfigTemporary(customFeConfig4) {
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"Emily", 25),(2, "Benjamin", 35);"""
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(e.getMessage().contains("Failed to calculate delete
bitmap. Timeout"))
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ }
+ }
+ setFeConfigTemporary(customFeConfig5) {
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"Emily", 25),(2, "Benjamin", 35);"""
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ }
+ }
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+
+ //9. when load hold delete bitmap lock, compaction and schema change
will fail and retry
+ setFeConfigTemporary(customFeConfig5) {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep",
[percent: "1.0", sleep: "10"])
+ Thread.startDaemon {
+ do_insert_into()
+ }
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+ logger.info("tablets: " + tablets)
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def tablet_info = sql_return_maparray """ show tablet
${tablet_id}; """
+ logger.info("tablet: " + tablet_info)
+ String trigger_backend_id = tablet.BackendId
+ def now = System.currentTimeMillis()
+
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id],
+ "cumulative", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 10000, "wait time should bigger than
10s")
+ }
+ Thread.startDaemon {
+ do_insert_into()
+ }
+ def now = System.currentTimeMillis()
+ sql """ alter table ${tableName} order by (id,score,name); """
+ assertTrue(getAlterTableState(tableName), "schema change should
success")
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 10000, "wait time should bigger than 10s")
+
GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep")
+ }
+ //10.test stream load will fail when not found delete bitmap cache
+ setFeConfigTemporary(customFeConfig5) {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache.get_tablet_txn_info.not_found")
+ try {
+ do_insert_into()
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(e.getMessage().contains("NOT_FOUND"))
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("CloudTxnDeleteBitmapCache.get_tablet_txn_info.not_found")
+ }
+ }
+ //11. test rpc timeout
+ setFeConfigTemporary(customFeConfig5) {
+ get_be_param("txn_commit_rpc_timeout_ms")
+ set_be_param("txn_commit_rpc_timeout_ms", "5000")
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep",
[percent: "1.0", sleep: "15"])
+ def threads = []
+ for (int k = 0; k < 5; k++) {
+ logger.info("start load thread:" + k)
+ threads.add(Thread.startDaemon {
+ do_stream_load()
+ })
+ }
+ for (Thread th in threads) {
+ th.join()
+ }
+
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep")
+ reset_be_param("txn_commit_rpc_timeout_ms")
+ }
+ //12. test compaction or schema change fail will release lock
+ setFeConfigTemporary(customFeConfig5) {
+ get_be_param("delete_bitmap_lock_expiration_seconds")
+ set_be_param("delete_bitmap_lock_expiration_seconds", "60")
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"AAA", 15);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2,
"BBB", 25);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3,
"CCC", 35);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4,
"DDD", 45);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5,
"EEE", 55);"""
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+ logger.info("tablets: " + tablets)
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed")
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def tablet_info = sql_return_maparray """ show tablet
${tablet_id}; """
+ logger.info("tablet: " + tablet_info)
+ String trigger_backend_id = tablet.BackendId
+
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id],
+ "cumulative", tablet_id).contains("Success"));
+ waitForCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
+ }
+ def now = System.currentTimeMillis()
+ do_insert_into()
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost < 10000, "wait time should less than 10s")
+
GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed")
+
+//
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+// sql "alter table ${tableName} modify column score varchar(100);"
+// waitForSC()
+// def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+// assert res[0].State == "CANCELLED"
+// assert res[0].Msg.contains("[DELETE_BITMAP_LOCK_ERROR]test
update delete bitmap failed")
+// now = System.currentTimeMillis()
+// do_insert_into()
+// time_cost = System.currentTimeMillis() - now
+// log.info("time_cost(ms): ${time_cost}")
+// assertTrue(time_cost < 10000, "wait time should less than 10s")
+ reset_be_param("delete_bitmap_lock_expiration_seconds")
+ }
+ //13. when get delete bitmap lock failed, compaction and sc retry
times will not exceed max retry times
+ setFeConfigTemporary(customFeConfig3) {
+ get_be_param("get_delete_bitmap_lock_max_retry_times")
+ set_be_param("get_delete_bitmap_lock_max_retry_times", "2")
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"A1", 15);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2,
"B2", 25);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3,
"C3", 35);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4,
"D4", 45);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5,
"E5", 55);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (6,
"E6", 66);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (7,
"E7", 77);"""
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+ logger.info("tablets: " + tablets)
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict")
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def tablet_info = sql_return_maparray """ show tablet
${tablet_id}; """
+ logger.info("tablet: " + tablet_info)
+ String trigger_backend_id = tablet.BackendId
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ int index = 0;
+ while
(!triggerCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id],
+ "cumulative", tablet_id).contains("Success")) {
+ if (index > 60) {
+ break;
+ }
+ Thread.sleep(2000)
+ logger.info("index: " + index)
+ index++;
+ }
+ assertTrue(index <= 60, "index should less than 60")
+ def now = System.currentTimeMillis()
+ waitForCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 3 * 500, "wait time should bigger than
1.5s")
+ assertTrue(time_cost < 10 * 2000, "wait time should less than
20s")
+
+ now = System.currentTimeMillis()
+ sql "alter table ${tableName} modify column score
varchar(200);"
+ waitForSC()
+ def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+ assert res[0].State == "FINISHED"
+ time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 3 * 500, "wait time should bigger than
1.5s")
+ assertTrue(time_cost < 10 * 2000, "wait time should less than
20s")
+ }
+ }
+ } finally {
+ reset_be_param("mow_stream_load_commit_retry_times")
+ reset_be_param("txn_commit_rpc_timeout_ms")
+ reset_be_param("delete_bitmap_lock_expiration_seconds")
+ reset_be_param("get_delete_bitmap_lock_max_retry_times")
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+
+}
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
deleted file mode 100644
index c5810bec88a..00000000000
---
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
+++ /dev/null
@@ -1,429 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
- if (!isCloudMode()) {
- return
- }
- GetDebugPoint().clearDebugPointsForAllFEs()
-
- def backendId_to_backendIP = [:]
- def backendId_to_backendHttpPort = [:]
- def backendId_to_params = [string: [:]]
- getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
-
- def set_be_param = { paramName, paramValue ->
- // for eache be node, set paramName=paramValue
- for (String id in backendId_to_backendIP.keySet()) {
- def beIp = backendId_to_backendIP.get(id)
- def bePort = backendId_to_backendHttpPort.get(id)
- def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
- assertTrue(out.contains("OK"))
- }
- }
-
- def reset_be_param = { paramName ->
- // for eache be node, reset paramName to default
- for (String id in backendId_to_backendIP.keySet()) {
- def beIp = backendId_to_backendIP.get(id)
- def bePort = backendId_to_backendHttpPort.get(id)
- def original_value = backendId_to_params.get(id).get(paramName)
- def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
original_value))
- assertTrue(out.contains("OK"))
- }
- }
-
- def get_be_param = { paramName ->
- // for eache be node, get param value by default
- def paramValue = ""
- for (String id in backendId_to_backendIP.keySet()) {
- def beIp = backendId_to_backendIP.get(id)
- def bePort = backendId_to_backendHttpPort.get(id)
- // get the config value from be
- def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
- assertTrue(code == 0)
- assertTrue(out.contains(paramName))
- // parsing
- def resultList = parseJson(out)[0]
- assertTrue(resultList.size() == 4)
- // get original value
- paramValue = resultList[2]
- backendId_to_params.get(id, [:]).put(paramName, paramValue)
- }
- }
-
- def customFeConfig1 = [calculate_delete_bitmap_task_timeout_seconds: 2,
meta_service_rpc_retry_times: 5]
- def customFeConfig2 = [delete_bitmap_lock_expiration_seconds: 2,
meta_service_rpc_retry_times: 5]
- String[][] backends = sql """ show backends """
- assertTrue(backends.size() > 0)
- String backendId;
- def backendIdToBackendIP = [:]
- def backendIdToBackendBrpcPort = [:]
- for (String[] backend in backends) {
- if (backend[9].equals("true")) {
- backendIdToBackendIP.put(backend[0], backend[1])
- backendIdToBackendBrpcPort.put(backend[0], backend[5])
- }
- }
-
- backendId = backendIdToBackendIP.keySet()[0]
- def getMetricsMethod = { check_func ->
- httpTest {
- endpoint backendIdToBackendIP.get(backendId) + ":" +
backendIdToBackendBrpcPort.get(backendId)
- uri "/brpc_metrics"
- op "get"
- check check_func
- }
- }
-
- int total_retry = 0;
- int last_total_retry = -1;
-
- def getTotalRetry = {
- getMetricsMethod.call() { respCode, body ->
- logger.info("get total retry resp Code {}",
"${respCode}".toString())
- assertEquals("${respCode}".toString(), "200")
- String out = "${body}".toString()
- def strs = out.split('\n')
- for (String line in strs) {
- if (line.startsWith("stream_load_commit_retry_counter")) {
- logger.info("find: {}", line)
- total_retry =
line.replaceAll("stream_load_commit_retry_counter ", "").toInteger()
- if (last_total_retry < 0) {
- last_total_retry = total_retry
- }
- break
- }
- }
- }
- }
-
- try {
- GetDebugPoint().enableDebugPointForAllFEs('FE.mow.check.lock.release',
null)
- getTotalRetry.call()
- log.info("last_total_retry:" + last_total_retry)
- // store the original value
- get_be_param("mow_stream_load_commit_retry_times")
- set_be_param("mow_stream_load_commit_retry_times", "2")
- def tableName = "tbl_basic"
- // create table
- sql """ drop table if exists ${tableName}; """
-
- sql """
- CREATE TABLE `${tableName}` (
- `id` int(11) NOT NULL,
- `name` varchar(1100) NULL,
- `score` int(11) NULL default "-1"
- ) ENGINE=OLAP
- UNIQUE KEY(`id`)
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- "enable_unique_key_merge_on_write" = "true",
- "replication_num" = "1"
- );
- """
- // 1.test normal load, lock is released normally, retry times is 0
- // 1.1 first load success
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load0.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- qt_sql1 """ select * from ${tableName} order by id"""
-
- getTotalRetry.call()
- assertEquals(last_total_retry, total_retry)
- // 1.2 second load success
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load1.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- qt_sql2 """ select * from ${tableName} order by id"""
-
- getTotalRetry.call()
- assertEquals(last_total_retry, total_retry)
-
-
- //2. test commit fail, lock is released normally, will not retry
- // 2.1 first load will fail on fe commit phase
- GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception',
null)
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load2.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("fail", json.Status.toLowerCase())
- assertTrue(json.Message.contains("FE.mow.commit.exception"))
- }
- }
- qt_sql3 """ select * from ${tableName} order by id"""
-
- // commit fail is not DELETE_BITMAP_LOCK_ERR will not retry
- getTotalRetry.call()
- assertEquals(last_total_retry, total_retry)
-
- // 2.2 second load will success because of removing exception injection
- GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load2.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- qt_sql4 """ select * from ${tableName} order by id"""
- getTotalRetry.call()
- assertEquals(last_total_retry, total_retry)
-
- // 3. test update delete bitmap fail, lock is released normally, will
retry
- setFeConfigTemporary(customFeConfig2) {
- // 3.1 first load will fail on calculate delete bitmap timeout
-
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
-
- def now = System.currentTimeMillis()
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load3.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("fail", json.Status.toLowerCase())
- assertTrue(json.Message.contains("update delete bitmap
failed"))
- }
- }
- def time_cost = System.currentTimeMillis() - now
- getTotalRetry.call()
- assertEquals(last_total_retry + 2, total_retry)
- qt_sql5 """ select * from ${tableName} order by id"""
-
- // 3.2 second load will success because of removing timeout
simulation
-
GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load3.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- getTotalRetry.call()
- assertEquals(last_total_retry + 2, total_retry)
- qt_sql6 """ select * from ${tableName} order by id"""
- }
-
- //4. test wait fe lock timeout, will retry
- setFeConfigTemporary(customFeConfig1) {
- get_be_param("txn_commit_rpc_timeout_ms")
- set_be_param("txn_commit_rpc_timeout_ms", "10000")
-
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout",
[sleep_time: 5])
- // 4.1 first load will fail, because of waiting for fe lock timeout
- def now = System.currentTimeMillis()
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load4.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("fail", json.Status.toLowerCase())
- assertTrue(json.Message.contains("get table cloud commit
lock timeout"))
- }
- }
- def time_cost = System.currentTimeMillis() - now
- getTotalRetry.call()
- assertEquals(last_total_retry + 4, total_retry)
- assertTrue(time_cost > 10000, "wait time should bigger than total
retry interval")
- qt_sql7 """ select * from ${tableName} order by id"""
-
- // 4.2 second load will success because of removing timeout
simulation
-
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.tryCommitLock.timeout")
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load4.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- getTotalRetry.call()
- assertEquals(last_total_retry + 4, total_retry)
- qt_sql8 """ select * from ${tableName} order by id"""
- }
- //5. test wait delete bitmap lock timeout, lock is released normally,
will retry
-
GetDebugPoint().enableDebugPointForAllFEs("FE.mow.get_delete_bitmap_lock.fail")
- // 5.1 first load will fail, because of waiting for delete bitmap lock
timeout
- setFeConfigTemporary(customFeConfig1) {
- def now = System.currentTimeMillis()
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load5.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("fail", json.Status.toLowerCase())
- assertTrue(json.Message.contains("test
get_delete_bitmap_lock fail"))
- }
- }
- def time_cost = System.currentTimeMillis() - now
- getTotalRetry.call()
- assertEquals(last_total_retry + 6, total_retry)
- qt_sql9 """ select * from ${tableName} order by id"""
-
- // 5.2 second load will success because of removing timeout
simulation
-
GetDebugPoint().disableDebugPointForAllFEs("FE.mow.get_delete_bitmap_lock.fail")
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load5.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- getTotalRetry.call()
- assertEquals(last_total_retry + 6, total_retry)
- qt_sql10 """ select * from ${tableName} order by id"""
- }
-
- //6.test calculate delete bitmap task timeout, after retry, will
succeed
- setFeConfigTemporary(customFeConfig1) {
- // 6.1 first load will retry because of calculating delete bitmap
timeout, and finally succeed
-
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
-
- def now = System.currentTimeMillis()
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load6.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- def time_cost = System.currentTimeMillis() - now
- getTotalRetry.call()
- assertEquals(last_total_retry + 7, total_retry)
- assertTrue(time_cost > 2000, "wait time should bigger than total
retry interval")
- qt_sql11 """ select * from ${tableName} order by id"""
-
- // 6.2 second load will success and no need retry because of
removing timeout simulation
-
GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
- streamLoad {
- table "${tableName}"
-
- set 'column_separator', ','
- set 'columns', 'id, name, score'
- file "test_stream_load.csv"
-
- time 10000 // limit inflight 10s
-
- check { result, exception, startTime, endTime ->
- log.info("Stream load result: ${result}")
- def json = parseJson(result)
- assertEquals("success", json.Status.toLowerCase())
- }
- }
- getTotalRetry.call()
- assertEquals(last_total_retry + 7, total_retry)
- qt_sql12 """ select * from ${tableName} order by id"""
- }
- } finally {
- reset_be_param("mow_stream_load_commit_retry_times")
- GetDebugPoint().clearDebugPointsForAllBEs()
- GetDebugPoint().clearDebugPointsForAllFEs()
- }
-
-}
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_delete_bitmap_lock_with_restart.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_delete_bitmap_lock_with_restart.groovy
new file mode 100644
index 00000000000..cada05ac7fa
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_delete_bitmap_lock_with_restart.groovy
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_delete_bitmap_lock_with_restart", "docker") {
+ if (!isCloudMode()) {
+ return
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ def customFeConfig1 = [meta_service_rpc_retry_times: 5]
+ def tableName = "tbl_basic"
+ def do_stream_load = {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'columns', 'id, name, score'
+ file "test_stream_load.csv"
+
+ check { result, exception, startTime, endTime ->
+ log.info("Stream load result: ${result}")
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ }
+ //1. load
+ docker(options) {
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(10) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ do_stream_load()
+
GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.handle.inject_sleep",
[percent: "1.0", sleep: "15"])
+ Thread.startDaemon {
+ do_stream_load()
+ }
+ // 1. load + restart fe
+ cluster.restartFrontends()
+ def now = System.currentTimeMillis()
+ do_stream_load()
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 30000, "wait time should bigger than 30s")
+
+ // 2. load + restart be
+
+ Thread.startDaemon {
+ do_stream_load()
+ }
+ cluster.restartBackends()
+ now = System.currentTimeMillis()
+ do_stream_load()
+ time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost < 10000, "wait time should bigger than 10s")
+ }
+ //2. compaction
+ options.beConfigs += [
+ 'delete_bitmap_lock_expiration_seconds=60',
+ ]
+ docker(options) {
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort)
+
+ def getTabletStatus = { be_host, be_http_port, tablet_id ->
+ boolean running = true
+ Thread.sleep(1000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+ def triggerCompaction = { be_host, be_http_port, compact_type,
tablet_id ->
+ if (compact_type == "cumulative") {
+ def (code_1, out_1, err_1) =
be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_1 + ", out=" +
out_1 + ", err=" + err_1)
+ assertEquals(code_1, 0)
+ return out_1
+ } else if (compact_type == "full") {
+ def (code_2, out_2, err_2) = be_run_full_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_2 + ", out=" +
out_2 + ", err=" + err_2)
+ assertEquals(code_2, 0)
+ return out_2
+ } else {
+ assertFalse(True)
+ }
+ }
+ def waitForCompaction = { be_host, be_http_port, tablet_id ->
+ boolean running = true
+ do {
+ Thread.sleep(100)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run_status?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ process = command.execute()
+ code = process.waitFor()
+ out = process.getText()
+ logger.info("Get compaction status: code=" + code + ", out=" +
out)
+ if (code == 0) {
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success",
compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } else {
+ break
+ }
+ } while (running)
+ }
+
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(10) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA",
15);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB",
25);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC",
35);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4, "DDD",
45);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5, "EEE",
55);"""
+
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep",
[percent: "1.0", sleep: "10"])
+ def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+ logger.info("tablets: " + tablets)
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def tablet_info = sql_return_maparray """ show tablet
${tablet_id}; """
+ logger.info("tablet: " + tablet_info)
+ String trigger_backend_id = tablet.BackendId
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id],
+ "cumulative", tablet_id).contains("Success"))
+
+ }
+ // 1. compaction + restart fe
+ cluster.restartFrontends()
+ def now = System.currentTimeMillis()
+ do_stream_load()
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost < 10000, "wait time should less than 10s")
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ waitForCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ }
+ sleep(30000)
+ context.reconnectFe()
+ // 2. compaction + restart be
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA",
15);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB",
25);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC",
35);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4, "DDD",
45);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5, "EEE",
55);"""
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def tablet_info = sql_return_maparray """ show tablet
${tablet_id}; """
+ logger.info("tablet: " + tablet_info)
+ String trigger_backend_id = tablet.BackendId
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id],
+ "cumulative", tablet_id).contains("Success"))
+
+ }
+ cluster.restartBackends()
+ now = System.currentTimeMillis()
+ do_stream_load()
+ time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 10000, "wait time should less than 10s")
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ waitForCompaction(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
+ }
+ }
+ //3. sc
+ docker(options) {
+ def getJobState = {
+ def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+ assert res.size() == 1
+ log.info("res:" + res[0].State)
+ return res[0].State
+ }
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(10) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA",
15);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB",
25);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC",
35);"""
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep",
[percent: "1.0", sleep: "10"])
+ sql "alter table ${tableName} modify column score varchar(100);"
+ // 1. sc + restart fe
+ cluster.restartFrontends()
+ context.reconnectFe()
+ for (int i = 0; i < 30; i++) {
+ log.info("i: ${i}")
+ try {
+ def now = System.currentTimeMillis()
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"AAA", 15);"""
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost < 10000, "wait time should less than 10s")
+ break
+ } catch (Exception e) {
+ log.info("Exception:" + e)
+ Thread.sleep(2000)
+ }
+ }
+ int max_try_time = 30
+ while (max_try_time--) {
+ def result = getJobState(tableName)
+ if (result == "FINISHED" || result == "CANCELLED") {
+ break
+ } else {
+ Thread.sleep(1000)
+ }
+ }
+ // 2. sc + restart be
+ sql "alter table ${tableName} modify column score varchar(200);"
+ cluster.restartBackends()
+ def now = System.currentTimeMillis()
+ do_stream_load()
+ def time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost > 10000, "wait time should less than 10s")
+ max_try_time = 30
+ while (max_try_time--) {
+ def result = getJobState(tableName)
+ if (result == "FINISHED" || result == "CANCELLED") {
+ break
+ } else {
+ Thread.sleep(1000)
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]