This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7e82e7651a3 [Improve](txn) Add some fuzzy test stub in txn (#26712)
7e82e7651a3 is described below
commit 7e82e7651a368c47fafa6607b585d77e35c5df19
Author: deardeng <[email protected]>
AuthorDate: Thu Nov 16 11:50:06 2023 +0800
[Improve](txn) Add some fuzzy test stub in txn (#26712)
---
be/src/olap/task/engine_publish_version_task.cpp | 13 +++++
be/src/olap/txn_manager.cpp | 61 ++++++++++++++++++++--
.../java/org/apache/doris/alter/AlterJobV2.java | 23 ++++++++
3 files changed, 92 insertions(+), 5 deletions(-)
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 1d59efcfee5..24096d1b693 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -39,6 +39,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "util/bvar_helper.h"
+#include "util/debug_points.h"
#include "util/threadpool.h"
namespace doris {
@@ -91,6 +92,18 @@ Status EnginePublishVersionTask::finish() {
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
+ DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("EnginePublishVersionTask.finish.random random failed");
+ return Status::InternalError("debug engine publish version task random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 2d5ce73b867..1ed6f74eb88 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -139,6 +139,19 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
+ return Status::InternalError("debug prepare txn random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
+
/// Step 1: check if the transaction is already exist
do {
auto iter = txn_tablet_map.find(key);
@@ -296,11 +309,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
key.first, key.second, tablet_info.to_string());
}
- DBUG_EXECUTE_IF(
- "TxnManager.commit_txn_random_failed",
- if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
- return Status::InternalError("debug commit txn random failed");
- });
+ DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("TxnManager.commit_txn.random_failed");
+ return Status::InternalError("debug commit txn random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
// this while loop just run only once, just for if break
@@ -356,6 +376,12 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
if (!is_recovery) {
Status save_status = RowsetMetaManager::save(meta, tablet_uid,
rowset_ptr->rowset_id(),
rowset_ptr->rowset_meta()->get_rowset_pb());
+ DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
if (!save_status.ok()) {
return Status::Error<ROWSET_SAVE_FAILED>(
"save committed rowset failed. when commit txn rowset_id: {}, tablet id: {}, "
@@ -430,6 +456,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
"tablet={}",
partition_id, transaction_id, tablet_info.to_string());
}
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
+ return Status::InternalError("debug publish txn before save rs meta random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
/// Step 2: make rowset visible
// save meta need access disk, it maybe very slow, so that it is not in global txn lock
@@ -437,6 +475,19 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
// TODO(ygl): rowset is already set version here, memory is changed, if save failed
// it maybe a fatal error
rowset->make_visible(version);
+
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
+ return Status::InternalError("debug publish txn after save rs meta random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
// update delete_bitmap
if (tablet_txn_info->unique_key_merge_on_write) {
std::unique_ptr<RowsetWriter> rowset_writer;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index c1984d31d47..a5b3c867eeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
@@ -164,6 +165,19 @@ public abstract class AlterJobV2 implements Writable {
this.finishedTimeMs = finishedTimeMs;
}
+ // /api/debug_point/add/{name}?value=100
+ private void stateWait(final String name) {
+ long waitTimeMs = DebugPointUtil.getDebugParamOrDefault(name, 0);
+ if (waitTimeMs > 0) {
+ try {
+ LOG.info("debug point {} wait {} ms", name, waitTimeMs);
+ Thread.sleep(waitTimeMs);
+ } catch (InterruptedException e) {
+ LOG.warn(name, e);
+ }
+ }
+ }
+
/**
* The keyword 'synchronized' only protects 2 methods:
* run() and cancel()
@@ -180,15 +194,24 @@ public abstract class AlterJobV2 implements Writable {
return;
}
+ // /api/debug_point/add/FE.STOP_ALTER_JOB_RUN
+ if (DebugPointUtil.isEnable("FE.STOP_ALTER_JOB_RUN")) {
+ LOG.info("debug point FE.STOP_ALTER_JOB_RUN, schema change schedule stopped");
+ return;
+ }
+
try {
switch (jobState) {
case PENDING:
+ stateWait("FE.ALTER_JOB_V2_PENDING");
runPendingJob();
break;
case WAITING_TXN:
+ stateWait("FE.ALTER_JOB_V2_WAITING_TXN");
runWaitingTxnJob();
break;
case RUNNING:
+ stateWait("FE.ALTER_JOB_V2_RUNNING");
runRunningJob();
break;
default:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]