This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e82e7651a3 [Improve](txn) Add some fuzzy test stub in txn (#26712)
7e82e7651a3 is described below

commit 7e82e7651a368c47fafa6607b585d77e35c5df19
Author: deardeng <[email protected]>
AuthorDate: Thu Nov 16 11:50:06 2023 +0800

    [Improve](txn) Add some fuzzy test stub in txn (#26712)
---
 be/src/olap/task/engine_publish_version_task.cpp   | 13 +++++
 be/src/olap/txn_manager.cpp                        | 61 ++++++++++++++++++++--
 .../java/org/apache/doris/alter/AlterJobV2.java    | 23 ++++++++
 3 files changed, 92 insertions(+), 5 deletions(-)

diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 1d59efcfee5..24096d1b693 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -39,6 +39,7 @@
 #include "olap/txn_manager.h"
 #include "olap/utils.h"
 #include "util/bvar_helper.h"
+#include "util/debug_points.h"
 #include "util/threadpool.h"
 
 namespace doris {
@@ -91,6 +92,18 @@ Status EnginePublishVersionTask::finish() {
     int64_t transaction_id = _publish_version_req.transaction_id;
     OlapStopWatch watch;
     VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
+    DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.random", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+            LOG_WARNING("EnginePublishVersionTask.finish.random random failed");
+            return Status::InternalError("debug engine publish version task random failed");
+        }
+    });
+    DBUG_EXECUTE_IF("EnginePublishVersionTask.finish.wait", {
+        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+            LOG_WARNING("EnginePublishVersionTask.finish.wait wait").tag("wait ms", wait);
+            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+        }
+    });
     std::unique_ptr<ThreadPoolToken> token =
             StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
                     ThreadPool::ExecutionMode::CONCURRENT);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 2d5ce73b867..1ed6f74eb88 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -139,6 +139,19 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
     std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
     txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
 
+    DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed");
+            return Status::InternalError("debug prepare txn random failed");
+        }
+    });
+    DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
+        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+            LOG_WARNING("TxnManager.prepare_txn.wait").tag("wait ms", wait);
+            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+        }
+    });
+
     /// Step 1: check if the transaction is already exist
     do {
         auto iter = txn_tablet_map.find(key);
@@ -296,11 +309,18 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
                 key.first, key.second, tablet_info.to_string());
     }
 
-    DBUG_EXECUTE_IF(
-            "TxnManager.commit_txn_random_failed",
-            if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-                return Status::InternalError("debug commit txn random failed");
-            });
+    DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+            LOG_WARNING("TxnManager.commit_txn.random_failed");
+            return Status::InternalError("debug commit txn random failed");
+        }
+    });
+    DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
+        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+            LOG_WARNING("TxnManager.commit_txn.wait").tag("wait ms", wait);
+            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+        }
+    });
 
     std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
     // this while loop just run only once, just for if break
@@ -356,6 +376,12 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
     if (!is_recovery) {
         Status save_status = RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
                                                      rowset_ptr->rowset_meta()->get_rowset_pb());
+        DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
+            if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+                LOG_WARNING("TxnManager.RowsetMetaManager.save_wait").tag("wait ms", wait);
+                std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+            }
+        });
         if (!save_status.ok()) {
             return Status::Error<ROWSET_SAVE_FAILED>(
                     "save committed rowset failed. when commit txn rowset_id: {}, tablet id: {}, "
@@ -430,6 +456,18 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                 "tablet={}",
                 partition_id, transaction_id, tablet_info.to_string());
     }
+    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+            LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta");
+            return Status::InternalError("debug publish txn before save rs meta random failed");
+        }
+    });
+    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
+        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+            LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta").tag("wait ms", wait);
+            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+        }
+    });
 
     /// Step 2: make rowset visible
     // save meta need access disk, it maybe very slow, so that it is not in global txn lock
@@ -437,6 +475,19 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
     // TODO(ygl): rowset is already set version here, memory is changed, if save failed
     // it maybe a fatal error
     rowset->make_visible(version);
+
+    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+            LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta");
+            return Status::InternalError("debug publish txn after save rs meta random failed");
+        }
+    });
+    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
+        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
+            LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta").tag("wait ms", wait);
+            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
+        }
+    });
     // update delete_bitmap
     if (tablet_txn_info->unique_key_merge_on_write) {
         std::unique_ptr<RowsetWriter> rowset_writer;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index c1984d31d47..a5b3c867eeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
@@ -164,6 +165,19 @@ public abstract class AlterJobV2 implements Writable {
         this.finishedTimeMs = finishedTimeMs;
     }
 
+    // /api/debug_point/add/{name}?value=100
+    private void stateWait(final String name) {
+        long waitTimeMs = DebugPointUtil.getDebugParamOrDefault(name, 0);
+        if (waitTimeMs > 0) {
+            try {
+                LOG.info("debug point {} wait {} ms", name, waitTimeMs);
+                Thread.sleep(waitTimeMs);
+            } catch (InterruptedException e) {
+                LOG.warn(name, e);
+            }
+        }
+    }
+
     /**
      * The keyword 'synchronized' only protects 2 methods:
      * run() and cancel()
@@ -180,15 +194,24 @@ public abstract class AlterJobV2 implements Writable {
             return;
         }
 
+        // /api/debug_point/add/FE.STOP_ALTER_JOB_RUN
+        if (DebugPointUtil.isEnable("FE.STOP_ALTER_JOB_RUN")) {
+            LOG.info("debug point FE.STOP_ALTER_JOB_RUN, schema change schedule stopped");
+            return;
+        }
+
         try {
             switch (jobState) {
                 case PENDING:
+                    stateWait("FE.ALTER_JOB_V2_PENDING");
                     runPendingJob();
                     break;
                 case WAITING_TXN:
+                    stateWait("FE.ALTER_JOB_V2_WAITING_TXN");
                     runWaitingTxnJob();
                     break;
                 case RUNNING:
+                    stateWait("FE.ALTER_JOB_V2_RUNNING");
                     runRunningJob();
                     break;
                 default:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to