This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9d727d6c61f [fix](cloud) compaction and schema change potential data
race when retrying prepare rowset (#51048) (#51852)
9d727d6c61f is described below
commit 9d727d6c61f25ac35cc0a7e60295dc5f1a9e585a
Author: Luwei <[email protected]>
AuthorDate: Wed Jun 18 15:24:24 2025 +0800
[fix](cloud) compaction and schema change potential data race when retrying
prepare rowset (#51048) (#51852)
related PR #51129
pick master #51048
---
be/src/cloud/cloud_delete_task.cpp | 2 +-
be/src/cloud/cloud_delta_writer.cpp | 2 +-
be/src/cloud/cloud_meta_mgr.cpp | 6 +-
be/src/cloud/cloud_meta_mgr.h | 4 +-
be/src/cloud/cloud_rowset_builder.cpp | 2 +-
be/src/cloud/cloud_schema_change_job.cpp | 4 +-
be/src/olap/compaction.cpp | 5 +-
cloud/src/common/config.h | 2 +
cloud/src/meta-service/meta_service.cpp | 75 ++++++-
cloud/test/meta_service_test.cpp | 337 +++++++++++++++++++++++++++++++
gensrc/proto/cloud.proto | 2 +
11 files changed, 429 insertions(+), 12 deletions(-)
diff --git a/be/src/cloud/cloud_delete_task.cpp
b/be/src/cloud/cloud_delete_task.cpp
index 5698fb632cd..cf7a6a371bc 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -94,7 +94,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine,
const TPushReq& requ
RETURN_IF_ERROR(rowset_writer->build(rowset));
rowset->rowset_meta()->set_delete_predicate(std::move(del_pred));
- auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta());
+ auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
// Update tablet stats
tablet->fetch_add_approximate_num_rowsets(1);
diff --git a/be/src/cloud/cloud_delta_writer.cpp
b/be/src/cloud/cloud_delta_writer.cpp
index 7beaeb3e086..2533028314f 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -113,7 +113,7 @@ Status CloudDeltaWriter::commit_rowset() {
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_rowset_builder->build_rowset());
}
- return _engine.meta_mgr().commit_rowset(*rowset_meta());
+ return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
}
Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 4054c3b20fd..2971ff33f8e 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -860,7 +860,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet*
tablet, int64_t old_
return Status::OK();
}
-Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
+Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const
std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " <<
rs_meta.txn_id();
@@ -872,6 +872,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta&
rs_meta,
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_txn_id(rs_meta.txn_id());
+ req.set_tablet_job_id(job_id);
RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true);
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(doris_rs_meta));
@@ -889,7 +890,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta&
rs_meta,
return st;
}
-Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
+Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const
std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " <<
rs_meta.txn_id();
@@ -902,6 +903,7 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta&
rs_meta,
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_txn_id(rs_meta.txn_id());
+ req.set_tablet_job_id(job_id);
RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(rs_meta_pb));
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index cf86f09929d..0cc58e48166 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -70,10 +70,10 @@ public:
bool warmup_delta_data = false, bool sync_delete_bitmap = true,
bool full_sync = false,
SyncRowsetStats* sync_stats = nullptr);
- Status prepare_rowset(const RowsetMeta& rs_meta,
+ Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
- Status commit_rowset(const RowsetMeta& rs_meta,
+ Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
Status update_tmp_rowset(const RowsetMeta& rs_meta);
diff --git a/be/src/cloud/cloud_rowset_builder.cpp
b/be/src/cloud/cloud_rowset_builder.cpp
index 8715d90340c..b495ae953ca 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -80,7 +80,7 @@ Status CloudRowsetBuilder::init() {
_calc_delete_bitmap_token =
_engine.calc_delete_bitmap_executor()->create_token();
-
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta()));
+
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(),
""));
_is_init = true;
return Status::OK();
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index ce88be52649..05d29383af4 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -290,7 +290,7 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
RowsetMetaSharedPtr existed_rs_meta;
auto st =
_cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(),
-
&existed_rs_meta);
+ _job_id,
&existed_rs_meta);
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
LOG(INFO) << "Rowset " << rs_reader->version() << " has
already existed in tablet "
@@ -325,7 +325,7 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
st.to_string());
}
- st =
_cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(),
+ st =
_cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(),
_job_id,
&existed_rs_meta);
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9b173db26fb..1f21795e1c9 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1352,7 +1352,7 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t
permits) {
<< _output_rowset->rowset_meta()->rowset_id().to_string();
})
-
RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));
+
RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(),
_uuid));
// 4. modify rowsets in memory
RETURN_IF_ERROR(modify_rowsets());
@@ -1433,7 +1433,8 @@ Status
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
compaction_type() ==
ReaderType::READER_BASE_COMPACTION);
ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx,
_is_vertical));
-
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
+ RETURN_IF_ERROR(
+
_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get(),
_uuid));
return Status::OK();
}
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 3bb791f71e9..14047123b4c 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -222,6 +222,8 @@ CONF_mBool(enable_distinguish_hdfs_path, "true");
// If enabled, the txn status will be checked when preapre/commit rowset
CONF_mBool(enable_load_txn_status_check, "true");
+// If enabled, prepare/commit rowset requests that carry a non-empty tablet_job_id are
+// rejected (STALE_PREPARE_ROWSET) unless the tablet's job record still lists that
+// compaction or schema change job id.
+CONF_mBool(enable_tablet_job_check, "true");
+
// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 3041d20f428..9ecb08363cd 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -38,6 +38,7 @@
#include <ios>
#include <limits>
#include <memory>
+#include <ostream>
#include <sstream>
#include <string>
#include <tuple>
@@ -991,6 +992,60 @@ static void fill_schema_from_dict(MetaServiceCode& code,
std::string& msg,
existed_rowset_meta->CopyFrom(metas.Get(0));
}
+// Checks that the compaction / schema change job identified by `job_id` is still registered
+// on `tablet_id`. A prepare/commit rowset retry from a job that has already finished (or
+// been replaced) must not be accepted.
+//
+// Returns true when the tablet job record lists `job_id`. Otherwise returns false and sets
+// `code`/`msg`:
+//   - STALE_PREPARE_ROWSET if the tablet has no job record or no job with that id;
+//   - PROTOBUF_PARSE_ERR if the stored job record cannot be parsed;
+//   - the code set by get_tablet_idx, or a READ-category code for other txn errors.
+bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& msg,
+                       const std::string& instance_id, int64_t tablet_id,
+                       const std::string& rowset_id, const std::string& job_id) {
+    // The tablet job key is addressed by table/index/partition ids, so resolve them first.
+    TabletIndexPB tablet_idx;
+    get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
+    if (code != MetaServiceCode::OK) {
+        return false;
+    }
+
+    std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
+                                          tablet_idx.partition_id(), tablet_id});
+    std::string job_val;
+    auto err = txn->get(job_key, &job_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        std::stringstream ss;
+        ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
+           << " instance_id=" << instance_id << " tablet_id=" << tablet_id
+           << " job_id=" << job_id << " rowset_id=" << rowset_id << " err=" << err;
+        msg = ss.str();
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::STALE_PREPARE_ROWSET
+                                                      : cast_as<ErrCategory::READ>(err);
+        return false;
+    }
+
+    TabletJobInfoPB job_pb;
+    // A corrupt record must not be reported as a job id mismatch.
+    if (!job_pb.ParseFromString(job_val)) {
+        std::stringstream ss;
+        ss << "failed to parse tablet job info, instance_id=" << instance_id
+           << " tablet_id=" << tablet_id << " job_id=" << job_id << " rowset_id=" << rowset_id;
+        msg = ss.str();
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        return false;
+    }
+
+    bool match = false;
+    for (const auto& compaction : job_pb.compaction()) {
+        if (compaction.id() == job_id) {
+            match = true;
+            break;
+        }
+    }
+    if (!match && job_pb.has_schema_change()) {
+        match = job_pb.schema_change().id() == job_id;
+    }
+
+    if (!match) {
+        std::stringstream ss;
+        ss << "stale prepare rowset request,"
+           << " instance_id=" << instance_id << " tablet_id=" << tablet_id << " job id=" << job_id
+           << " rowset_id=" << rowset_id;
+        msg = ss.str();
+        code = MetaServiceCode::STALE_PREPARE_ROWSET;
+        return false;
+    }
+
+    return true;
+}
+
/**
* Check if the transaction status is as expected.
* If the transaction is not in the expected state, return false and set the
error code and message.
@@ -1100,6 +1155,15 @@ void
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
return;
}
+ // Check if the compaction/sc tablet job has finished
+ if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
+ !request->tablet_job_id().empty()) {
+ if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id,
rowset_id,
+ request->tablet_job_id())) {
+ return;
+ }
+ }
+
// Check if the prepare rowset request is invalid.
// If the transaction has been finished, it means this prepare rowset is a
timeout retry request.
// In this case, do not write the recycle key again, otherwise it may
cause data loss.
@@ -1237,6 +1301,15 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}
+ // Check if the compaction/sc tablet job has finished
+ if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
+ !request->tablet_job_id().empty()) {
+ if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id,
rowset_id,
+ request->tablet_job_id())) {
+ return;
+ }
+ }
+
// Check if the commit rowset request is invalid.
// If the transaction has been finished, it means this commit rowset is a
timeout retry request.
// In this case, do not write the recycle key again, otherwise it may
cause data loss.
@@ -2882,4 +2955,4 @@ void
MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
response->mutable_schema_dict()->Swap(&schema_dict);
}
-} // namespace doris::cloud
\ No newline at end of file
+} // namespace doris::cloud
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 669d84025a9..b5e609be3a8 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -320,6 +320,32 @@ static void add_tablet_metas(MetaServiceProxy*
meta_service, std::string instanc
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
+// Test helper: asks `meta_service` to start a compaction job `job_id` of `type` on
+// `tablet_id` and stores the reply in `res`. When `input_version.second > 0`, the job is
+// restricted to the version range [first, second] with the range check enabled.
+// Callers are responsible for checking `res.status()`.
+static void start_compaction_job(MetaService* meta_service, int64_t tablet_id,
+                                 const std::string& job_id, const std::string& initiator,
+                                 int base_compaction_cnt, int cumu_compaction_cnt,
+                                 TabletCompactionJobPB::CompactionType type,
+                                 StartTabletJobResponse& res,
+                                 std::pair<int64_t, int64_t> input_version = {0, 0}) {
+    brpc::Controller cntl;
+    StartTabletJobRequest req;
+    req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+    auto* compaction = req.mutable_job()->add_compaction();
+    compaction->set_id(job_id);
+    compaction->set_initiator(initiator);
+    compaction->set_base_compaction_cnt(base_compaction_cnt);
+    compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt);
+    compaction->set_type(type);
+    const long now = time(nullptr);
+    compaction->set_expiration(now + 12);
+    compaction->set_lease(now + 3);
+    if (input_version.second > 0) {
+        compaction->add_input_versions(input_version.first);
+        compaction->add_input_versions(input_version.second);
+        compaction->set_check_input_versions_range(true);
+    }
+    meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+}
+
TEST(MetaServiceTest, GetInstanceIdTest) {
extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
const std::string& cloud_unique_id);
@@ -9042,6 +9068,317 @@ TEST(MetaServiceTest, AddObjInfoWithRole) {
SyncPoint::get_instance()->clear_all_call_backs();
}
+TEST(MetaServiceTest, CheckJobExisted) {
+    auto meta_service = get_meta_service();
+
+    std::string instance_id = "check_job_existed_instance_id";
+    auto sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer(
+            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    enum class RowsetRpc { PREPARE, COMMIT };
+
+    // Runs one scenario on a fresh tablet whose ids are base_id+1..base_id+4 (txn base_id+5).
+    // If `start_job`, a base compaction job "compaction1" is started on the tablet first.
+    // The rowset request carries `job_id`, and the field is left unset when `job_id` is nullptr.
+    // The response code must equal `expected`.
+    auto check = [&](int64_t base_id, bool start_job, const char* job_id, RowsetRpc rpc,
+                     MetaServiceCode expected) {
+        SCOPED_TRACE("base_id=" + std::to_string(base_id));
+        const int64_t table_id = base_id + 1;
+        const int64_t index_id = base_id + 2;
+        const int64_t partition_id = base_id + 3;
+        const int64_t tablet_id = base_id + 4;
+        const int64_t txn_id = base_id + 5;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id));
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        if (start_job) {
+            StartTabletJobResponse job_res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, job_res);
+            // Without this, a failed job start would let the "id mismatch" cases pass vacuously.
+            ASSERT_EQ(job_res.status().code(), MetaServiceCode::OK) << job_res.status().msg();
+        }
+
+        brpc::Controller cntl;
+        CreateRowsetRequest req;
+        if (job_id != nullptr) {
+            req.set_tablet_job_id(job_id);
+        }
+        req.mutable_rowset_meta()->CopyFrom(rowset);
+        CreateRowsetResponse res;
+        if (rpc == RowsetRpc::COMMIT) {
+            meta_service->commit_rowset(&cntl, &req, &res, nullptr);
+        } else {
+            meta_service->prepare_rowset(&cntl, &req, &res, nullptr);
+        }
+        ASSERT_EQ(res.status().code(), expected) << res.status().msg();
+    };
+
+    // prepare rowset: job exists and id matches
+    check(952700, true, "compaction1", RowsetRpc::PREPARE, MetaServiceCode::OK);
+    // prepare rowset: job does not exist
+    check(952800, false, "compaction1", RowsetRpc::PREPARE, MetaServiceCode::STALE_PREPARE_ROWSET);
+    // prepare rowset: compaction job exists, job id does not match
+    check(952900, true, "compaction2", RowsetRpc::PREPARE, MetaServiceCode::STALE_PREPARE_ROWSET);
+    // prepare rowset: job id not set, check skipped
+    check(953500, true, nullptr, RowsetRpc::PREPARE, MetaServiceCode::OK);
+    // prepare rowset: job id is empty string, check skipped
+    check(953600, true, "", RowsetRpc::PREPARE, MetaServiceCode::OK);
+
+    // commit rowset: job exists and id matches
+    check(953000, true, "compaction1", RowsetRpc::COMMIT, MetaServiceCode::OK);
+    // commit rowset: job does not exist
+    check(953100, false, "compaction1", RowsetRpc::COMMIT, MetaServiceCode::STALE_PREPARE_ROWSET);
+    // commit rowset: compaction job exists, job id does not match
+    check(953200, true, "compaction2", RowsetRpc::COMMIT, MetaServiceCode::STALE_PREPARE_ROWSET);
+    // commit rowset: job id not set, check skipped
+    check(953300, true, nullptr, RowsetRpc::COMMIT, MetaServiceCode::OK);
+    // commit rowset: job id is empty string, check skipped
+    check(953400, true, "", RowsetRpc::COMMIT, MetaServiceCode::OK);
+}
+
TEST(MetaServiceTest, StalePrepareRowset) {
auto meta_service = get_meta_service();
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 87c3ef5e542..7b50747856c 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -976,6 +976,7 @@ message CreateRowsetRequest {
optional doris.RowsetMetaCloudPB rowset_meta = 2;
optional bool temporary = 3;
optional int64 txn_id = 4;
+ optional string tablet_job_id = 5;
}
message CreateRowsetResponse {
@@ -1376,6 +1377,7 @@ enum MetaServiceCode {
VERSION_NOT_FOUND = 2010;
TABLET_NOT_FOUND = 2011;
STALE_TABLET_CACHE = 2012;
+ STALE_PREPARE_ROWSET = 2013;
CLUSTER_NOT_FOUND = 3001;
ALREADY_EXISTED = 3002;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]