This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 7b43c82f8d3 branch-4.1: [fix](cloud) Fix txn kv destruction order #61630 (#61640)
7b43c82f8d3 is described below
commit 7b43c82f8d3d6fa0e456552b21a70559cc6766d3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 15:35:33 2026 +0800
branch-4.1: [fix](cloud) Fix txn kv destruction order #61630 (#61640)
Cherry-picked from #61630
Co-authored-by: walter <[email protected]>
---
cloud/test/mem_txn_kv_test.cpp | 4 +
cloud/test/txn_kv_test.cpp | 3 +
cloud/test/txn_lazy_commit_test.cpp | 162 ++++++++++++++++++++++--------------
3 files changed, 105 insertions(+), 64 deletions(-)
diff --git a/cloud/test/mem_txn_kv_test.cpp b/cloud/test/mem_txn_kv_test.cpp
index f9012fb144d..ac00b075b85 100644
--- a/cloud/test/mem_txn_kv_test.cpp
+++ b/cloud/test/mem_txn_kv_test.cpp
@@ -25,6 +25,7 @@
#include <thread>
#include "common/config.h"
+#include "common/defer.h"
#include "common/util.h"
#include "meta-service/doris_txn.h"
#include "meta-store/codec.h"
@@ -47,6 +48,9 @@ int main(int argc, char** argv) {
std::cout << "exit inti FdbTxnKv error" << std::endl;
return -1;
}
+ DORIS_CLOUD_DEFER {
+ fdb_txn_kv.reset();
+ };
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp
index d82fe3806fa..cac05a9e2c5 100644
--- a/cloud/test/txn_kv_test.cpp
+++ b/cloud/test/txn_kv_test.cpp
@@ -59,6 +59,9 @@ int main(int argc, char** argv) {
config::init(nullptr, true);
::testing::InitGoogleTest(&argc, argv);
init_txn_kv();
+ DORIS_CLOUD_DEFER {
+ txn_kv.reset();
+ };
return RUN_ALL_TESTS();
}
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index a7506f52289..6ff0742505f 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -91,6 +91,7 @@ int main(int argc, char** argv) {
config::label_keep_max_second = 0;
config::force_immediate_recycle = true;
+ config::log_immediate_flush = true;
if (!doris::cloud::init_glog("txn_lazy_commit_test")) {
std::cerr << "failed to init glog" << std::endl;
@@ -100,6 +101,9 @@ int main(int argc, char** argv) {
// Initialize FDB
get_fdb_txn_kv();
+ DORIS_CLOUD_DEFER {
+ txn_kv.reset();
+ };
auto s3_producer_pool =
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
s3_producer_pool->start();
@@ -582,6 +586,11 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) {
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -659,10 +668,6 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithoutDbIdTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortedTest) {
@@ -782,6 +787,11 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -840,10 +850,6 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithDbIdTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
@@ -875,6 +881,11 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, false);
std::string instance_id = "test_instance";
@@ -979,10 +990,6 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually) {
<< " status is " << resp.status().DebugString();
ASSERT_GT(resp.version(), 1);
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitVersionedReadTest,
DISABLED_CommitTxnEventuallyWithoutDbIdTest) {
@@ -1019,6 +1026,11 @@ TEST(TxnLazyCommitVersionedReadTest, DISABLED_CommitTxnEventuallyWithoutDbIdTest
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, false);
std::string instance_id = "test_instance";
@@ -1114,10 +1126,6 @@ TEST(TxnLazyCommitVersionedReadTest, DISABLED_CommitTxnEventuallyWithoutDbIdTest
}
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
@@ -1136,6 +1144,11 @@ TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
commit_txn_immediatelly_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -1192,10 +1205,6 @@ TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, NotFallThroughCommitTxnEventuallyTest) {
@@ -1226,6 +1235,11 @@ TEST(TxnLazyCommitTest, NotFallThroughCommitTxnEventuallyTest) {
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -1281,10 +1295,6 @@ TEST(TxnLazyCommitTest, NotFallThroughCommitTxnEventuallyTest) {
check_rowset_meta_not_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) {
@@ -1316,6 +1326,11 @@ TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) {
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -1374,10 +1389,6 @@ TEST(TxnLazyCommitTest, FallThroughCommitTxnEventuallyTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) {
@@ -1463,6 +1474,11 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
@@ -1593,9 +1609,6 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase1Test) {
<< ", txn1_table_version=" << txn1_table_version
<< ", txn2_table_version=" << txn2_table_version;
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
ASSERT_EQ(commit_txn_eventually_begin_count, 3);
ASSERT_EQ(last_pending_txn_id_count, 1);
ASSERT_EQ(finish_count, 2);
@@ -1720,6 +1733,11 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
@@ -1837,9 +1855,6 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase2Test) {
thread1.join();
thread2.join();
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
ASSERT_EQ(commit_txn_immediately_begin_count, 2);
ASSERT_EQ(last_pending_txn_id_count, 1);
ASSERT_EQ(immediately_finish_count, 1);
@@ -2001,6 +2016,11 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
int64_t txn_id1 = 0;
std::thread thread1([&] {
@@ -2088,10 +2108,6 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase3Test) {
thread1.join();
thread2.join();
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
-
ASSERT_EQ(get_rowset_begin_count, 2);
ASSERT_EQ(last_pending_txn_id_count, 1);
ASSERT_EQ(txn_lazy_committer_submit_count, 1);
@@ -2152,6 +2168,11 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
@@ -2226,9 +2247,6 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase4Test) {
check_txn_not_exist(txn, db_id, txn_id, label);
}
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
ASSERT_TRUE(commit_txn_eventullay_hit);
ASSERT_TRUE(abort_timeout_txn_hit);
ASSERT_EQ(txn_id, txn_info_pb.txn_id());
@@ -2327,6 +2345,11 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
@@ -2495,9 +2518,6 @@ TEST(TxnLazyCommitTest, ConcurrentCommitTxnEventuallyCase5Test) {
thread1.join();
thread2.join();
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
ASSERT_EQ(commit_txn_immediately_begin_count, 2);
ASSERT_EQ(last_pending_txn_id_count, 1);
ASSERT_EQ(immediately_finish_count, 1);
@@ -2584,6 +2604,11 @@ TEST(TxnLazyCommitTest, RowsetMetaSizeExceedTest) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
{
brpc::Controller cntl;
GetRowsetRequest req;
@@ -2683,6 +2708,11 @@ TEST(TxnLazyCommitTest, RecyclePartitions) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
@@ -2772,10 +2802,6 @@ TEST(TxnLazyCommitTest, RecyclePartitions) {
}
ASSERT_EQ(recycler.recycle_partitions(), 0);
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, RecycleIndexes) {
@@ -2831,6 +2857,11 @@ TEST(TxnLazyCommitTest, RecycleIndexes) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
// mock rowset and tablet
@@ -2918,10 +2949,6 @@ TEST(TxnLazyCommitTest, RecycleIndexes) {
}
ASSERT_EQ(recycler.recycle_indexes(), 0);
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) {
@@ -2943,6 +2970,11 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) {
commit_txn_eventually_finish_hit = true;
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -3011,10 +3043,6 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
@@ -3045,6 +3073,11 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -3113,10 +3146,6 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
-
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
@@ -3187,6 +3216,11 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
});
sp->enable_processing();
+ DORIS_CLOUD_DEFER {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ };
auto meta_service = get_meta_service(txn_kv, true);
brpc::Controller cntl;
@@ -3291,9 +3325,6 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithSchemaChangeTest) {
check_rowset_meta_exist(txn, tablet_id, 2);
}
}
- sp->clear_all_call_backs();
- sp->clear_trace();
- sp->disable_processing();
}
TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortAfterCommitTest) {
@@ -3320,7 +3351,9 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithAbortAfterCommitTest) {
int64_t txn_id = res.txn_id();
auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
- SyncPoint::get_instance()->clear_all_call_backs();
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
};
sp->enable_processing();
sp->set_call_back("commit_txn_eventually::abort_txn_after_mark_txn_commited",
[&](auto&&) {
@@ -3395,6 +3428,7 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithManyPartitions) {
auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
+ sp->clear_trace();
sp->disable_processing();
};
sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]