This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0a5a18ba39b [improve](txn insert) Txn load support cloud mode (#34721)
0a5a18ba39b is described below
commit 0a5a18ba39b89a658257aa94cc8020dce1a0eb01
Author: meiyi <[email protected]>
AuthorDate: Wed Jun 5 15:49:24 2024 +0800
[improve](txn insert) Txn load support cloud mode (#34721)
## Proposed changes
### Purpose
The user doc:
https://doris.apache.org/zh-CN/docs/dev/data-operate/import/transaction-load-manual
We have supported `insert into select` (https://github.com/apache/doris/pull/31666),
`update` (https://github.com/apache/doris/pull/33034) and
`delete` (https://github.com/apache/doris/pull/33100) in transaction load.
https://github.com/apache/doris/pull/32980 implements writing more than one
rowset to a single partition within one txn.
This PR implements the cloud-mode counterpart of
https://github.com/apache/doris/pull/32980.
### Implementation
#### sub_txn_id
See https://github.com/apache/doris/pull/32980.
#### Meta service supports committing a txn with sub txns
This process is generally the same as `commit_txn`; the difference is that a
partition's version is increased by 1 for each sub txn that writes to it
(see the sketch after the example below).
One example:
Suppose the table, partition, tablet and version info is:
```
--------------------------------------------
| table | partition | tablet | version |
--------------------------------------------
| t1 | t1_p1 | t1_p1.1 | 1 |
| t1 | t1_p1 | t1_p1.2 | 1 |
| t1 | t1_p2 | t1_p2.1 | 2 |
| t2 | t2_p3 | t2_p3.1 | 3 |
| t2 | t2_p4 | t2_p4.1 | 4 |
--------------------------------------------
```
Now we commit a txn with 3 sub txns and the tablets are:
* sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
* sub_txn2: t2_p3.1
* sub_txn3: t1_p1.1, t1_p1.2
When committing, the partition versions change as follows:
* sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
* sub_txn2: t2_p3(3 -> 4)
* sub_txn3: t1_p1(2 -> 3)
After commit, the partition versions are:
* t1: t1_p1(3), t1_p2(3)
* t2: t2_p3(4), t2_p4(4)
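The bump-once-per-sub-txn rule can be condensed into a small sketch (a
minimal, self-contained illustration with simplified stand-in types; the real
implementation operates on partition version keys in the meta-service KV
store):
```
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

// One entry per tablet written by a sub txn, mapped to the tablet's partition.
using SubTxnPartitions = std::vector<std::string>;

// Within one sub txn a partition's version is bumped exactly once,
// no matter how many tablets of that partition were written.
std::map<std::string, int64_t> commit_with_sub_txns(
        std::map<std::string, int64_t> versions,
        const std::vector<SubTxnPartitions>& sub_txns) {
    for (const auto& sub_txn : sub_txns) {
        std::set<std::string> bumped;
        for (const auto& partition : sub_txn) {
            // Every rowset this sub txn wrote to the partition gets
            // start/end version = versions[partition] after the bump.
            if (bumped.insert(partition).second) ++versions[partition];
        }
    }
    return versions;
}

int main() {
    // The example above: sub_txn1 writes t1_p1.1, t1_p1.2, t1_p2.1;
    // sub_txn2 writes t2_p3.1; sub_txn3 writes t1_p1.1, t1_p1.2.
    auto v = commit_with_sub_txns(
            {{"t1_p1", 1}, {"t1_p2", 2}, {"t2_p3", 3}, {"t2_p4", 4}},
            {{"t1_p1", "t1_p1", "t1_p2"}, {"t2_p3"}, {"t1_p1", "t1_p1"}});
    assert(v["t1_p1"] == 3 && v["t1_p2"] == 3 && v["t2_p3"] == 4 && v["t2_p4"] == 4);
    return 0;
}
```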
#### Meta service supports generating `sub_txn_id` via `begin_sub_txn`
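`begin_sub_txn` mints the `sub_txn_id` from a KV version stamp written under a
temporary label key; in one KV transaction it then puts the `txn_index_key`
for the new id, deletes the temporary `txn_label_key`, and records the sub txn
id and table ids in the parent txn state (the recycler later walks
`sub_txn_ids` to clean up the index keys). A client-side sketch, following the
request fields exercised in `cloud/test/meta_service_test.cpp`; the ids, label
and controller variables are illustrative placeholders:
```
BeginSubTxnRequest req;
req.set_cloud_unique_id(cloud_unique_id);
req.set_txn_id(txn_id);                  // parent txn the sub txn belongs to
req.set_sub_txn_num(num_sub_txns_begun); // sub txns begun so far for this txn
req.set_db_id(db_id);
req.set_label(sub_label);                // temporary label used to mint the id
req.mutable_table_ids()->Add(table_id);  // full table id list after this sub txn
BeginSubTxnResponse res;
meta_service->begin_sub_txn(&cntl, &req, &res, nullptr);
int64_t sub_txn_id = res.sub_txn_id();   // generated by the meta service
```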
---
cloud/src/common/bvars.cpp | 2 +
cloud/src/common/bvars.h | 2 +
cloud/src/meta-service/meta_service.h | 24 +
cloud/src/meta-service/meta_service_txn.cpp | 965 ++++++++++++++++++++-
cloud/src/recycler/recycler.cpp | 9 +
cloud/test/meta_service_test.cpp | 320 +++++++
cloud/test/recycler_test.cpp | 145 ++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 18 +
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 20 +
.../transaction/CloudGlobalTransactionMgr.java | 144 ++-
.../apache/doris/transaction/TransactionEntry.java | 83 +-
.../apache/doris/transaction/TransactionState.java | 6 +-
gensrc/proto/cloud.proto | 48 +
regression-test/data/insert_p0/txn_insert.out | 131 +--
.../data/insert_p0/txn_insert_inject_case.out | 9 +
regression-test/suites/insert_p0/txn_insert.groovy | 468 +++++-----
.../insert_p0/txn_insert_concurrent_insert.groovy | 26 +-
.../suites/insert_p0/txn_insert_inject_case.groovy | 69 +-
.../insert_p0/txn_insert_with_schema_change.groovy | 88 +-
19 files changed, 2174 insertions(+), 403 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1aa436bb603..43acb47e365 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -27,6 +27,8 @@ BvarLatencyRecorderWithTag g_bvar_ms_commit_txn("ms", "commit_txn");
BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn");
BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", "get_current_max_txn_id");
+BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn");
+BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", "check_txn_conflict");
BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label("ms", "clean_txn_label");
BvarLatencyRecorderWithTag g_bvar_ms_get_version("ms", "get_version");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index b55e1051cd9..e5b50262104 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -126,6 +126,8 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
+extern BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn;
+extern BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn;
extern BvarLatencyRecorderWithTag g_bvar_ms_clean_txn_label;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_version;
extern BvarLatencyRecorderWithTag g_bvar_ms_batch_get_version;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index 4dc4113f341..6ba3d5b45eb 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -61,6 +61,10 @@ public:
void commit_txn(::google::protobuf::RpcController* controller, const CommitTxnRequest* request,
                CommitTxnResponse* response, ::google::protobuf::Closure* done) override;
+ void commit_txn_with_sub_txn(::google::protobuf::RpcController* controller,
+                              const CommitTxnRequest* request, CommitTxnResponse* response,
+                              ::google::protobuf::Closure* done);
+
void abort_txn(::google::protobuf::RpcController* controller, const AbortTxnRequest* request,
               AbortTxnResponse* response, ::google::protobuf::Closure* done) override;
@@ -76,6 +80,14 @@ public:
GetCurrentMaxTxnResponse* response,
::google::protobuf::Closure* done) override;
+ void begin_sub_txn(::google::protobuf::RpcController* controller,
+                    const BeginSubTxnRequest* request, BeginSubTxnResponse* response,
+                    ::google::protobuf::Closure* done) override;
+
+ void abort_sub_txn(::google::protobuf::RpcController* controller,
+                    const AbortSubTxnRequest* request, AbortSubTxnResponse* response,
+                    ::google::protobuf::Closure* done) override;
+
void check_txn_conflict(::google::protobuf::RpcController* controller,
const CheckTxnConflictRequest* request,
CheckTxnConflictResponse* response,
@@ -321,6 +333,18 @@ public:
call_impl(&cloud::MetaService::get_current_max_txn_id, controller, request, response, done);
}
+ void begin_sub_txn(::google::protobuf::RpcController* controller,
+                    const BeginSubTxnRequest* request, BeginSubTxnResponse* response,
+                    ::google::protobuf::Closure* done) override {
+     call_impl(&cloud::MetaService::begin_sub_txn, controller, request, response, done);
+ }
+
+ void abort_sub_txn(::google::protobuf::RpcController* controller,
+                    const AbortSubTxnRequest* request, AbortSubTxnResponse* response,
+                    ::google::protobuf::Closure* done) override {
+     call_impl(&cloud::MetaService::abort_sub_txn, controller, request, response, done);
+ }
+
void check_txn_conflict(::google::protobuf::RpcController* controller,
const CheckTxnConflictRequest* request,
CheckTxnConflictResponse* response,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 7866fccaa39..5596cbbf7eb 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -662,6 +662,10 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
const CommitTxnRequest* request,
CommitTxnResponse* response,
::google::protobuf::Closure* done) {
+ if (request->has_is_txn_load() && request->is_txn_load()) {
+ commit_txn_with_sub_txn(controller, request, response, done);
+ return;
+ }
RPC_PREPROCESS(commit_txn);
if (!request->has_txn_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
@@ -1024,15 +1028,585 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" <<
hex(lock_keys[i])
<< " txn_id=" << txn_id;
- for (auto tablet_id : table_id_tablet_ids[table_id]) {
- std::string pending_key =
meta_pending_delete_bitmap_key({instance_id, tablet_id});
- txn->remove(pending_key);
- LOG(INFO) << "xxx remove delete bitmap pending key, pending_key="
<< hex(pending_key)
- << " txn_id=" << txn_id;
- }
+ for (auto tablet_id : table_id_tablet_ids[table_id]) {
+ std::string pending_key =
meta_pending_delete_bitmap_key({instance_id, tablet_id});
+ txn->remove(pending_key);
+ LOG(INFO) << "xxx remove delete bitmap pending key, pending_key="
<< hex(pending_key)
+ << " txn_id=" << txn_id;
+ }
+ }
+ lock_keys.clear();
+ lock_values.clear();
+
+ // Save rowset meta
+ for (auto& i : rowsets) {
+ size_t rowset_size = i.first.size() + i.second.size();
+ txn->put(i.first, i.second);
+ LOG(INFO) << "xxx put rowset_key=" << hex(i.first) << " txn_id=" <<
txn_id
+ << " rowset_size=" << rowset_size;
+ }
+
+ // Save versions
+ for (auto& i : new_versions) {
+ std::string ver_val;
+ VersionPB version_pb;
+ version_pb.set_version(i.second);
+ if (!version_pb.SerializeToString(&ver_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize version_pb when saving, txn_id=" <<
txn_id;
+ msg = ss.str();
+ return;
+ }
+
+ txn->put(i.first, ver_val);
+ LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << "
version:" << i.second
+ << " txn_id=" << txn_id;
+
+ std::string_view ver_key = i.first;
+ ver_key.remove_prefix(1); // Remove key space
+ // PartitionVersionKeyInfo {instance_id, db_id, table_id,
partition_id}
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ int ret = decode_key(&ver_key, &out);
+ if (ret != 0) [[unlikely]] {
+ // decode version key error means this is something wrong,
+ // we can not continue this txn
+ LOG(WARNING) << "failed to decode key, ret=" << ret << " key=" <<
hex(ver_key);
+ code = MetaServiceCode::UNDEFINED_ERR;
+ msg = "decode version key error";
+ return;
+ }
+
+ int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
+ int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
+ VLOG_DEBUG << " table_id=" << table_id << " partition_id=" <<
partition_id;
+
+ response->add_table_ids(table_id);
+ response->add_partition_ids(partition_id);
+ response->add_versions(i.second);
+ }
+
+ // Save table versions
+ for (auto& i : table_id_tablet_ids) {
+ std::string ver_key = table_version_key({instance_id, db_id, i.first});
+ txn->atomic_add(ver_key, 1);
+ LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << "
txn_id=" << txn_id;
+ }
+
+ LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();
+
+ // Update txn_info
+ txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+
+ auto now_time = system_clock::now();
+ uint64_t commit_time =
duration_cast<milliseconds>(now_time.time_since_epoch()).count();
+ if ((txn_info.prepare_time() + txn_info.timeout_ms()) < commit_time) {
+ code = MetaServiceCode::UNDEFINED_ERR;
+ msg = fmt::format("txn is expired, not allow to commit txn_id={}",
txn_id);
+ LOG(INFO) << msg << " prepare_time=" << txn_info.prepare_time()
+ << " timeout_ms=" << txn_info.timeout_ms() << "
commit_time=" << commit_time;
+ return;
+ }
+ txn_info.set_commit_time(commit_time);
+ txn_info.set_finish_time(commit_time);
+ if (request->has_commit_attachment()) {
+
txn_info.mutable_commit_attachment()->CopyFrom(request->commit_attachment());
+ }
+ LOG(INFO) << "after update txn_info=" << txn_info.ShortDebugString();
+ info_val.clear();
+ if (!txn_info.SerializeToString(&info_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+ txn->put(info_key, info_val);
+ LOG(INFO) << "xxx put info_key=" << hex(info_key) << " txn_id=" << txn_id;
+
+ // Update stats of affected tablet
+ std::deque<std::string> kv_pool;
+ std::function<void(const StatsTabletKeyInfo&, const TabletStats&)>
update_tablet_stats;
+ if (config::split_tablet_stats) {
+ update_tablet_stats = [&](const StatsTabletKeyInfo& info, const
TabletStats& stats) {
+ if (stats.num_segs > 0) {
+ auto& data_size_key = kv_pool.emplace_back();
+ stats_tablet_data_size_key(info, &data_size_key);
+ txn->atomic_add(data_size_key, stats.data_size);
+ auto& num_rows_key = kv_pool.emplace_back();
+ stats_tablet_num_rows_key(info, &num_rows_key);
+ txn->atomic_add(num_rows_key, stats.num_rows);
+ auto& num_segs_key = kv_pool.emplace_back();
+ stats_tablet_num_segs_key(info, &num_segs_key);
+ txn->atomic_add(num_segs_key, stats.num_segs);
+ }
+ auto& num_rowsets_key = kv_pool.emplace_back();
+ stats_tablet_num_rowsets_key(info, &num_rowsets_key);
+ txn->atomic_add(num_rowsets_key, stats.num_rowsets);
+ };
+ } else {
+ update_tablet_stats = [&](const StatsTabletKeyInfo& info, const
TabletStats& stats) {
+ auto& key = kv_pool.emplace_back();
+ stats_tablet_key(info, &key);
+ auto& val = kv_pool.emplace_back();
+ TxnErrorCode err = txn->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TABLET_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get tablet stats, err={}
tablet_id={}", err,
+ std::get<4>(info));
+ return;
+ }
+ TabletStatsPB stats_pb;
+ if (!stats_pb.ParseFromString(val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("malformed tablet stats value, key={}",
hex(key));
+ return;
+ }
+ stats_pb.set_data_size(stats_pb.data_size() + stats.data_size);
+ stats_pb.set_num_rows(stats_pb.num_rows() + stats.num_rows);
+ stats_pb.set_num_rowsets(stats_pb.num_rowsets() +
stats.num_rowsets);
+ stats_pb.set_num_segments(stats_pb.num_segments() +
stats.num_segs);
+ stats_pb.SerializeToString(&val);
+ txn->put(key, val);
+ };
+ }
+ for (auto& [tablet_id, stats] : tablet_stats) {
+ DCHECK(tablet_ids.count(tablet_id));
+ auto& tablet_idx = tablet_ids[tablet_id];
+ StatsTabletKeyInfo info {instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(), tablet_id};
+ update_tablet_stats(info, stats);
+ if (code != MetaServiceCode::OK) return;
+ }
+ // Remove tmp rowset meta
+ for (auto& [k, _] : tmp_rowsets_meta) {
+ txn->remove(k);
+ LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" <<
txn_id;
+ }
+
+ const std::string running_key = txn_running_key({instance_id, db_id,
txn_id});
+ LOG(INFO) << "xxx remove running_key=" << hex(running_key) << " txn_id="
<< txn_id;
+ txn->remove(running_key);
+
+ std::string recycle_val;
+ std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
+ RecycleTxnPB recycle_pb;
+ recycle_pb.set_creation_time(commit_time);
+ recycle_pb.set_label(txn_info.label());
+
+ if (!recycle_pb.SerializeToString(&recycle_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize recycle_pb, txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+ txn->put(recycle_key, recycle_val);
+
+ if (txn_info.load_job_source_type() ==
+ LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
+ put_routine_load_progress(code, msg, instance_id, request, txn.get(),
db_id);
+ }
+
+ LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) <<
" txn_id=" << txn_id;
+ LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" <<
txn->delete_bytes()
+ << " num_put_keys=" << txn->num_put_keys() << " num_del_keys="
<< txn->num_del_keys()
+ << " txn_size=" << txn->approximate_bytes() << " txn_id=" <<
txn_id;
+
+ // Finally we are done...
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ // calculate table stats from tablets stats
+ std::map<int64_t /*table_id*/, TableStats> table_stats;
+ std::vector<int64_t> base_tablet_ids(request->base_tablet_ids().begin(),
+ request->base_tablet_ids().end());
+ calc_table_stats(tablet_ids, tablet_stats, table_stats, base_tablet_ids);
+ for (const auto& pair : table_stats) {
+ TableStatsPB* stats_pb = response->add_table_stats();
+ auto table_id = pair.first;
+ auto stats = pair.second;
+ get_pb_from_tablestats(stats, stats_pb);
+ stats_pb->set_table_id(table_id);
+ VLOG_DEBUG << "Add TableStats to CommitTxnResponse. txn_id=" << txn_id
+ << " table_id=" << table_id
+ << " updated_row_count=" << stats_pb->updated_row_count();
+ }
+
+ response->mutable_txn_info()->CopyFrom(txn_info);
+} // end commit_txn
+
+/**
+ * This process is generally the same as commit_txn; the difference is that
+ * a partition's version is increased by 1 for each sub txn that writes to it.
+ *
+ * One example:
+ * Suppose the table, partition, tablet and version info is:
+ * --------------------------------------------
+ * | table | partition | tablet | version |
+ * --------------------------------------------
+ * | t1 | t1_p1 | t1_p1.1 | 1 |
+ * | t1 | t1_p1 | t1_p1.2 | 1 |
+ * | t1 | t1_p2 | t1_p2.1 | 2 |
+ * | t2 | t2_p3 | t2_p3.1 | 3 |
+ * | t2 | t2_p4 | t2_p4.1 | 4 |
+ * --------------------------------------------
+ *
+ * Now we commit a txn with 3 sub txns and the tablets are:
+ * sub_txn1: t1_p1.1, t1_p1.2, t1_p2.1
+ * sub_txn2: t2_p3.1
+ * sub_txn3: t1_p1.1, t1_p1.2
+ * When committing, the partition versions change as follows:
+ * sub_txn1: t1_p1(1 -> 2), t1_p2(2 -> 3)
+ * sub_txn2: t2_p3(3 -> 4)
+ * sub_txn3: t1_p1(2 -> 3)
+ * After commit, the partition versions are:
+ * t1: t1_p1(3), t1_p2(3)
+ * t2: t2_p3(4), t2_p4(4)
+ */
+void MetaServiceImpl::commit_txn_with_sub_txn(::google::protobuf::RpcController* controller,
+                                              const CommitTxnRequest* request,
+                                              CommitTxnResponse* response,
+                                              ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(commit_txn);
+ if (!request->has_txn_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid argument, missing txn id";
+ return;
+ }
+
+ int64_t txn_id = request->txn_id();
+ auto sub_txn_infos = request->sub_txn_infos();
+
+ std::string cloud_unique_id = request->has_cloud_unique_id() ?
request->cloud_unique_id() : "";
+ instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id << "
txn_id=" << txn_id;
+ return;
+ }
+
+ RPC_RATE_LIMIT(commit_txn)
+
+ // Create a readonly txn for scanning tmp rowsets
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ // Get db id with txn id
+ std::string index_val;
+ const std::string index_key = txn_index_key({instance_id, txn_id});
+ err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ TxnIndexPB index_pb;
+ if (!index_pb.ParseFromString(index_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ DCHECK(index_pb.has_tablet_index() == true);
+ DCHECK(index_pb.tablet_index().has_db_id() == true);
+ int64_t db_id = index_pb.tablet_index().db_id();
+
+ // Get temporary rowsets involved in the txn
+ std::map<int64_t, std::vector<std::pair<std::string,
doris::RowsetMetaCloudPB>>>
+ sub_txn_to_tmp_rowsets_meta;
+ for (const auto& sub_txn_info : sub_txn_infos) {
+ auto sub_txn_id = sub_txn_info.sub_txn_id();
+ // This is a range scan
+ MetaRowsetTmpKeyInfo rs_tmp_key_info0 {instance_id, sub_txn_id, 0};
+ MetaRowsetTmpKeyInfo rs_tmp_key_info1 {instance_id, sub_txn_id + 1, 0};
+ std::string rs_tmp_key0;
+ std::string rs_tmp_key1;
+ meta_rowset_tmp_key(rs_tmp_key_info0, &rs_tmp_key0);
+ meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
+ // Get rowset meta that should be committed
+ // tmp_rowset_key -> rowset_meta
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
tmp_rowsets_meta;
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id,
&sub_txn_id](int*) {
+ LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
+ << ", sub_txn_id=" << sub_txn_id << "
num_rowsets=" << num_rowsets
+ << " range=[" << hex(rs_tmp_key0) << "," <<
hex(rs_tmp_key1) << ")";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+ if (err == TxnErrorCode::TXN_TOO_OLD) {
+ err = txn_kv_->create_txn(&txn);
+ if (err == TxnErrorCode::TXN_OK) {
+ err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+ }
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "internal error, failed to get tmp rowset while
committing, txn_id=" << txn_id
+ << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get rowset_tmp_key=" << hex(k) << "
txn_id=" << txn_id;
+ tmp_rowsets_meta.emplace_back();
+ if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed rowset meta, unable to initialize,
txn_id=" << txn_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ // Save keys that will be removed later
+ tmp_rowsets_meta.back().first = std::string(k.data(),
k.size());
+ ++num_rowsets;
+ if (!it->has_next()) rs_tmp_key0 = k;
+ }
+ rs_tmp_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+
+ VLOG_DEBUG << "txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id
+ << " tmp_rowsets_meta.size()=" << tmp_rowsets_meta.size();
+ sub_txn_to_tmp_rowsets_meta.emplace(sub_txn_id,
std::move(tmp_rowsets_meta));
+ }
+
+ // Create a read/write txn to guarantee consistency
+ txn.reset();
+ err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ // Get txn info with db_id and txn_id
+ std::string info_val; // Will be reused when saving updated txn
+ const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" <<
txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ // TODO: do more check like txn state
+ DCHECK(txn_info.txn_id() == txn_id);
+ if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+ code = MetaServiceCode::TXN_ALREADY_ABORTED;
+ ss << "transaction is already aborted: db_id=" << db_id << " txn_id="
<< txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+ code = MetaServiceCode::TXN_ALREADY_VISIBLE;
+ ss << "transaction is already visible: db_id=" << db_id << " txn_id="
<< txn_id;
+ msg = ss.str();
+ response->mutable_txn_info()->CopyFrom(txn_info);
+ return;
+ }
+
+ LOG(INFO) << "txn_id=" << txn_id << " txn_info=" <<
txn_info.ShortDebugString();
+
+ // Prepare rowset meta and new_versions
+ // Read tablet indexes in batch.
+ std::map<int64_t, int64_t> tablet_id_to_idx;
+ std::vector<std::string> tablet_idx_keys;
+ std::vector<int64_t> partition_ids;
+ auto idx = 0;
+ for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+ for (auto& [_, i] : tmp_rowsets_meta) {
+ auto tablet_id = i.tablet_id();
+ if (tablet_id_to_idx.count(tablet_id) == 0) {
+ tablet_id_to_idx.emplace(tablet_id, idx);
+ tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id,
i.tablet_id()}));
+ partition_ids.push_back(i.partition_id());
+ idx++;
+ }
+ }
+ }
+ std::vector<std::optional<std::string>> tablet_idx_values;
+ err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
Transaction::BatchGetOptions(false));
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get tablet table index ids, err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+
+ // tablet_id -> {table/index/partition}_id
+ std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+ // table_id -> tablets_ids
+ std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+ for (auto [tablet_id, i] : tablet_id_to_idx) {
+ if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+ // The value must exist
+ code = MetaServiceCode::KV_TXN_GET_ERR;
+ ss << "failed to get tablet table index ids, err=not found"
+ << " tablet_id=" << tablet_id << " key=" <<
hex(tablet_idx_keys[i]);
+ msg = ss.str();
+ LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+ return;
+ }
+ if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed tablet index value tablet_id=" << tablet_id << "
txn_id=" << txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+ VLOG_DEBUG << "tablet_id:" << tablet_id
+ << " value:" << tablet_ids[tablet_id].ShortDebugString();
+ }
+
+ tablet_idx_keys.clear();
+ tablet_idx_values.clear();
+
+ // {table/partition} -> version
+ std::unordered_map<std::string, uint64_t> new_versions;
+ std::vector<std::string> version_keys;
+ for (auto& [tablet_id, i] : tablet_id_to_idx) {
+ int64_t table_id = tablet_ids[tablet_id].table_id();
+ int64_t partition_id = partition_ids[i];
+ std::string ver_key = partition_version_key({instance_id, db_id,
table_id, partition_id});
+ if (new_versions.count(ver_key) == 0) {
+ new_versions.insert({ver_key, 0});
+ LOG(INFO) << "xxx add a partition_version_key=" << hex(ver_key) <<
" txn_id=" << txn_id
+ << ", db_id=" << db_id << ", table_id=" << table_id
+ << ", partition_id=" << partition_id;
+ version_keys.push_back(std::move(ver_key));
+ }
+ }
+ std::vector<std::optional<std::string>> version_values;
+ err = txn->batch_get(&version_values, version_keys);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get partition versions, err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+ size_t total_versions = version_keys.size();
+ for (size_t i = 0; i < total_versions; i++) {
+ int64_t version;
+ if (version_values[i].has_value()) {
+ VersionPB version_pb;
+ if (!version_pb.ParseFromString(version_values[i].value())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse version pb txn_id=" << txn_id
+ << " key=" << hex(version_keys[i]);
+ msg = ss.str();
+ return;
+ }
+ version = version_pb.version();
+ } else {
+ version = 1;
+ }
+ new_versions[version_keys[i]] = version;
+ LOG(INFO) << "xxx get partition_version_key=" << hex(version_keys[i])
+ << " version:" << version << " txn_id=" << txn_id;
+ }
+ version_keys.clear();
+ version_values.clear();
+
+ std::vector<std::pair<std::string, std::string>> rowsets;
+ std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id ->
stats
+ for (const auto& sub_txn_info : sub_txn_infos) {
+ auto sub_txn_id = sub_txn_info.sub_txn_id();
+ auto tmp_rowsets_meta = sub_txn_to_tmp_rowsets_meta[sub_txn_id];
+ std::unordered_map<int64_t, int64_t> partition_id_to_version;
+ for (auto& [_, i] : tmp_rowsets_meta) {
+ int64_t tablet_id = i.tablet_id();
+ int64_t table_id = tablet_ids[tablet_id].table_id();
+ int64_t partition_id = i.partition_id();
+ std::string ver_key =
+ partition_version_key({instance_id, db_id, table_id,
partition_id});
+ if (new_versions.count(ver_key) == 0) [[unlikely]] {
+ // this should be impossible.
+ code = MetaServiceCode::UNDEFINED_ERR;
+ ss << "failed to get partition version key, the target version
not exists in "
+ "new_versions."
+ << " txn_id=" << txn_id << ", db_id=" << db_id << ",
table_id=" << table_id
+ << ", partition_id=" << partition_id;
+ msg = ss.str();
+ LOG(ERROR) << msg;
+ return;
+ }
+
+ // Update rowset version
+ int64_t new_version = new_versions[ver_key];
+ if (partition_id_to_version.count(partition_id) == 0) {
+ new_versions[ver_key] = new_version + 1;
+ new_version = new_versions[ver_key];
+ partition_id_to_version[partition_id] = new_version;
+ }
+ i.set_start_version(new_version);
+ i.set_end_version(new_version);
+ LOG(INFO) << "xxx update rowset version, txn_id=" << txn_id
+ << ", sub_txn_id=" << sub_txn_id << ", table_id=" <<
table_id
+ << ", partition_id=" << partition_id << ", tablet_id="
<< tablet_id
+ << ", new_version=" << new_version;
+
+ std::string key = meta_rowset_key({instance_id, tablet_id,
i.end_version()});
+ std::string val;
+ if (!i.SerializeToString(&val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize rowset_meta, txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+ rowsets.emplace_back(std::move(key), std::move(val));
+
+ // Accumulate affected rows
+ auto& stats = tablet_stats[tablet_id];
+ stats.data_size += i.data_disk_size();
+ stats.num_rows += i.num_rows();
+ ++stats.num_rowsets;
+ stats.num_segs += i.num_segments();
+ } // for tmp_rowsets_meta
}
- lock_keys.clear();
- lock_values.clear();
// Save rowset meta
for (auto& i : rowsets) {
@@ -1074,7 +1648,8 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
int64_t table_id = std::get<int64_t>(std::get<0>(out[4]));
int64_t partition_id = std::get<int64_t>(std::get<0>(out[5]));
- VLOG_DEBUG << " table_id=" << table_id << " partition_id=" <<
partition_id;
+ VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
+ << " partition_id=" << partition_id << " version=" <<
i.second;
response->add_table_ids(table_id);
response->add_partition_ids(partition_id);
@@ -1174,9 +1749,11 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
if (code != MetaServiceCode::OK) return;
}
// Remove tmp rowset meta
- for (auto& [k, _] : tmp_rowsets_meta) {
- txn->remove(k);
- LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id=" <<
txn_id;
+ for (auto& [_, tmp_rowsets_meta] : sub_txn_to_tmp_rowsets_meta) {
+ for (auto& [k, _] : tmp_rowsets_meta) {
+ txn->remove(k);
+ LOG(INFO) << "xxx remove tmp_rowset_key=" << hex(k) << " txn_id="
<< txn_id;
+ }
}
const std::string running_key = txn_running_key({instance_id, db_id,
txn_id});
@@ -1197,11 +1774,6 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
}
txn->put(recycle_key, recycle_val);
- if (txn_info.load_job_source_type() ==
- LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
- put_routine_load_progress(code, msg, instance_id, request, txn.get(),
db_id);
- }
-
LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) <<
" txn_id=" << txn_id;
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" <<
txn->delete_bytes()
<< " num_put_keys=" << txn->num_put_keys() << " num_del_keys="
<< txn->num_del_keys()
@@ -1233,7 +1805,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
}
response->mutable_txn_info()->CopyFrom(txn_info);
-} // end commit_txn
+} // end commit_txn_with_sub_txn
void MetaServiceImpl::abort_txn(::google::protobuf::RpcController* controller,
const AbortTxnRequest* request,
AbortTxnResponse* response,
@@ -1612,6 +2184,327 @@ void MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
response->set_current_max_txn_id(current_max_txn_id);
}
+/**
+ * 1. Generate a sub_txn_id
+ *
+ * The following steps are done in one txn:
+ * 2. Put the txn_index_key for the sub_txn_id
+ * 3. Delete the txn_label_key for the sub_txn_id
+ * 4. Modify the txn state of the txn_id:
+ *    - Add the sub txn id to sub_txn_ids: the recycler uses it to recycle the txn_index_key
+ *    - Add the table ids to table_ids
+ */
+void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controller,
+                                    const BeginSubTxnRequest* request,
+                                    BeginSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(begin_sub_txn);
+ int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+ int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num()
: -1;
+ int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+ auto& table_ids = request->table_ids();
+ std::string label = request->has_label() ? request->label() : "";
+ if (txn_id < 0 || sub_txn_num < 0 || db_id < 0 || table_ids.empty() ||
label.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_num=" <<
sub_txn_num
+ << " db_id=" << db_id << ", label=" << label << ", table_ids=[";
+ for (auto table_id : table_ids) {
+ ss << table_id << ", ";
+ }
+ ss << "]";
+ msg = ss.str();
+ return;
+ }
+
+ std::string cloud_unique_id = request->has_cloud_unique_id() ?
request->cloud_unique_id() : "";
+ instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "cannot find instance_id with cloud_unique_id="
+ << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+ msg = ss.str();
+ return;
+ }
+
+ RPC_RATE_LIMIT(begin_sub_txn)
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" <<
txn_id
+ << " db_id=" << db_id;
+ msg = ss.str();
+ return;
+ }
+
+ const std::string label_key = txn_label_key({instance_id, db_id, label});
+ std::string label_val;
+ err = txn->get(label_key, &label_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "txn->get failed(), err=" << err << " label=" << label;
+ msg = ss.str();
+ return;
+ }
+
+ LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label
<< " err=" << err;
+
+ // err == OK means this is a retry rpc?
+ if (err == TxnErrorCode::TXN_OK) {
+ label_val = label_val.substr(0, label_val.size() - VERSION_STAMP_LEN);
+ }
+
+ // Stamp the label value with a KV version stamp; it is read back below to mint the sub_txn_id.
+ txn->atomic_set_ver_value(label_key, label_val);
+ LOG(INFO) << "txn->atomic_set_ver_value label_key=" << hex(label_key);
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "txn->commit failed(), label=" << label << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ // 2. Get sub txn id from version stamp
+ txn.reset();
+ err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "failed to create txn when get txn id, label=" << label << "
err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ label_val.clear();
+ err = txn->get(label_key, &label_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "txn->get() failed, label=" << label << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+
+ LOG(INFO) << "txn->get label_key=" << hex(label_key) << " label=" << label
<< " err=" << err;
+
+ // Generated by TxnKv system
+ int64_t sub_txn_id = 0;
+ int ret = get_txn_id_from_fdb_ts(
+         std::string_view(label_val).substr(label_val.size() - VERSION_STAMP_LEN, label_val.size()),
+         &sub_txn_id);
+ if (ret != 0) {
+ code = MetaServiceCode::TXN_GEN_ID_ERR;
+ ss << "get_txn_id_from_fdb_ts() failed, label=" << label << " ret=" <<
ret;
+ msg = ss.str();
+ return;
+ }
+
+ LOG(INFO) << "get_txn_id_from_fdb_ts() label=" << label << " sub_txn_id="
<< sub_txn_id
+ << " txn_id=" << txn_id << " label_val.size()=" <<
label_val.size();
+
+ // write txn_index_key
+ const std::string index_key = txn_index_key({instance_id, sub_txn_id});
+ std::string index_val;
+ TxnIndexPB index_pb;
+ if (!index_pb.SerializeToString(&index_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize txn_index_pb "
+ << "label=" << label << " txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+
+ // Get and update txn info with db_id and txn_id
+ std::string info_val; // Will be reused when saving updated txn
+ const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" <<
txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ DCHECK(txn_info.txn_id() == txn_id);
+ if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+ code = MetaServiceCode::TXN_INVALID_STATUS;
+ ss << "transaction status is " << txn_info.status() << " : db_id=" <<
db_id
+ << " txn_id=" << txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ if (txn_info.sub_txn_ids().size() != sub_txn_num) {
+ code = MetaServiceCode::UNDEFINED_ERR;
+ ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", expected
sub_txn_num=" << sub_txn_num
+ << ", txn_info.sub_txn_ids=[";
+ for (auto id : txn_info.sub_txn_ids()) {
+ ss << id << ", ";
+ }
+ ss << "]";
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ }
+ txn_info.mutable_sub_txn_ids()->Add(sub_txn_id);
+ txn_info.mutable_table_ids()->Clear();
+ for (auto table_id : table_ids) {
+ txn_info.mutable_table_ids()->Add(table_id);
+ }
+ if (!txn_info.SerializeToString(&info_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize txn_info when saving, txn_id=" << txn_id;
+ msg = ss.str();
+ return;
+ }
+
+ txn->remove(label_key);
+ txn->put(info_key, info_val);
+ txn->put(index_key, index_val);
+ LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
+ << ", remove label_key=" << hex(label_key) << ", put info_key="
<< hex(info_key)
+ << ", put index_key=" << hex(index_key);
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << err;
+ msg = ss.str();
+ return;
+ }
+ response->set_sub_txn_id(sub_txn_id);
+ response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
+/**
+ * 1. Modify the txn state of the txn_id:
+ * - Remove the table id from table_ids
+ */
+void MetaServiceImpl::abort_sub_txn(::google::protobuf::RpcController* controller,
+                                    const AbortSubTxnRequest* request,
+                                    AbortSubTxnResponse* response,
+                                    ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(abort_sub_txn);
+ int64_t txn_id = request->has_txn_id() ? request->txn_id() : -1;
+ int64_t sub_txn_id = request->has_sub_txn_id() ? request->sub_txn_id() :
-1;
+ int64_t sub_txn_num = request->has_sub_txn_num() ? request->sub_txn_num()
: -1;
+ int64_t db_id = request->has_db_id() ? request->db_id() : -1;
+ auto& table_ids = request->table_ids();
+ if (txn_id < 0 || sub_txn_id < 0 || sub_txn_num < 0 || db_id < 0) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "invalid argument, txn_id=" << txn_id << ", sub_txn_id=" <<
sub_txn_id
+ << ", sub_txn_num=" << sub_txn_num << " db_id=" << db_id << ",
table_ids=[";
+ for (auto table_id : table_ids) {
+ ss << table_id << ", ";
+ }
+ ss << "]";
+ msg = ss.str();
+ return;
+ }
+
+ std::string cloud_unique_id = request->has_cloud_unique_id() ?
request->cloud_unique_id() : "";
+ instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ ss << "cannot find instance_id with cloud_unique_id="
+ << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
+ msg = ss.str();
+ return;
+ }
+
+ RPC_RATE_LIMIT(abort_sub_txn)
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ ss << "txn_kv_->create_txn() failed, err=" << err << " txn_id=" <<
txn_id
+ << " sub_txn_id=" << sub_txn_id << " db_id=" << db_id;
+ msg = ss.str();
+ return;
+ }
+
+ // Get and update txn info with db_id and txn_id
+ std::string info_val; // Will be reused when saving updated txn
+ const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::TXN_ID_NOT_FOUND
+ :
cast_as<ErrCategory::READ>(err);
+ ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id
+ << " sub_txn_id=" << sub_txn_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(info_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" <<
txn_id
+ << " sub_txn_id=" << sub_txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ DCHECK(txn_info.txn_id() == txn_id);
+ if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+ code = MetaServiceCode::TXN_INVALID_STATUS;
+ ss << "transaction status is " << txn_info.status() << " : db_id=" <<
db_id
+ << " txn_id=" << txn_id << " sub_txn_id=" << sub_txn_id;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+
+ // remove the table id; the sub_txn_id does not need to be removed
+ if (txn_info.sub_txn_ids().size() != sub_txn_num) {
+ code = MetaServiceCode::UNDEFINED_ERR;
+ ss << "sub_txn_num mismatch, txn_id=" << txn_id << ", sub_txn_id=" <<
sub_txn_id
+ << ", expected sub_txn_num=" << sub_txn_num << ",
txn_info.sub_txn_ids=[";
+ for (auto id : txn_info.sub_txn_ids()) {
+ ss << id << ", ";
+ }
+ ss << "]";
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ }
+ txn_info.mutable_table_ids()->Clear();
+ for (auto table_id : table_ids) {
+ txn_info.mutable_table_ids()->Add(table_id);
+ }
+ // TODO should we try to delete txn_label_key if begin_sub_txn failed to
delete?
+
+ if (!txn_info.SerializeToString(&info_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ ss << "failed to serialize txn_info when saving, txn_id=" << txn_id
+ << " sub_txn_id=" << sub_txn_id;
+ msg = ss.str();
+ return;
+ }
+
+ txn->put(info_key, info_val);
+ LOG(INFO) << "txn_id=" << txn_id << ", sub_txn_id=" << sub_txn_id
+ << ", put info_key=" << hex(info_key);
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn, txn_id=" << txn_id << ", sub_txn_id="
<< sub_txn_id
+ << ", err=" << err;
+ msg = ss.str();
+ return;
+ }
+ response->mutable_txn_info()->CopyFrom(txn_info);
+}
+
void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController*
controller,
const CheckTxnConflictRequest*
request,
CheckTxnConflictResponse* response,
@@ -1686,24 +2579,24 @@ void MetaServiceImpl::check_txn_conflict(::google::protobuf::RpcController* cont
if (running_pb.timeout_time() < check_time) {
skip_timeout_txn_cnt++;
- break;
- }
-
- LOG(INFO) << "check watermark conflict range_get txn_run_key=" <<
hex(k)
- << " running_pb=" << running_pb.ShortDebugString();
- std::vector<int64_t>
running_table_ids(running_pb.table_ids().begin(),
-
running_pb.table_ids().end());
- std::sort(running_table_ids.begin(), running_table_ids.end());
- std::vector<int64_t> result(std::min(running_table_ids.size(),
src_table_ids.size()));
- std::vector<int64_t>::iterator iter = std::set_intersection(
- src_table_ids.begin(), src_table_ids.end(),
running_table_ids.begin(),
- running_table_ids.end(), result.begin());
- result.resize(iter - result.begin());
- if (result.size() > 0) {
- response->set_finished(false);
- LOG(INFO) << "skip timeout txn count: " << skip_timeout_txn_cnt
- << " total iteration count: " << total_iteration_cnt;
- return;
+ } else {
+ LOG(INFO) << "check watermark conflict range_get txn_run_key="
<< hex(k)
+ << " running_pb=" << running_pb.ShortDebugString();
+ std::vector<int64_t>
running_table_ids(running_pb.table_ids().begin(),
+
running_pb.table_ids().end());
+ std::sort(running_table_ids.begin(), running_table_ids.end());
+ std::vector<int64_t> result(
+ std::min(running_table_ids.size(),
src_table_ids.size()));
+ std::vector<int64_t>::iterator iter = std::set_intersection(
+ src_table_ids.begin(), src_table_ids.end(),
running_table_ids.begin(),
+ running_table_ids.end(), result.begin());
+ result.resize(iter - result.begin());
+ if (result.size() > 0) {
+ response->set_finished(false);
+ LOG(INFO) << "skip timeout txn count: " <<
skip_timeout_txn_cnt
+ << " total iteration count: " <<
total_iteration_cnt;
+ return;
+ }
}
if (!it->has_next()) {
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 05744bc596d..ca4c17b61ff 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -1906,6 +1906,15 @@ int InstanceRecycler::recycle_expired_txn_label() {
return -1;
}
txn->remove(info_key);
+ // Remove sub txn index kvs
+ std::vector<std::string> sub_txn_index_keys;
+ for (auto sub_txn_id : txn_info.sub_txn_ids()) {
+ auto sub_txn_index_key = txn_index_key({instance_id_, sub_txn_id});
+ sub_txn_index_keys.push_back(sub_txn_index_key);
+ }
+ for (auto& sub_txn_index_key : sub_txn_index_keys) {
+ txn->remove(sub_txn_index_key);
+ }
// Update txn label
std::string label_key, label_val;
txn_label_key({instance_id_, db_id, txn_info.label()}, &label_key);
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index b2b203f66e0..013433239d3 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -1332,6 +1332,326 @@ TEST(MetaServiceTest, CommitTxnExpiredTest) {
}
}
+TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
+ auto meta_service = get_meta_service();
+ int64_t db_id = 98131;
+ int64_t txn_id = -1;
+ int64_t t1 = 10;
+ int64_t t1_index = 100;
+ int64_t t1_p1 = 11;
+ int64_t t1_p1_t1 = 12;
+ int64_t t1_p1_t2 = 13;
+ int64_t t1_p2 = 14;
+ int64_t t1_p2_t1 = 15;
+ int64_t t2 = 16;
+ int64_t t2_index = 101;
+ int64_t t2_p3 = 17;
+ int64_t t2_p3_t1 = 18;
+ [[maybe_unused]] int64_t t2_p4 = 19;
+ [[maybe_unused]] int64_t t2_p4_t1 = 20;
+ // begin txn
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label("test_label");
+ txn_info_pb.add_table_ids(t1);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ }
+
+ // mock rowset and tablet: for sub_txn1
+ int64_t sub_txn_id1 = txn_id;
+ {
+ create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1);
+ auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t1, t1_p1);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ {
+ create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2);
+ auto tmp_rowset = create_rowset(sub_txn_id1, t1_p1_t2, t1_p1);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ {
+ create_tablet(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1);
+ auto tmp_rowset = create_rowset(sub_txn_id1, t1_p2_t1, t1_p2);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // begin_sub_txn2
+ int64_t sub_txn_id2 = -1;
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(0);
+ req.set_db_id(db_id);
+ req.set_label("test_label_0");
+ req.mutable_table_ids()->Add(t1);
+ req.mutable_table_ids()->Add(t2);
+ BeginSubTxnResponse res;
+
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id2 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+ }
+ // mock rowset and tablet: for sub_txn2
+ {
+ create_tablet(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1);
+ auto tmp_rowset = create_rowset(sub_txn_id2, t2_p3_t1, t2_p3);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // begin_sub_txn3
+ int64_t sub_txn_id3 = -1;
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(1);
+ req.set_db_id(db_id);
+ req.set_label("test_label_1");
+ req.mutable_table_ids()->Add(t1);
+ req.mutable_table_ids()->Add(t2);
+ req.mutable_table_ids()->Add(t1);
+ BeginSubTxnResponse res;
+
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 3);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id3 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+ }
+ // mock rowset and tablet: for sub_txn3
+ {
+ create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1);
+ auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t1, t1_p1);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ {
+ create_tablet(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2);
+ auto tmp_rowset = create_rowset(sub_txn_id3, t1_p1_t2, t1_p1);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ // commit txn
+ CommitTxnRequest req;
+ {
+ brpc::Controller cntl;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(666);
+ req.set_txn_id(txn_id);
+ req.set_is_txn_load(true);
+
+ SubTxnInfo sub_txn_info1;
+ sub_txn_info1.set_sub_txn_id(sub_txn_id1);
+ sub_txn_info1.set_table_id(t1);
+ sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1);
+ sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2);
+ sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1);
+
+ SubTxnInfo sub_txn_info2;
+ sub_txn_info2.set_sub_txn_id(sub_txn_id2);
+ sub_txn_info2.set_table_id(t2);
+ sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1);
+
+ SubTxnInfo sub_txn_info3;
+ sub_txn_info3.set_sub_txn_id(sub_txn_id3);
+ sub_txn_info3.set_table_id(t1);
+ sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1);
+ sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2);
+
+ req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1));
+ req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2));
+ req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3));
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ // std::cout << res.DebugString() << std::endl;
+ ASSERT_EQ(res.table_ids().size(), 3);
+
+ ASSERT_EQ(res.table_ids()[0], t2);
+ ASSERT_EQ(res.partition_ids()[0], t2_p3);
+ ASSERT_EQ(res.versions()[0], 2);
+
+ ASSERT_EQ(res.table_ids()[1], t1);
+ ASSERT_EQ(res.partition_ids()[1], t1_p2);
+ ASSERT_EQ(res.versions()[1], 2);
+
+ ASSERT_EQ(res.table_ids()[2], t1);
+ ASSERT_EQ(res.partition_ids()[2], t1_p1);
+ ASSERT_EQ(res.versions()[2], 3);
+ }
+
+ // commit the txn again; it should already be visible
+ {
+ brpc::Controller cntl;
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_VISIBLE);
+ auto found = res.status().msg().find(
+ fmt::format("transaction is already visible: db_id={}
txn_id={}", db_id, txn_id));
+ ASSERT_TRUE(found != std::string::npos);
+ }
+}
+
+TEST(MetaServiceTest, BeginAndAbortSubTxnTest) {
+ auto meta_service = get_meta_service();
+ long db_id = 98762;
+ int64_t txn_id = -1;
+ // begin txn
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label("test_label");
+ txn_info_pb.add_table_ids(1234);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ txn_id = res.txn_id();
+ }
+ // case: begin 2 sub txn
+ int64_t sub_txn_id1 = -1;
+ int64_t sub_txn_id2 = -1;
+ for (int i = 0; i < 2; i++) {
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(i);
+ req.set_db_id(db_id);
+ req.set_label("test_label_" + std::to_string(i));
+ req.mutable_table_ids()->Add(1234);
+ req.mutable_table_ids()->Add(1235);
+ if (i == 1) {
+ req.mutable_table_ids()->Add(1235);
+ }
+ BeginSubTxnResponse res;
+
meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 2 : 3);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), i == 0 ? 1 : 2);
+ ASSERT_TRUE(res.has_sub_txn_id());
+ if (i == 0) {
+ sub_txn_id1 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]);
+ } else {
+ sub_txn_id2 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]);
+ ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[1]);
+ }
+ }
+ // get txn state
+ {
+ brpc::Controller cntl;
+ GetTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ GetTxnResponse res;
+
meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), i == 0 ? 2 : 3);
+ ASSERT_EQ(res.txn_info().table_ids()[0], 1234);
+ ASSERT_EQ(res.txn_info().table_ids()[1], 1235);
+ if (i == 1) {
+ ASSERT_EQ(res.txn_info().table_ids()[2], 1235);
+ }
+ }
+ }
+ // case: abort sub txn
+ {
+ {
+ brpc::Controller cntl;
+ AbortSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_id(sub_txn_id2);
+ req.set_sub_txn_num(2);
+ req.set_db_id(db_id);
+ req.mutable_table_ids()->Add(1234);
+ req.mutable_table_ids()->Add(1235);
+ AbortSubTxnResponse res;
+
meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+ &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ // check txn state
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+ ASSERT_EQ(sub_txn_id1, res.txn_info().sub_txn_ids()[0]);
+ ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[1]);
+ }
+ // get txn state
+ {
+ brpc::Controller cntl;
+ GetTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ GetTxnResponse res;
+
meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().table_ids()[0], 1234);
+ ASSERT_EQ(res.txn_info().table_ids()[1], 1235);
+ }
+ }
+ // check label key does not exist
+ for (int i = 0; i < 2; i++) {
+ std::string key =
+ txn_label_key({"test_instance", db_id, "test_label_" +
std::to_string(i)});
+ std::string val;
+ std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+ }
+ // check txn index key exist
+ for (auto i : {sub_txn_id1, sub_txn_id2}) {
+ std::string key = txn_index_key({"test_instance", i});
+ std::string val;
+ std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+ }
+}
+
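The test above pins down the sub-txn id bookkeeping contract. A minimal, self-contained sketch of that contract (plain Java written for this note, not Doris code; all names invented): begin_sub_txn replaces the txn's table id list and appends a freshly generated sub_txn_id, while abort_sub_txn shrinks only the table id list and keeps every generated sub_txn_id so the recycler can still find them.
```
import java.util.ArrayList;
import java.util.List;

public class SubTxnBookkeepingSketch {
    final List<Long> tableIds = new ArrayList<>();
    final List<Long> subTxnIds = new ArrayList<>();
    private long nextId = 1000;

    long beginSubTxn(List<Long> newTableIds) {
        tableIds.clear();
        tableIds.addAll(newTableIds);   // caller supplies the full table id list
        long subTxnId = ++nextId;       // meta service generates a fresh sub_txn_id
        subTxnIds.add(subTxnId);        // retained for the recycler
        return subTxnId;
    }

    void abortSubTxn(List<Long> newTableIds) {
        tableIds.clear();
        tableIds.addAll(newTableIds);   // shrunken list without the aborted table
        // subTxnIds is deliberately untouched, matching the ASSERT_EQs above
    }

    public static void main(String[] args) {
        SubTxnBookkeepingSketch txn = new SubTxnBookkeepingSketch();
        txn.beginSubTxn(List.of(1234L, 1235L));          // 2 table ids, 1 sub_txn_id
        txn.beginSubTxn(List.of(1234L, 1235L, 1235L));   // 3 table ids, 2 sub_txn_ids
        txn.abortSubTxn(List.of(1234L, 1235L));          // 2 table ids, still 2 sub_txn_ids
        System.out.println(txn.tableIds + " / " + txn.subTxnIds);
    }
}
```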
TEST(MetaServiceTest, AbortTxnTest) {
auto meta_service = get_meta_service();
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index f653f4e8e0b..07aa5b2d40b 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -1464,6 +1464,151 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
    ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2);
    ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0);
}
+
+ label = "recycle_expired_txn_label_with_sub_txn";
+ int64_t table2_id = 12131278;
+ {
+ // 1. begin_txn
+ // 2. begin_sub_txn2
+ // 3. begin_sub_txn3
+ // 4. abort_sub_txn3
+ // 5. commit_txn
+ // 6. recycle_expired_txn_label
+ // 7. check
+ [[maybe_unused]] int64_t sub_txn_id1 = -1;
+ int64_t sub_txn_id2 = -1;
+ int64_t sub_txn_id3 = -1;
+ {
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+
+ req.set_cloud_unique_id(cloud_unique_id);
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label(label);
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.set_timeout_ms(10000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+ txn_id = res.txn_id();
+ sub_txn_id1 = txn_id;
+ ASSERT_GT(txn_id, -1);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+ InstanceInfoPB instance;
+ instance.set_instance_id(mock_instance);
+ InstanceRecycler recycler(txn_kv, instance);
+ ASSERT_EQ(recycler.init(), 0);
+ sleep(1);
+ recycler.abort_timeout_txn();
+ TxnInfoPB txn_info_pb;
+ get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb);
+ ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED);
+
+ // 2. begin sub_txn2
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(0);
+ req.set_db_id(db_id);
+ req.set_label("test_sub_label1");
+ req.mutable_table_ids()->Add(table_id);
+ req.mutable_table_ids()->Add(table2_id);
+ BeginSubTxnResponse res;
+
+            meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1);
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id2 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+ }
+
+ // 3. begin sub_txn3
+ {
+ brpc::Controller cntl;
+ BeginSubTxnRequest req;
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(1);
+ req.set_db_id(db_id);
+ req.set_label("test_sub_label2");
+ req.mutable_table_ids()->Add(table_id);
+ req.mutable_table_ids()->Add(table2_id);
+ req.mutable_table_ids()->Add(table_id);
+ BeginSubTxnResponse res;
+
+            meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(res.txn_info().table_ids().size(), 3);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+ ASSERT_TRUE(res.has_sub_txn_id());
+ sub_txn_id3 = res.sub_txn_id();
+ ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+ ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+ }
+
+ // 4. abort sub_txn3
+ {
+ brpc::Controller cntl;
+ AbortSubTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_txn_id(txn_id);
+ req.set_sub_txn_num(2);
+ req.set_sub_txn_id(sub_txn_id3);
+ req.set_db_id(db_id);
+ req.mutable_table_ids()->Add(table_id);
+ req.mutable_table_ids()->Add(table2_id);
+ AbortSubTxnResponse res;
+
+            meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ // check txn state
+ ASSERT_EQ(res.txn_info().table_ids().size(), 2);
+ ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2);
+ ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]);
+ ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]);
+ }
+
+        // 5. commit_txn
+ {
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id(cloud_unique_id);
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_txn_load(true);
+ CommitTxnResponse res;
+
+            meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                     &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+        // check that the txn_index_key of each sub_txn_id exists
+ for (auto i : {sub_txn_id2, sub_txn_id3}) {
+ std::string key = txn_index_key({mock_instance, i});
+ std::string val;
+ std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+ }
+        // 6. recycle the expired txn label
+        recycler.recycle_expired_txn_label();
+        ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2);
+        ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0);
+        // 7. check that the txn_index_key of each sub_txn_id is deleted
+ for (auto i : {sub_txn_id2, sub_txn_id3}) {
+ std::string key = txn_index_key({mock_instance, i});
+ std::string val;
+ std::unique_ptr<Transaction> txn;
+            ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+            ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+ }
+ }
}
void create_object_file_pb(std::string prefix, std::vector<ObjectFilePB>* object_files,
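The recycler change above is why TxnInfoPB now carries sub_txn_ids: when an expired txn label is recycled, the txn_index_key written for every sub transaction must be deleted too. A minimal sketch (assumed key layout, plain Java, not the real recycler):
```
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RecycleSketch {
    static final Map<String, String> kv = new TreeMap<>(); // stand-in for the KV store

    static String txnIndexKey(String instance, long id) {
        return "txn_index/" + instance + "/" + id;          // assumed key format
    }

    static void recycleExpiredTxnLabel(String instance, long txnId, List<Long> subTxnIds) {
        kv.remove(txnIndexKey(instance, txnId));            // the parent txn index
        for (long subTxnId : subTxnIds) {                   // plus one key per sub txn
            kv.remove(txnIndexKey(instance, subTxnId));
        }
    }

    public static void main(String[] args) {
        kv.put(txnIndexKey("mock_instance", 1L), "txn");
        kv.put(txnIndexKey("mock_instance", 2L), "sub");
        kv.put(txnIndexKey("mock_instance", 3L), "sub");
        recycleExpiredTxnLabel("mock_instance", 1L, List.of(2L, 3L));
        System.out.println(kv.isEmpty());                   // true: all keys recycled
    }
}
```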
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 7463a684680..f7a178deb01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -200,6 +200,24 @@ public class MetaServiceClient {
return blockingStub.getCurrentMaxTxnId(request);
}
+    public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.BeginSubTxnRequest.Builder builder = Cloud.BeginSubTxnRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.beginSubTxn(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.beginSubTxn(request);
+    }
+
+    public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.AbortSubTxnRequest.Builder builder = Cloud.AbortSubTxnRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.abortSubTxn(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.abortSubTxn(request);
+    }
+
    public Cloud.CheckTxnConflictResponse checkTxnConflict(Cloud.CheckTxnConflictRequest request) {
return blockingStub.checkTxnConflict(request);
}
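Both new client methods use the same copy-on-write defaulting idiom: protobuf messages are immutable, so a request missing cloud_unique_id is rebuilt with the configured value rather than mutated. A self-contained illustration in plain Java (no protobuf dependency; names invented for this sketch):
```
import java.util.Optional;

public class DefaultingSketch {
    record Request(Optional<String> cloudUniqueId, long txnId) {
        Request withCloudUniqueId(String id) {   // analogous to mergeFrom + set + build
            return new Request(Optional.of(id), txnId);
        }
    }

    static Request ensureAuth(Request req, String configuredId) {
        // only rebuild when the auth field is absent; otherwise pass through untouched
        return req.cloudUniqueId().isPresent() ? req : req.withCloudUniqueId(configuredId);
    }

    public static void main(String[] args) {
        Request bare = new Request(Optional.empty(), 42L);
        System.out.println(ensureAuth(bare, "test_cloud_unique_id")); // id filled in
    }
}
```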
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 117cfd71bd0..a2dbdaac2c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -246,6 +246,26 @@ public class MetaServiceProxy {
}
}
+    public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.beginSubTxn(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
+    public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest request)
+            throws RpcException {
+        try {
+            final MetaServiceClient client = getProxy();
+            return client.abortSubTxn(request);
+        } catch (Exception e) {
+            throw new RpcException("", e.getMessage(), e);
+        }
+    }
+
    public Cloud.CheckTxnConflictResponse checkTxnConflict(Cloud.CheckTxnConflictRequest request)
throws RpcException {
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 98c3c5dfc8d..3184d150df7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -28,8 +28,12 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.cloud.proto.Cloud.AbortSubTxnRequest;
+import org.apache.doris.cloud.proto.Cloud.AbortSubTxnResponse;
import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.BeginSubTxnRequest;
+import org.apache.doris.cloud.proto.Cloud.BeginSubTxnResponse;
import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
import org.apache.doris.cloud.proto.Cloud.BeginTxnResponse;
import org.apache.doris.cloud.proto.Cloud.CheckTxnConflictRequest;
@@ -50,6 +54,7 @@ import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest;
import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.SubTxnInfo;
import org.apache.doris.cloud.proto.Cloud.TableStatsPB;
import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
@@ -63,6 +68,7 @@ import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
@@ -124,10 +130,12 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -139,6 +147,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15;
private TxnStateCallbackFactory callbackFactory;
+ private final Map<Long, Long> subTxnIdToTxnId = new ConcurrentHashMap<>();
public CloudGlobalTransactionMgr() {
this.callbackFactory = new TxnStateCallbackFactory();
@@ -453,6 +462,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
+ commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
+ }
+
+    private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
+            List<Table> tableList) throws UserException {
CommitTxnResponse commitTxnResponse = null;
int retryTime = 0;
@@ -783,7 +797,35 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
@Override
    public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
            List<SubTransactionState> subTransactionStates, long timeoutMillis) throws UserException {
-        throw new UnsupportedOperationException("commitAndPublishTransaction is not supported in cloud");
+ if (Config.disable_load_job) {
+            throw new TransactionCommitFailedException(
+                    "disable_load_job is set to true, all load jobs are not allowed");
+        }
+        LOG.info("try to commit transaction, txnId: {}, subTxnStates: {}", transactionId, subTransactionStates);
+ cleanSubTransactions(transactionId);
+ CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
+ builder.setDbId(db.getId())
+ .setTxnId(transactionId)
+ .setIs2Pc(false)
+ .setCloudUniqueId(Config.cloud_unique_id)
+ .setIsTxnLoad(true);
+ // add sub txn infos
+ for (SubTransactionState subTransactionState : subTransactionStates) {
+            builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
+                    .setTableId(subTransactionState.getTable().getId())
+                    .addAllBaseTabletIds(
+                            getBaseTabletsFromTables(Lists.newArrayList(subTransactionState.getTable()),
+                                    subTransactionState.getTabletCommitInfos().stream()
+                                            .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId()))
+                                            .collect(Collectors.toList())))
+                    .build());
+ }
+
+ final CommitTxnRequest commitTxnRequest = builder.build();
+        commitTxn(commitTxnRequest, transactionId, false, db.getId(),
+                subTransactionStates.stream().map(SubTransactionState::getTable)
+                        .collect(Collectors.toList()));
+ return true;
}
@Override
@@ -812,6 +854,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
@Override
    public void abortTransaction(Long dbId, Long transactionId, String reason) throws UserException {
+ cleanSubTransactions(transactionId);
abortTransaction(dbId, transactionId, reason, null, null);
}
@@ -1133,6 +1176,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
@Override
    public TransactionState getTransactionState(long dbId, long transactionId) {
+ if (subTxnIdToTxnId.containsKey(transactionId)) {
+            LOG.info("try to get transaction state, subTxnId:{}, transactionId:{}", transactionId,
+                    subTxnIdToTxnId.get(transactionId));
+ transactionId = subTxnIdToTxnId.get(transactionId);
+ }
        LOG.info("try to get transaction state, dbId:{}, transactionId:{}", dbId, transactionId);
GetTxnRequest.Builder builder = GetTxnRequest.newBuilder();
builder.setDbId(dbId);
@@ -1351,11 +1399,101 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
@Override
    public void addSubTransaction(long dbId, long transactionId, long subTransactionId) {
-        throw new UnsupportedOperationException("addSubTransaction is not supported in cloud");
+ subTxnIdToTxnId.put(subTransactionId, transactionId);
}
@Override
public void removeSubTransaction(long dbId, long subTransactionId) {
-        throw new UnsupportedOperationException("removeSubTransaction is not supported in cloud");
+ subTxnIdToTxnId.remove(subTransactionId);
+ }
+
+ private void cleanSubTransactions(long transactionId) {
+        Iterator<Entry<Long, Long>> iterator = subTxnIdToTxnId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<Long, Long> entry = iterator.next();
+ if (entry.getValue() == transactionId) {
+ iterator.remove();
+ }
+ }
+ }
+
+    public Pair<Long, TransactionState> beginSubTxn(long txnId, long dbId, List<Long> tableIds, String label,
+            long subTxnNum) throws UserException {
+        LOG.info("try to begin sub transaction, txnId: {}, dbId: {}, tableIds: {}, label: {}, subTxnNum: {}", txnId,
+                dbId, tableIds, label, subTxnNum);
+        BeginSubTxnRequest request = BeginSubTxnRequest.newBuilder().setCloudUniqueId(Config.cloud_unique_id)
+                .setTxnId(txnId).setDbId(dbId).addAllTableIds(tableIds).setLabel(label).setSubTxnNum(subTxnNum).build();
+ BeginSubTxnResponse response = null;
+ int retryTime = 0;
+ try {
+            while (retryTime < Config.metaServiceRpcRetryTimes()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, beginSubTxnRequest:{}", retryTime, request);
+                }
+                response = MetaServiceProxy.getInstance().beginSubTxn(request);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, beginSubTxnResponse:{}", retryTime, response);
+                }
+
+                if (response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+                LOG.info("beginSubTxn KV_TXN_CONFLICT, retryTime:{}", retryTime);
+                backoff();
+                retryTime++;
+            }
+
+            Preconditions.checkNotNull(response);
+            Preconditions.checkNotNull(response.getStatus());
+        } catch (Exception e) {
+            LOG.warn("beginSubTxn failed, exception:", e);
+            throw new UserException("beginSubTxn failed, errMsg:" + e.getMessage());
+        }
+
+ if (response.getStatus().getCode() != MetaServiceCode.OK) {
+ throw new UserException(response.getStatus().getMsg());
+ }
+ return Pair.of(response.hasSubTxnId() ? response.getSubTxnId() : 0,
+ TxnUtil.transactionStateFromPb(response.getTxnInfo()));
+ }
+
+    public TransactionState abortSubTxn(long txnId, long subTxnId, long dbId, List<Long> tableIds, long subTxnNum)
+            throws UserException {
+        LOG.info("try to abort sub transaction, txnId: {}, subTxnId: {}, dbId: {}, tableIds: {}, subTxnNum: {}", txnId,
+                subTxnId, dbId, tableIds, subTxnNum);
+        AbortSubTxnRequest request = AbortSubTxnRequest.newBuilder().setCloudUniqueId(Config.cloud_unique_id)
+                .setTxnId(txnId).setSubTxnId(subTxnId).setDbId(dbId).addAllTableIds(tableIds).setSubTxnNum(subTxnNum)
+                .build();
+ AbortSubTxnResponse response = null;
+ int retryTime = 0;
+ try {
+            while (retryTime < Config.metaServiceRpcRetryTimes()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, abortSubTxnRequest:{}", retryTime, request);
+                }
+                response = MetaServiceProxy.getInstance().abortSubTxn(request);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("retryTime:{}, abortSubTxnResponse:{}", retryTime, response);
+                }
+
+                if (response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+                LOG.info("abortSubTxn KV_TXN_CONFLICT, retryTime:{}", retryTime);
+                backoff();
+                retryTime++;
+            }
+
+            Preconditions.checkNotNull(response);
+            Preconditions.checkNotNull(response.getStatus());
+        } catch (Exception e) {
+            LOG.warn("abortSubTxn failed, exception:", e);
+            throw new UserException("abortSubTxn failed, errMsg:" + e.getMessage());
+        }
+
+ if (response.getStatus().getCode() != MetaServiceCode.OK) {
+ throw new UserException(response.getStatus().getMsg());
+ }
+ return TxnUtil.transactionStateFromPb(response.getTxnInfo());
}
}
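beginSubTxn and abortSubTxn above share one retry discipline: retry only on KV_TXN_CONFLICT, back off between attempts, and give up after the configured number of tries. A runnable sketch of that loop (plain Java; the meta-service RPC is mocked out, names invented here):
```
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

public class RetrySketch {
    enum Code { OK, KV_TXN_CONFLICT }

    static Code callWithRetry(Supplier<Code> rpc, int maxRetries) throws InterruptedException {
        Code code = rpc.get();
        for (int retry = 0; retry < maxRetries && code == Code.KV_TXN_CONFLICT; retry++) {
            Thread.sleep(ThreadLocalRandom.current().nextLong(20, 200)); // randomized backoff
            code = rpc.get();
        }
        return code; // the caller turns a non-OK code into a UserException
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};
        Code result = callWithRetry(
                () -> ++attempts[0] < 3 ? Code.KV_TXN_CONFLICT : Code.OK, 10);
        System.out.println(result + " after " + attempts[0] + " attempts"); // OK after 3 attempts
    }
}
```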
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 00a95a42c22..f80782bbf5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -21,10 +21,15 @@ import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
@@ -40,6 +45,7 @@ import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TTxnLoadInfo;
import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.SubTransactionState.SubTransactionType;
@@ -77,6 +83,12 @@ public class TransactionEntry {
private long transactionId = -1;
private TransactionState transactionState;
private long timeoutTimestamp = -1;
+    // 1. For cloud mode, we keep subTransactionStates in TransactionEntry;
+    // 2. For non-cloud Doris, we keep subTransactionStates in TransactionState, because when a DML statement
+    //    is executed on an observer it is forwarded to the master, so the subTransactionStates must be kept
+    //    on the master.
+    private List<SubTransactionState> subTransactionStates = new ArrayList<>();
+    // Used in cloud mode: counts all successful or failed sub transactions except the first one
+    private long allSubTxnNum = 0;
public TransactionEntry() {
}
@@ -176,11 +188,18 @@ public class TransactionEntry {
            throw new AnalysisException(
                    "Transaction insert can not insert into values and insert into select at the same time");
}
+ if (Config.isCloudMode()) {
+ OlapTable olapTable = (OlapTable) table;
+            if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) {
+                throw new UserException(
+                        "Transaction load is not supported for merge on write unique keys table in cloud mode");
+ }
+ }
DatabaseIf database = table.getDatabase();
if (!isTransactionBegan) {
long timeoutSecond = ConnectContext.get().getExecTimeout();
            this.timeoutTimestamp = System.currentTimeMillis() + timeoutSecond * 1000;
-            if (Env.getCurrentEnv().isMaster()) {
+            if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                        database.getId(), Lists.newArrayList(table.getId()), label,
                        new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
@@ -206,9 +225,23 @@ public class TransactionEntry {
                throw new AnalysisException(
                        "Transaction insert must be in the same database, expect db_id=" + this.database.getId());
}
-            this.transactionState.addTableId(table.getId());
-            long subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
+            long subTxnId;
+            if (Config.isCloudMode()) {
+                TUniqueId queryId = ConnectContext.get().queryId();
+                String label = String.format("tl_%x_%x", queryId.hi, queryId.lo);
+                List<Long> tableIds = getTableIds();
+                tableIds.add(table.getId());
+                Pair<Long, TransactionState> pair
+                        = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).beginSubTxn(
+                                transactionId, table.getDatabase().getId(), tableIds, label, allSubTxnNum);
+                this.transactionState = pair.second;
+                subTxnId = pair.first;
+            } else {
+                subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
+                this.transactionState.addTableId(table.getId());
+            }
             Env.getCurrentGlobalTransactionMgr().addSubTransaction(database.getId(), transactionId, subTxnId);
+            allSubTxnNum++;
             return subTxnId;
}
}
@@ -216,7 +249,7 @@ public class TransactionEntry {
public TransactionStatus commitTransaction() throws Exception {
if (isTransactionBegan) {
try {
- if (Env.getCurrentEnv().isMaster()) {
+ if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
beforeFinishTransaction();
                long commitTimeout = Math.min(60000L, Math.max(timeoutTimestamp - System.currentTimeMillis(), 0));
                if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, transactionId,
@@ -275,7 +308,7 @@ public class TransactionEntry {
private long abortTransaction(String reason) throws Exception {
if (isTransactionBegan) {
- if (Env.getCurrentEnv().isMaster()) {
+ if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
beforeFinishTransaction();
                Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, reason);
return transactionId;
@@ -301,7 +334,9 @@ public class TransactionEntry {
if (isTransactionBegan) {
            List<Long> tableIds = transactionState.getTableIdList().stream().distinct().collect(Collectors.toList());
            transactionState.setTableIdList(tableIds);
-            transactionState.getSubTransactionStates().sort((s1, s2) -> {
+            List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
+                    : transactionState.getSubTransactionStates();
+            subTransactionStatesPtr.sort((s1, s2) -> {
if (s1.getSubTransactionType() == SubTransactionType.INSERT
                        && s2.getSubTransactionType() == SubTransactionType.DELETE) {
return 1;
@@ -312,6 +347,9 @@ public class TransactionEntry {
                    return Long.compare(s1.getSubTransactionId(), s2.getSubTransactionId());
}
});
+            if (Config.isCloudMode()) {
+                transactionState.setSubTransactionStates(subTransactionStatesPtr);
+            }
            LOG.info("subTransactionStates={}", transactionState.getSubTransactionStates());
transactionState.resetSubTxnIds();
}
@@ -329,7 +367,18 @@ public class TransactionEntry {
public void abortSubTransaction(long subTransactionId, Table table) {
if (isTransactionBegan) {
-            this.transactionState.removeTableId(table.getId());
+            if (Config.isCloudMode()) {
+                try {
+                    List<Long> tableIds = getTableIds();
+                    this.transactionState
+                            = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).abortSubTxn(
+                                    transactionId, subTransactionId, table.getDatabase().getId(), tableIds, allSubTxnNum);
+                } catch (UserException e) {
+                    LOG.error("Failed to remove table_id={} from txn_id={}", table.getId(), transactionId, e);
+                }
+            } else {
+                this.transactionState.removeTableId(table.getId());
+            }
            Env.getCurrentGlobalTransactionMgr().removeSubTransaction(table.getDatabase().getId(), subTransactionId);
}
}
@@ -340,13 +389,15 @@ public class TransactionEntry {
            LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={}, commit_infos={}",
                    label, transactionId, subTxnId, table, commitInfos);
}
-        transactionState.getSubTransactionStates()
+        List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
+                : transactionState.getSubTransactionStates();
+        subTransactionStatesPtr
                .add(new SubTransactionState(subTxnId, table, commitInfos, subTransactionType));
-        Preconditions.checkState(
-                transactionState.getTableIdList().size() == transactionState.getSubTransactionStates().size(),
-                "txn_id=" + transactionId + ", expect table_list=" + transactionState.getSubTransactionStates().stream()
-                        .map(s -> s.getTable().getId()).collect(Collectors.toList()) + ", real table_list="
-                        + transactionState.getTableIdList());
+        Preconditions.checkState(transactionState.getTableIdList().size() == subTransactionStatesPtr.size(),
+                "txn_id=" + transactionId
+                        + ", expect table_list="
+                        + subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList())
+                        + ", real table_list=" + transactionState.getTableIdList());
}
public boolean isTransactionBegan() {
@@ -442,4 +493,10 @@ public class TransactionEntry {
        LOG.info("set txn load info in observer, label={}, txnId={}, dbId={}, timeoutTimestamp={}",
                label, transactionId, dbId, timeoutTimestamp);
}
+
+    private List<Long> getTableIds() {
+        List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
+                : transactionState.getSubTransactionStates();
+        return subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList());
+    }
}
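The comparator in beforeFinishTransaction() above orders DELETE sub transactions ahead of INSERT ones, breaking ties by sub transaction id, so "insert then delete" written in one txn still applies deletes first. A runnable sketch mirroring that ordering (plain Java, names invented here):
```
import java.util.ArrayList;
import java.util.List;

public class SubTxnOrderSketch {
    enum Type { INSERT, DELETE }
    record SubTxn(long id, Type type) {}

    public static void main(String[] args) {
        List<SubTxn> subTxns = new ArrayList<>(List.of(
                new SubTxn(3, Type.INSERT), new SubTxn(1, Type.DELETE), new SubTxn(2, Type.INSERT)));
        subTxns.sort((s1, s2) -> {
            if (s1.type() == Type.INSERT && s2.type() == Type.DELETE) {
                return 1;   // inserts sort after deletes
            } else if (s1.type() == Type.DELETE && s2.type() == Type.INSERT) {
                return -1;  // deletes sort first
            }
            return Long.compare(s1.id(), s2.id()); // ties break on sub txn id
        });
        System.out.println(subTxns); // delete id=1, then inserts id=2, id=3
    }
}
```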
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 3d1c2d54faa..f628c945b6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -689,7 +689,7 @@ public class TransactionState implements Writable {
sb.append(", db id: ").append(dbId);
        sb.append(", table id list: ").append(StringUtils.join(tableIdList, ","));
sb.append(", callback id: ").append(callbackId);
- sb.append(", coordinator: ").append(txnCoordinator.toString());
+ sb.append(", coordinator: ").append(txnCoordinator);
sb.append(", transaction status: ").append(transactionStatus);
sb.append(", error replicas num: ").append(errorReplicas.size());
        sb.append(", replica ids: ").append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray()));
@@ -859,6 +859,10 @@ public class TransactionState implements Writable {
this.subTransactionStates = new ArrayList<>();
}
+    public void setSubTransactionStates(List<SubTransactionState> subTransactionStates) {
+ this.subTransactionStates = subTransactionStates;
+ }
+
public void resetSubTxnIds() {
        this.subTxnIds = subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                .collect(Collectors.toList());
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index bc57925e572..76ac8a3d306 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -378,6 +378,8 @@ message TxnInfoPB {
optional TxnStatusPB status = 15;
optional TxnCommitAttachmentPB commit_attachment = 16;
optional int64 listener_id = 17; //callback id
+    // for transaction load, used by the recycler
+ repeated int64 sub_txn_ids = 18;
// TODO: There are more fields TBD
}
@@ -646,6 +648,15 @@ message CommitTxnRequest {
// merge-on-write table ids
repeated int64 mow_table_ids = 6;
    repeated int64 base_tablet_ids = 7; // all tablets from base tables (excluding mv)
+ // for transaction load
+ optional bool is_txn_load = 9;
+ repeated SubTxnInfo sub_txn_infos = 10;
+}
+
+message SubTxnInfo {
+ optional int64 sub_txn_id = 1;
+ optional int64 table_id = 2;
+    repeated int64 base_tablet_ids = 3;
}
// corresponding to TabletStats in meta_service.h and FrontendServiceImpl.java
@@ -702,6 +713,41 @@ message GetTxnIdResponse {
optional int64 txn_id = 2;
}
+message BeginSubTxnRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional int64 txn_id = 2;
+    // the number of all successful or failed sub txns, excluding the first one
+ optional int64 sub_txn_num = 3;
+ optional int64 db_id = 4;
+ // set table_ids in txn_info
+ repeated int64 table_ids = 5;
+ // a random label used to generate a sub_txn_id
+ optional string label = 6;
+}
+
+message BeginSubTxnResponse {
+ optional MetaServiceResponseStatus status = 1;
+ optional int64 sub_txn_id = 2;
+ optional TxnInfoPB txn_info = 3;
+}
+
+message AbortSubTxnRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional int64 txn_id = 2;
+ // used for log
+ optional int64 sub_txn_id = 3;
+    // the number of all successful or failed sub txns, excluding the first one
+ optional int64 sub_txn_num = 4;
+ optional int64 db_id = 5;
+ // set table_ids in txn_info
+ repeated int64 table_ids = 6;
+}
+
+message AbortSubTxnResponse {
+ optional MetaServiceResponseStatus status = 1;
+ optional TxnInfoPB txn_info = 2;
+}
+
message GetCurrentMaxTxnRequest {
optional string cloud_unique_id = 1; // For auth
}
@@ -1365,6 +1411,8 @@ service MetaService {
    rpc check_txn_conflict(CheckTxnConflictRequest) returns (CheckTxnConflictResponse);
rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);
+ rpc begin_sub_txn(BeginSubTxnRequest) returns (BeginSubTxnResponse);
+ rpc abort_sub_txn(AbortSubTxnRequest) returns (AbortSubTxnResponse);
rpc get_version(GetVersionRequest) returns (GetVersionResponse);
rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
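A sketch of how the new messages compose at commit time, assuming the Java classes protoc generates from the cloud.proto changes above (Cloud.CommitTxnRequest, Cloud.SubTxnInfo); all ids are made up for illustration:
```
static Cloud.CommitTxnRequest buildTxnLoadCommit() {
    return Cloud.CommitTxnRequest.newBuilder()
            .setTxnId(5001L)
            .setIsTxnLoad(true)                     // marks a transaction-load commit
            .addSubTxnInfos(Cloud.SubTxnInfo.newBuilder()
                    .setSubTxnId(5002L)             // one SubTxnInfo per sub transaction
                    .setTableId(1234L)
                    .addAllBaseTabletIds(java.util.List.of(10L, 11L)))
            .build();
}
```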
diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/txn_insert.out
index 4cd97b8cb0c..257bdcc0311 100644
--- a/regression-test/data/insert_p0/txn_insert.out
+++ b/regression-test/data/insert_p0/txn_insert.out
@@ -39,52 +39,6 @@
6
8
--- !select7 --
-
--- !select8 --
-
--- !select9 --
-
--- !select1 --
-\N \N \N [null] [null, 0]
-1 2.2 abc [] []
-2 3.3 xyz [1] [1, 0]
-
--- !select2 --
-\N \N \N [null] [null, 0]
-1 2.2 abc [] []
-2 3.3 xyz [1] [1, 0]
-
--- !select3 --
-\N \N \N [null] [null, 0]
-1 2.2 abc [] []
-1 2.2 abc [] []
-1 2.2 abc [] []
-2 3.3 xyz [1] [1, 0]
-2 3.3 xyz [1] [1, 0]
-2 3.3 xyz [1] [1, 0]
-
--- !select4 --
-\N \N \N [null] [null, 0]
-1 2.2 abc [] []
-1 2.2 abc [] []
-1 2.2 abc [] []
-2 3.3 xyz [1] [1, 0]
-2 3.3 xyz [1] [1, 0]
-2 3.3 xyz [1] [1, 0]
-
--- !select5 --
-1 2
-3 4
-5 6
-7 8
-
--- !select6 --
-2
-4
-6
-8
-
-- !select7 --
\N \N \N [null] [null, 0]
1 2.2 abc [] []
@@ -521,23 +475,6 @@
2 3.3 xyz [1] [1, 0]
2 3.3 xyz [1] [1, 0]
--- !select28 --
-1 a 10
-2 b 20
-3 c 30
-
--- !select29 --
-1 a 10
-2 b 20
-3 c 30
-4 d 40
-
--- !select30 --
-1 a 11
-2 b 20
-3 c 30
-4 d 40
-
-- !select31 --
1 a 10
10 a 10
@@ -958,31 +895,6 @@
9 a 10
9 a 10
--- !select38 --
-1 a 101
-
--- !select39 --
-1 a 100
-
--- !select40 --
-1 2000-01-01 1 1 1.0
-3 2000-01-03 3 3 3.0
-
--- !select41 --
-2 2000-01-20 20 20 20.0
-3 2000-01-30 30 30 30.0
-4 2000-01-04 4 4 4.0
-6 2000-01-10 10 10 10.0
-
--- !select42 --
-3 2000-01-03 3 3 3.0
-
--- !select43 --
-1 2000-01-01 1 1 1.0
-2 2000-01-02 2 2 2.0
-3 2000-01-03 3 3 3.0
-6 2000-01-10 10 10 10.0
-
-- !select44 --
\N \N \N [null] [null, 0]
\N \N \N [null] [null, 0]
@@ -1244,6 +1156,49 @@
2 3.3 xyz [1] [1, 0]
2 3.3 xyz [1] [1, 0]
+-- !selectmowi0 --
+1 a 10
+2 b 20
+3 c 30
+
+-- !selectmowi1 --
+1 a 10
+2 b 20
+3 c 30
+4 d 40
+
+-- !selectmowi2 --
+1 a 11
+2 b 21
+3 c 30
+4 d 40
+5 e 50
+
+-- !selectmowu1 --
+1 a 101
+
+-- !selectmowu2 --
+1 a 100
+
+-- !selectmowd1 --
+1 2000-01-01 1 1 1.0
+3 2000-01-03 3 3 3.0
+
+-- !selectmowd2 --
+2 2000-01-20 20 20 20.0
+3 2000-01-30 30 30 30.0
+4 2000-01-04 4 4 4.0
+6 2000-01-10 10 10 10.0
+
+-- !selectmowd3 --
+3 2000-01-03 3 3 3.0
+
+-- !selectmowd4 --
+1 2000-01-01 1 1 1.0
+2 2000-01-02 2 2 2.0
+3 2000-01-03 3 3 3.0
+6 2000-01-10 10 10 10.0
+
-- !select_cu0 --
1 0 10
2 0 20
diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out b/regression-test/data/insert_p0/txn_insert_inject_case.out
new file mode 100644
index 00000000000..799229be54a
--- /dev/null
+++ b/regression-test/data/insert_p0/txn_insert_inject_case.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+\N \N \N [null] [null, 0]
+\N \N \N [null] [null, 0]
+1 2.2 abc [] []
+1 2.2 abc [] []
+2 3.3 xyz [1] [1, 0]
+2 3.3 xyz [1] [1, 0]
+
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy
index d9ccd6831cf..1d4183c8662 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -42,7 +42,7 @@ suite("txn_insert") {
return null
}
- for (def use_nereids_planner : [false, true]) {
-    for (def use_nereids_planner : [false, true]) {
+    for (def use_nereids_planner : [/*false,*/ true]) {
sql " SET enable_nereids_planner = $use_nereids_planner; "
sql """ DROP TABLE IF EXISTS $table """
@@ -242,7 +242,26 @@ suite("txn_insert") {
order_qt_select24 """select * from ${table}_2"""
}
- // 7. insert into select to same table
+ // 7. insert into tables in different database
+ if (use_nereids_planner) {
+ def db2 = "regression_test_insert_p0_1"
+ sql """ create database if not exists $db2 """
+
+ try {
+ sql """ create table ${db2}.${table} like ${table} """
+ sql """ begin; """
+ sql """ insert into ${table} select * from ${table}_0; """
+ test {
+                    sql """ insert into $db2.${table} select * from ${table}_0; """
+                    exception """Transaction insert must be in the same database, expect db_id"""
+ }
+ } finally {
+ sql """rollback"""
+ sql """ drop database if exists $db2 """
+ }
+ }
+
+ // 8. insert into select to same table
if (use_nereids_planner) {
sql """ begin; """
sql """ insert into ${table}_0 select * from ${table}_1; """
@@ -273,61 +292,9 @@ suite("txn_insert") {
}
}
- // 8. insert into tables in different database
- if (use_nereids_planner) {
- def db2 = "regression_test_insert_p0_1"
- sql """ create database if not exists $db2 """
-
- try {
- sql """ create table ${db2}.${table} like ${table} """
- sql """ begin; """
- sql """ insert into ${table} select * from ${table}_0; """
- test {
-                    sql """ insert into $db2.${table} select * from ${table}_0; """
-                    exception """Transaction insert must be in the same database, expect db_id"""
- }
- } finally {
- sql """rollback"""
- sql """ drop database if exists $db2 """
- }
- }
-
- // 9. insert into mow tables
- if (use_nereids_planner) {
- def unique_table = "ut"
- for (def i in 0..2) {
- sql """ drop table if exists ${unique_table}_${i} """
- sql """
- CREATE TABLE ${unique_table}_${i} (
- `id` int(11) NOT NULL,
- `name` varchar(50) NULL,
- `score` int(11) NULL default "-1"
- ) ENGINE=OLAP
- UNIQUE KEY(`id`, `name`)
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- """ + (i == 2 ? "\"function_column.sequence_col\"='score', " :
"") +
- """
- "replication_num" = "1"
- );
- """
- }
-            sql """ insert into ${unique_table}_0 values(1, "a", 10), (2, "b", 20), (3, "c", 30); """
-            sql """ insert into ${unique_table}_1 values(1, "a", 11), (2, "b", 19), (4, "d", 40); """
-            sql """ begin """
-            sql """ insert into ${unique_table}_2 select * from ${unique_table}_0; """
-            sql """ insert into ${unique_table}_1 select * from ${unique_table}_0; """
-            sql """ insert into ${unique_table}_2 select * from ${unique_table}_1; """
-            sql """ commit; """
-            sql "sync"
- order_qt_select28 """select * from ${unique_table}_0"""
- order_qt_select29 """select * from ${unique_table}_1"""
- order_qt_select30 """select * from ${unique_table}_2"""
- }
-
- // 10. insert into table with multi partitions and tablets
+ // 9. insert into table with multi partitions and tablets
if (use_nereids_planner) {
- def pt = "multi_partition_t"
+ def pt = "txn_insert_multi_partition_t"
for (def i in 0..3) {
sql """ drop table if exists ${pt}_${i} """
sql """
@@ -380,142 +347,7 @@ suite("txn_insert") {
sql """ set enable_insert_strict = true """
}
- // 11. update stmt
- if (use_nereids_planner) {
- def ut_table = "txn_insert_ut"
- for (def i in 1..2) {
- def tableName = ut_table + "_" + i
- sql """ DROP TABLE IF EXISTS ${tableName} """
- sql """
- CREATE TABLE ${tableName} (
- `ID` int(11) NOT NULL,
- `NAME` varchar(100) NULL,
- `score` int(11) NULL
- ) ENGINE=OLAP
- unique KEY(`id`)
- COMMENT 'OLAP'
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- "replication_num" = "1"
- );
- """
- }
- sql """ insert into ${ut_table}_1 values(1, "a", 100); """
- sql """ begin; """
- sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
- sql """ update ${ut_table}_1 set score = 101 where id = 1; """
- sql """ commit; """
- sql "sync"
- order_qt_select38 """select * from ${ut_table}_1 """
- order_qt_select39 """select * from ${ut_table}_2 """
- }
-
- // 12. delete from using and delete from stmt
- if (use_nereids_planner) {
-            for (def ta in ["txn_insert_dt1", "txn_insert_dt2", "txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
-                sql """ drop table if exists ${ta} """
-            }
-
-            for (def ta in ["txn_insert_dt1", "txn_insert_dt4", "txn_insert_dt5"]) {
- sql """
- create table ${ta} (
- id int,
- dt date,
- c1 bigint,
- c2 string,
- c3 double
- ) unique key (id, dt)
- partition by range(dt) (
- from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY
- )
- distributed by hash(id)
- properties(
- 'replication_num'='1',
- "enable_unique_key_merge_on_write" = "true"
- );
- """
- sql """
- INSERT INTO ${ta} VALUES
- (1, '2000-01-01', 1, '1', 1.0),
- (2, '2000-01-02', 2, '2', 2.0),
- (3, '2000-01-03', 3, '3', 3.0);
- """
- }
-
- sql """
- create table txn_insert_dt2 (
- id int,
- dt date,
- c1 bigint,
- c2 string,
- c3 double
- ) unique key (id)
- distributed by hash(id)
- properties(
- 'replication_num'='1'
- );
- """
- sql """
- create table txn_insert_dt3 (
- id int
- ) distributed by hash(id)
- properties(
- 'replication_num'='1'
- );
- """
- sql """
- INSERT INTO txn_insert_dt2 VALUES
- (1, '2000-01-10', 10, '10', 10.0),
- (2, '2000-01-20', 20, '20', 20.0),
- (3, '2000-01-30', 30, '30', 30.0),
- (4, '2000-01-04', 4, '4', 4.0),
- (5, '2000-01-05', 5, '5', 5.0);
- """
- sql """
- INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5);
- """
- sql """ begin """
- test {
- sql '''
- delete from txn_insert_dt1 temporary partition (p_20000102)
-                    using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
- where txn_insert_dt1.id = txn_insert_dt2.id;
- '''
- exception 'Partition: p_20000102 is not exists'
- }
- sql """
- delete from txn_insert_dt1 partition (p_20000102)
-                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
- where txn_insert_dt1.id = txn_insert_dt2.id;
- """
- sql """
- delete from txn_insert_dt4
-                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
- where txn_insert_dt4.id = txn_insert_dt2.id;
- """
- sql """
- delete from txn_insert_dt2 where id = 1;
- """
- sql """
- delete from txn_insert_dt2 where id = 5;
- """
- sql """
- delete from txn_insert_dt5 partition(p_20000102) where id = 1;
- """
- sql """
- delete from txn_insert_dt5 partition(p_20000102) where id = 5;
- """
- sql """ commit """
-            sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, '10', 10.0) """
-            sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, '10', 10.0) """
- sql "sync"
- order_qt_select40 """select * from txn_insert_dt1 """
- order_qt_select41 """select * from txn_insert_dt2 """
- order_qt_select42 """select * from txn_insert_dt4 """
- order_qt_select43 """select * from txn_insert_dt5 """
- }
-
- // 13. decrease be 'pending_data_expire_time_sec' config
+ // 10. decrease be 'pending_data_expire_time_sec' config
if (use_nereids_planner) {
            def backendId_to_params = get_be_param("pending_data_expire_time_sec")
try {
@@ -534,7 +366,7 @@ suite("txn_insert") {
}
}
- // 14. delete and insert
+ // 11. delete and insert
if (use_nereids_planner) {
sql """ begin; """
sql """ delete from ${table}_0 where k1 = 1 or k1 = 2; """
@@ -544,7 +376,7 @@ suite("txn_insert") {
order_qt_select45 """select * from ${table}_0"""
}
- // 15. insert and delete
+ // 12. insert and delete
if (use_nereids_planner) {
order_qt_select46 """select * from ${table}_1"""
sql """ begin; """
@@ -559,7 +391,7 @@ suite("txn_insert") {
order_qt_select48 """select * from ${table}_1"""
}
-        // 16. txn insert does not commit or rollback by user, and txn is aborted because connection is closed
+        // 13. txn insert does not commit or rollback by user, and txn is aborted because connection is closed
        def dbName = "regression_test_insert_p0"
        def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
@@ -584,22 +416,24 @@ suite("txn_insert") {
thread.start()
thread.join()
assertNotEquals(txn_id, 0)
- def txn_state = ""
- for (int i = 0; i < 20; i++) {
-            def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
- logger.info("txn_info: ${txn_info}")
- assertEquals(1, txn_info.size())
- txn_state = txn_info[0].get("TransactionStatus")
- if ("ABORTED" == txn_state) {
- break
- } else {
- sleep(2000)
+ if (!isCloudMode()) {
+ def txn_state = ""
+ for (int i = 0; i < 20; i++) {
+                def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
+ logger.info("txn_info: ${txn_info}")
+ assertEquals(1, txn_info.size())
+ txn_state = txn_info[0].get("TransactionStatus")
+ if ("ABORTED" == txn_state) {
+ break
+ } else {
+ sleep(2000)
+ }
}
+ assertEquals("ABORTED", txn_state)
}
- assertEquals("ABORTED", txn_state)
}
-        // 17. txn insert does not commit or rollback by user, and txn is aborted because timeout
+        // 14. txn insert does not commit or rollback by user, and txn is aborted because timeout
// TODO find a way to check be txn_manager is also cleaned
if (use_nereids_planner) {
// 1. use show transaction command to check
@@ -620,19 +454,21 @@ suite("txn_insert") {
thread.start()
insertLatch.await(1, TimeUnit.MINUTES)
assertNotEquals(txn_id, 0)
- def txn_state = ""
- for (int i = 0; i < 20; i++) {
-            def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
- logger.info("txn_info: ${txn_info}")
- assertEquals(1, txn_info.size())
- txn_state = txn_info[0].get("TransactionStatus")
- if ("ABORTED" == txn_state) {
- break
- } else {
- sleep(2000)
+ if (!isCloudMode()) {
+ def txn_state = ""
+ for (int i = 0; i < 20; i++) {
+                def txn_info = sql_return_maparray """ show transaction where id = ${txn_id} """
+ logger.info("txn_info: ${txn_info}")
+ assertEquals(1, txn_info.size())
+ txn_state = txn_info[0].get("TransactionStatus")
+ if ("ABORTED" == txn_state) {
+ break
+ } else {
+ sleep(2000)
+ }
}
+ assertEquals("ABORTED", txn_state)
}
- assertEquals("ABORTED", txn_state)
// after the txn is timeout: do insert/ commit/ rollback
            def insert_timeout = sql """show variables where variable_name = 'insert_timeout';"""
@@ -686,7 +522,191 @@ suite("txn_insert") {
}
}
- // 18. column update
+ // 15. insert into mow tables
+ if (use_nereids_planner) {
+ def unique_table = "txn_insert_ut"
+ for (def i in 0..2) {
+ sql """ drop table if exists ${unique_table}_${i} """
+ sql """
+ CREATE TABLE ${unique_table}_${i} (
+ `id` int(11) NOT NULL,
+ `name` varchar(50) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ """ + (i == 2 ? "\"function_column.sequence_col\"='score', " :
"") +
+ """
+ "replication_num" = "1"
+ );
+ """
+ }
+            sql """ insert into ${unique_table}_0 values(1, "a", 10), (2, "b", 20), (3, "c", 30); """
+            sql """ insert into ${unique_table}_1 values(1, "a", 11), (2, "b", 19), (4, "d", 40); """
+            sql """ insert into ${unique_table}_2 values(1, "a", 9), (2, "b", 21), (4, "d", 39), (5, "e", 50); """
+ sql """ begin """
+ try {
+                sql """ insert into ${unique_table}_2 select * from ${unique_table}_0; """
+                sql """ insert into ${unique_table}_1 select * from ${unique_table}_0; """
+                sql """ insert into ${unique_table}_2 select * from ${unique_table}_1; """
+ sql """ commit; """
+ sql "sync"
+ order_qt_selectmowi0 """select * from ${unique_table}_0"""
+ order_qt_selectmowi1 """select * from ${unique_table}_1"""
+ order_qt_selectmowi2 """select * from ${unique_table}_2"""
+ } catch (Exception e) {
+ logger.info("exception: " + e)
+ if (isCloudMode()) {
+                    assertTrue(e.getMessage().contains("Transaction load is not supported for merge on write unique keys table in cloud mode"))
+ sql """ rollback """
+ } else {
+ assertTrue(false, "should not reach here")
+ }
+ }
+ }
+
+ // the following cases are not supported in cloud mode
+ if (isCloudMode()) {
+ break
+ }
+
+ // 16. update stmt(mow table)
+ if (use_nereids_planner) {
+ def ut_table = "txn_insert_ut"
+ for (def i in 1..2) {
+ def tableName = ut_table + "_" + i
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `ID` int(11) NOT NULL,
+ `NAME` varchar(100) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ unique KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ }
+ sql """ insert into ${ut_table}_1 values(1, "a", 100); """
+ sql """ begin; """
+ sql """ insert into ${ut_table}_2 select * from ${ut_table}_1; """
+ sql """ update ${ut_table}_1 set score = 101 where id = 1; """
+ sql """ commit; """
+ sql "sync"
+ order_qt_selectmowu1 """select * from ${ut_table}_1 """
+ order_qt_selectmowu2 """select * from ${ut_table}_2 """
+ }
+
+ // 17. delete from using and delete from stmt(mow table)
+ if (use_nereids_planner) {
+            for (def ta in ["txn_insert_dt1", "txn_insert_dt2", "txn_insert_dt3", "txn_insert_dt4", "txn_insert_dt5"]) {
+                sql """ drop table if exists ${ta} """
+            }
+
+            for (def ta in ["txn_insert_dt1", "txn_insert_dt4", "txn_insert_dt5"]) {
+ sql """
+ create table ${ta} (
+ id int,
+ dt date,
+ c1 bigint,
+ c2 string,
+ c3 double
+ ) unique key (id, dt)
+ partition by range(dt) (
+ from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY
+ )
+ distributed by hash(id)
+ properties(
+ 'replication_num'='1',
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+ sql """
+ INSERT INTO ${ta} VALUES
+ (1, '2000-01-01', 1, '1', 1.0),
+ (2, '2000-01-02', 2, '2', 2.0),
+ (3, '2000-01-03', 3, '3', 3.0);
+ """
+ }
+
+ sql """
+ create table txn_insert_dt2 (
+ id int,
+ dt date,
+ c1 bigint,
+ c2 string,
+ c3 double
+ ) unique key (id)
+ distributed by hash(id)
+ properties(
+ 'replication_num'='1'
+ );
+ """
+ sql """
+ create table txn_insert_dt3 (
+ id int
+ ) distributed by hash(id)
+ properties(
+ 'replication_num'='1'
+ );
+ """
+ sql """
+ INSERT INTO txn_insert_dt2 VALUES
+ (1, '2000-01-10', 10, '10', 10.0),
+ (2, '2000-01-20', 20, '20', 20.0),
+ (3, '2000-01-30', 30, '30', 30.0),
+ (4, '2000-01-04', 4, '4', 4.0),
+ (5, '2000-01-05', 5, '5', 5.0);
+ """
+ sql """
+ INSERT INTO txn_insert_dt3 VALUES(1),(2),(4),(5);
+ """
+ sql """ begin """
+ test {
+ sql '''
+ delete from txn_insert_dt1 temporary partition (p_20000102)
+                    using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
+ where txn_insert_dt1.id = txn_insert_dt2.id;
+ '''
+ exception 'Partition: p_20000102 is not exists'
+ }
+ sql """
+ delete from txn_insert_dt1 partition (p_20000102)
+                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
+ where txn_insert_dt1.id = txn_insert_dt2.id;
+ """
+ sql """
+ delete from txn_insert_dt4
+                using txn_insert_dt2 join txn_insert_dt3 on txn_insert_dt2.id = txn_insert_dt3.id
+ where txn_insert_dt4.id = txn_insert_dt2.id;
+ """
+ sql """
+ delete from txn_insert_dt2 where id = 1;
+ """
+ sql """
+ delete from txn_insert_dt2 where id = 5;
+ """
+ sql """
+ delete from txn_insert_dt5 partition(p_20000102) where id = 1;
+ """
+ sql """
+ delete from txn_insert_dt5 partition(p_20000102) where id = 5;
+ """
+ sql """ commit """
+            sql """ insert into txn_insert_dt2 VALUES (6, '2000-01-10', 10, '10', 10.0) """
+            sql """ insert into txn_insert_dt5 VALUES (6, '2000-01-10', 10, '10', 10.0) """
+ sql "sync"
+ order_qt_selectmowd1 """select * from txn_insert_dt1 """
+ order_qt_selectmowd2 """select * from txn_insert_dt2 """
+ order_qt_selectmowd3 """select * from txn_insert_dt4 """
+ order_qt_selectmowd4 """select * from txn_insert_dt5 """
+ }
+
+ // 18. column update(mow table)
if (use_nereids_planner) {
def unique_table = "txn_insert_cu"
for (def i in 0..3) {
@@ -738,4 +758,14 @@ suite("txn_insert") {
order_qt_select_cu3 """select * from ${unique_table}_3"""
}
}
+
+ def db_name = "regression_test_insert_p0"
+ def tables = sql """ show tables from $db_name """
+ logger.info("tables: $tables")
+ for (def table_info : tables) {
+ def table_name = table_info[0]
+ if (table_name.startsWith("txn_insert_")) {
+ check_table_version_continuous(db_name, table_name)
+ }
+ }
}
diff --git a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
index 60cffc4d0df..3e84a9f56e8 100644
--- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
@@ -83,14 +83,20 @@ suite("txn_insert_concurrent_insert") {
    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
+    def sqls = [
+            "begin",
+            "insert into ${tableName}_0 select * from ${tableName}_1 where L_ORDERKEY < 30000;",
+            "insert into ${tableName}_1 select * from ${tableName}_2 where L_ORDERKEY > 500000;",
+            "insert into ${tableName}_0 select * from ${tableName}_2 where L_ORDERKEY < 30000;",
+            "commit"
+    ]
def txn_insert = { ->
        try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
                Statement stmt = conn.createStatement()) {
-            stmt.execute("begin")
-            stmt.execute("insert into ${tableName}_0 select * from ${tableName}_1 where L_ORDERKEY < 30000;")
-            stmt.execute("insert into ${tableName}_1 select * from ${tableName}_2 where L_ORDERKEY > 500000;")
-            stmt.execute("insert into ${tableName}_0 select * from ${tableName}_2 where L_ORDERKEY < 30000;")
-            stmt.execute("commit")
+            for (def sql : sqls) {
+                logger.info(Thread.currentThread().getName() + " execute sql: " + sql)
+                stmt.execute(sql)
+            }
            logger.info("finish txn insert for " + Thread.currentThread().getName())
} catch (Throwable e) {
logger.error("txn insert failed", e)
@@ -112,4 +118,14 @@ suite("txn_insert_concurrent_insert") {
result = sql """ select count() from ${tableName}_1 """
logger.info("result: ${result}")
assertEquals(2606192, result[0][0])
+
+ def db_name = "regression_test_insert_p0"
+ def tables = sql """ show tables from $db_name """
+ logger.info("tables: $tables")
+ for (def table_info : tables) {
+ def table_name = table_info[0]
+ if (table_name.startsWith(tableName)) {
+ check_table_version_continuous(db_name, table_name)
+ }
+ }
}
diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
--- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
@@ -19,10 +19,12 @@ import com.mysql.cj.jdbc.StatementImpl
import java.sql.Connection
import java.sql.DriverManager
import java.sql.Statement
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
suite("txn_insert_inject_case", "nonConcurrent") {
+ // test load fail
def table = "txn_insert_inject_case"
-
for (int j = 0; j < 3; j++) {
def tableName = table + "_" + j
sql """ DROP TABLE IF EXISTS $tableName """
@@ -37,9 +39,74 @@ suite("txn_insert_inject_case", "nonConcurrent") {
properties("replication_num" = "1");
"""
}
+    GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error")
    sql """insert into ${table}_1 values(1, 2.2, "abc", [], []), (2, 3.3, "xyz", [1], [1, 0]), (null, null, null, [null], [null, 0]) """
    sql """insert into ${table}_2 values(3, 2.2, "abc", [], []), (4, 3.3, "xyz", [1], [1, 0]), (null, null, null, [null], [null, 0]) """
+ def ipList = [:]
+ def portList = [:]
+ (ipList, portList) = GetDebugPoint().getBEHostAndHTTPPort()
+ logger.info("be ips: ${ipList}, ports: ${portList}")
+
+ def enableDebugPoint = { ->
+ ipList.each { beid, ip ->
+            DebugPoint.enableDebugPoint(ip, portList[beid] as int, NodeType.BE, "FlushToken.submit_flush_error")
+ }
+ }
+
+ def disableDebugPoint = { ->
+ ipList.each { beid, ip ->
+            DebugPoint.disableDebugPoint(ip, portList[beid] as int, NodeType.BE, "FlushToken.submit_flush_error")
+ }
+ }
+
+ try {
+ enableDebugPoint()
+ sql """ begin """
+ try {
+ sql """ insert into ${table}_0 select * from ${table}_1; """
+ assertTrue(false, "insert should fail")
+ } catch (Exception e) {
+ logger.info("1" + e.getMessage())
+            assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
+ }
+ try {
+ sql """ insert into ${table}_0 select * from ${table}_1; """
+ assertTrue(false, "insert should fail")
+ } catch (Exception e) {
+ logger.info("2" + e.getMessage())
+            assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
+ }
+
+ disableDebugPoint()
+ sql """ insert into ${table}_0 select * from ${table}_1; """
+
+ enableDebugPoint()
+ try {
+ sql """ insert into ${table}_0 select * from ${table}_1; """
+ assertTrue(false, "insert should fail")
+ } catch (Exception e) {
+ logger.info("4" + e.getMessage())
+            assertTrue(e.getMessage().contains("dbug_be_memtable_submit_flush_error"))
+ }
+
+ disableDebugPoint()
+ sql """ insert into ${table}_0 select * from ${table}_1; """
+ sql """ commit"""
+ } catch (Exception e) {
+ logger.error("failed", e)
+ } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("FlushToken.submit_flush_error")
+ }
+ sql "sync"
+ order_qt_select1 """select * from ${table}_0"""
+
+ if (isCloudMode()) {
+ return
+ }
+
+ sql """ truncate table ${table}_0 """
+
// 1. publish timeout
def backendId_to_params = get_be_param("pending_data_expire_time_sec")
try {
diff --git a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy b/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
index 388342bad53..7481a52e940 100644
--- a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
+++ b/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
@@ -30,13 +30,33 @@ suite("txn_insert_with_schema_change") {
CountDownLatch insertLatch = new CountDownLatch(1)
CountDownLatch schemaChangeLatch = new CountDownLatch(1)
- def getAlterTableState = { job_state ->
+ for (int j = 0; j < 5; j++) {
+ def tableName = table + "_" + j
+ sql """ DROP TABLE IF EXISTS $tableName """
+ sql """
+ create table $tableName (
+ `ID` int(11) NOT NULL,
+ `NAME` varchar(100) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ duplicate KEY(`id`)
+ distributed by hash(id) buckets 1
+ properties("replication_num" = "1");
+ """
+ }
+ sql """ insert into ${table}_0 values(0, '0', 0) """
+ sql """ insert into ${table}_1 values(0, '0', 0) """
+ sql """ insert into ${table}_2 values(0, '0', 0) """
+ sql """ insert into ${table}_3 values(1, '1', 1), (2, '2', 2) """
+    sql """ insert into ${table}_4 values(3, '3', 3), (4, '4', 4), (5, '5', 5) """
+
+ def getAlterTableState = { tName, job_state ->
def retry = 0
sql "use ${dbName};"
def last_state = ""
while (true) {
sleep(2000)
- def state = sql " show alter table column where tablename =
'${table}_0' order by CreateTime desc limit 1"
+ def state = sql """ show alter table column where tablename =
"${tName}" order by CreateTime desc limit 1"""
logger.info("alter table state: ${state}")
last_state = state[0][9]
if (state.size() > 0 && last_state == job_state) {
@@ -53,13 +73,17 @@ suite("txn_insert_with_schema_change") {
def txnInsert = { sqls ->
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
Statement statement = conn.createStatement()) {
+ logger.info("execute sql: begin")
statement.execute("begin")
+ logger.info("execute sql: ${sqls[0]}")
statement.execute(sqls[0])
schemaChangeLatch.countDown()
- insertLatch.await(2, TimeUnit.MINUTES)
+ insertLatch.await(5, TimeUnit.MINUTES)
+ logger.info("execute sql: ${sqls[1]}")
statement.execute(sqls[1])
+ logger.info("execute sql: commit")
statement.execute("commit")
} catch (Throwable e) {
logger.error("txn insert failed", e)
@@ -67,13 +91,14 @@ suite("txn_insert_with_schema_change") {
}
}
- def schemaChange = { sql, job_state ->
+ def schemaChange = { sql, tName, job_state ->
try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
Statement statement = conn.createStatement()) {
- schemaChangeLatch.await(2, TimeUnit.MINUTES)
+ schemaChangeLatch.await(5, TimeUnit.MINUTES)
+ logger.info("execute sql: ${sql}")
statement.execute(sql)
if (job_state != null) {
- getAlterTableState(job_state)
+ getAlterTableState(tName, job_state)
}
insertLatch.countDown()
} catch (Throwable e) {
@@ -83,32 +108,20 @@ suite("txn_insert_with_schema_change") {
}
def sqls = [
- ["insert into ${table}_0(id, name, score) select * from
${table}_1;",
- "insert into ${table}_0(id, name, score) select * from
${table}_2;"],
- ["delete from ${table}_0 where id = 0 or id = 3;",
- "insert into ${table}_0(id, name, score) select * from
${table}_2;"],
- ["insert into ${table}_0(id, name, score) select * from
${table}_2;",
- "delete from ${table}_0 where id = 0 or id = 3;"]
+ ["insert into ${table}_0(id, name, score) select * from
${table}_3;",
+ "insert into ${table}_0(id, name, score) select * from
${table}_4;"],
+ ["delete from ${table}_1 where id = 0 or id = 3;",
+ "insert into ${table}_1(id, name, score) select * from
${table}_4;"],
+ ["insert into ${table}_2(id, name, score) select * from
${table}_4;",
+ "delete from ${table}_2 where id = 0 or id = 3;"]
]
- for (def insert_sqls: sqls) {
- for (int j = 0; j < 3; j++) {
- def tableName = table + "_" + j
- sql """ DROP TABLE IF EXISTS $tableName force """
- sql """
- create table $tableName (
- `ID` int(11) NOT NULL,
- `NAME` varchar(100) NULL,
- `score` int(11) NULL
- ) ENGINE=OLAP
- duplicate KEY(`id`)
- distributed by hash(id) buckets 1
- properties("replication_num" = "1");
- """
+ for (int i = 0; i < sqls.size(); i++) {
+ def insert_sqls = sqls[i]
+ // TODO: skip for now because this combination causes a meta-service (ms) core dump
+ if (isCloudMode() && insert_sqls[1].startsWith("delete")) {
+ continue
}
- sql """ insert into ${table}_0 values(0, '0', 0) """
- sql """ insert into ${table}_1 values(1, '1', 1), (2, '2', 2) """
- sql """ insert into ${table}_2 values(3, '3', 3), (4, '4', 4), (5,
'5', 5) """
logger.info("insert sqls: ${insert_sqls}")
// 1. do light weight schema change: add column
@@ -116,7 +129,7 @@ suite("txn_insert_with_schema_change") {
insertLatch = new CountDownLatch(1)
schemaChangeLatch = new CountDownLatch(1)
Thread insert_thread = new Thread(() -> txnInsert(insert_sqls))
- Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table}_0 ADD column age int after name;", null))
+ Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table}_${i} ADD column age int after name;", "${table}_${i}", null))
insert_thread.start()
schema_change_thread.start()
insert_thread.join()
@@ -124,9 +137,9 @@ suite("txn_insert_with_schema_change") {
logger.info("errors: " + errors)
assertEquals(0, errors.size())
- order_qt_select1 """select id, name, score from ${table}_0 """
- getAlterTableState("FINISHED")
- order_qt_select2 """select id, name, score from ${table}_0 """
+ order_qt_select1 """select id, name, score from ${table}_${i} """
+ getAlterTableState("${table}_${i}", "FINISHED")
+ order_qt_select2 """select id, name, score from ${table}_${i} """
}
// 2. do hard weight schema change: change order
@@ -134,7 +147,7 @@ suite("txn_insert_with_schema_change") {
insertLatch = new CountDownLatch(1)
schemaChangeLatch = new CountDownLatch(1)
Thread insert_thread = new Thread(() -> txnInsert(insert_sqls))
- Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table}_0 order by (id, score, age, name);", "WAITING_TXN"))
+ Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table}_${i} order by (id, score, age, name);", "${table}_${i}",
"WAITING_TXN"))
insert_thread.start()
schema_change_thread.start()
insert_thread.join()
@@ -142,9 +155,10 @@ suite("txn_insert_with_schema_change") {
logger.info("errors: " + errors)
assertEquals(0, errors.size())
- order_qt_select3 """select id, name, score from ${table}_0 """
- getAlterTableState("FINISHED")
- order_qt_select4 """select id, name, score from ${table}_0 """
+ order_qt_select3 """select id, name, score from ${table}_${i} """
+ getAlterTableState("${table}_${i}", "FINISHED")
+ order_qt_select4 """select id, name, score from ${table}_${i} """
}
+ check_table_version_continuous(dbName, table + "_" + i)
}
}
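The schema-change hunks above coordinate two threads with a pair of latches: the txn thread runs its first statement, releases the DDL thread, then blocks until the ALTER has been issued (and, for the hard schema change, has reached WAITING_TXN) before running its second statement and committing. A stripped-down sketch of that handshake, with comments standing in for the suite's SQL and all names hypothetical:

```
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

CountDownLatch insertLatch = new CountDownLatch(1)
CountDownLatch schemaChangeLatch = new CountDownLatch(1)

Thread txnThread = Thread.start {
    // begin; execute sqls[0]
    schemaChangeLatch.countDown()           // let the ALTER start
    insertLatch.await(5, TimeUnit.MINUTES)  // wait until the DDL is in flight
    // execute sqls[1]; commit
}

Thread ddlThread = Thread.start {
    schemaChangeLatch.await(5, TimeUnit.MINUTES)
    // execute "alter table ...", then optionally poll
    // "show alter table column ..." until the job reaches WAITING_TXN
    insertLatch.countDown()                 // unblock the txn thread
}

txnThread.join()
ddlThread.join()
```

Raising both awaits from 2 to 5 minutes, as this patch does, simply gives slower runs more headroom before the latches time out.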