This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ab11c54fbe5 [Improve](txn) Add some fuzzy test stub in txn (#26712)
(#27144)
ab11c54fbe5 is described below
commit ab11c54fbe5cdd4b3adbed1a2ec02f911e50a627
Author: deardeng <[email protected]>
AuthorDate: Fri Nov 17 22:17:13 2023 +0800
[Improve](txn) Add some fuzzy test stub in txn (#26712) (#27144)
---
be/src/olap/task/engine_publish_version_task.cpp | 13 +++++
be/src/olap/txn_manager.cpp | 60 +++++++++++++++++++++-
.../java/org/apache/doris/alter/AlterJobV2.java | 23 +++++++++
3 files changed, 95 insertions(+), 1 deletion(-)
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 80dbb2e3d2e..eed0deb03c4 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -39,6 +39,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "util/bvar_helper.h"
+#include "util/debug_points.h"
#include "util/threadpool.h"
namespace doris {
@@ -91,6 +92,18 @@ Status EnginePublishVersionTask::finish() {
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" <<
transaction_id;
+ DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("EnginePublishVersionTask.finish.random random
failed");
+ return Status::InternalError("debug engine publish version task
random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait
ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index a57fba4f7b2..4dbe317ad1f 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -44,6 +44,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/task/engine_publish_version_task.h"
+#include "util/debug_points.h"
#include "util/time.h"
namespace doris {
@@ -100,6 +101,19 @@ Status TxnManager::prepare_txn(TPartitionId partition_id,
TTransactionId transac
std::lock_guard<std::shared_mutex>
txn_wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
+ return Status::InternalError("debug prepare txn random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
+
/// Step 1: check if the transaction is already exist
do {
auto iter = txn_tablet_map.find(key);
@@ -257,6 +271,19 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
key.first, key.second, tablet_info.to_string());
}
+ DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+ LOG_WARNING("TxnManager.commit_txn.random_failed");
+ return Status::InternalError("debug commit txn random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+ LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
+
std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
// this while loop just run only once, just for if break
do {
@@ -309,7 +336,13 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
if (!is_recovery) {
Status save_status = RowsetMetaManager::save(meta, tablet_uid,
rowset_ptr->rowset_id(),
rowset_ptr->rowset_meta()->get_rowset_pb());
- if (save_status != Status::OK()) {
+ DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+
LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
+ if (!save_status.ok()) {
return Status::Error<ROWSET_SAVE_FAILED>(
"save committed rowset failed. when commit txn rowset_id:
{}, tablet id: {}, "
"txn id: {}",
@@ -382,6 +415,18 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
"tablet={}",
partition_id, transaction_id, tablet_info.to_string());
}
+
DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+
LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
+ return Status::InternalError("debug publish txn before save rs
meta random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+
LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms",
wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
/// Step 2: make rowset visible
// save meta need access disk, it maybe very slow, so that it is not in
global txn lock
@@ -389,6 +434,19 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
// TODO(ygl): rowset is already set version here, memory is changed, if
save failed
// it maybe a fatal error
rowset->make_visible(version);
+
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta",
{
+ if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+
LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
+ return Status::InternalError("debug publish txn after save rs meta
random failed");
+ }
+ });
+ DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
+ if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+
LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms",
wait);
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+ }
+ });
// update delete_bitmap
if (tablet_txn_info.unique_key_merge_on_write) {
std::unique_ptr<RowsetWriter> rowset_writer;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index fb616fe429e..844865f139a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
@@ -155,6 +156,19 @@ public abstract class AlterJobV2 implements Writable {
this.finishedTimeMs = finishedTimeMs;
}
+ // Debug-only hook: sleeps for the number of milliseconds configured on the
+ // debug point `name`, e.g. /api/debug_point/add/{name}?value=100
+ // No sleep happens when the configured value is <= 0 (the default passed here
+ // is 0, presumably what is returned when the debug point is not enabled).
+ private void stateWait(final String name) {
+ long waitTimeMs = DebugPointUtil.getDebugParamOrDefault(name, 0);
+ if (waitTimeMs > 0) {
+ try {
+ LOG.info("debug point {} wait {} ms", name, waitTimeMs);
+ Thread.sleep(waitTimeMs);
+ } catch (InterruptedException e) {
+ LOG.warn(name, e);
+ // Restore the interrupt flag: swallowing it here would hide the
+ // interruption from the rest of the alter-job thread.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
/**
* The keyword 'synchronized' only protects 2 methods:
* run() and cancel()
@@ -171,15 +185,24 @@ public abstract class AlterJobV2 implements Writable {
return;
}
+ // /api/debug_point/add/FE.STOP_ALTER_JOB_RUN
+ if (DebugPointUtil.isEnable("FE.STOP_ALTER_JOB_RUN")) {
+ LOG.info("debug point FE.STOP_ALTER_JOB_RUN, schema change
schedule stopped");
+ return;
+ }
+
try {
switch (jobState) {
case PENDING:
+ stateWait("FE.ALTER_JOB_V2_PENDING");
runPendingJob();
break;
case WAITING_TXN:
+ stateWait("FE.ALTER_JOB_V2_WAITING_TXN");
runWaitingTxnJob();
break;
case RUNNING:
+ stateWait("FE.ALTER_JOB_V2_RUNNING");
runRunningJob();
break;
default:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]