This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 06cbb96e79d [feature](merge-cloud) cloud meta mgr impl
prepare/commit/sync rowset (#30128)
06cbb96e79d is described below
commit 06cbb96e79d2c782b3533f0f236da7db9ab3455f
Author: walter <[email protected]>
AuthorDate: Tue Jan 23 18:56:54 2024 +0800
[feature](merge-cloud) cloud meta mgr impl prepare/commit/sync rowset
(#30128)
---
be/src/cloud/cloud_meta_mgr.cpp | 345 ++++++++++++++++++++-
be/src/cloud/cloud_meta_mgr.h | 2 +-
be/src/cloud/cloud_tablet.cpp | 54 +---
be/src/cloud/cloud_tablet.h | 22 +-
be/src/common/status.h | 3 +-
be/src/olap/base_tablet.cpp | 139 ++++++++-
be/src/olap/base_tablet.h | 26 +-
be/src/olap/compaction.cpp | 12 +-
be/src/olap/rowset/rowset_meta.cpp | 18 +-
be/src/olap/rowset/rowset_meta.h | 8 +-
be/src/olap/rowset_builder.cpp | 2 +-
be/src/olap/schema_change.cpp | 2 +-
be/src/olap/single_replica_compaction.cpp | 9 +-
be/src/olap/snapshot_manager.cpp | 2 +-
be/src/olap/tablet.cpp | 138 +--------
be/src/olap/tablet.h | 23 +-
be/src/olap/tablet_manager.cpp | 4 +-
be/src/olap/tablet_meta.cpp | 4 +
be/src/olap/tablet_meta.h | 6 +
be/src/olap/task/engine_clone_task.cpp | 7 +-
be/src/olap/task/engine_storage_migration_task.cpp | 2 +-
be/test/olap/delta_writer_test.cpp | 18 +-
.../olap/engine_storage_migration_task_test.cpp | 4 +-
be/test/olap/remote_rowset_gc_test.cpp | 4 +-
be/test/olap/tablet_cooldown_test.cpp | 4 +-
25 files changed, 588 insertions(+), 270 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index d6eb54e5c41..eb2802b99b0 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -38,6 +38,7 @@
#include "gen_cpp/cloud.pb.h"
#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/tablet_meta.h"
#include "runtime/stream_load/stream_load_context.h"
@@ -137,7 +138,7 @@ private:
auto channel = std::make_unique<brpc::Channel>();
Status s = init_channel(channel.get());
- if (UNLIKELY(!s.ok())) {
+ if (!s.ok()) [[unlikely]] {
return s;
}
@@ -189,6 +190,8 @@ static std::string debug_info(const Request& req) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else if constexpr (is_any_v<Request, GetObjStoreInfoRequest>) {
return "";
+ } else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
+ return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
} else {
static_assert(!sizeof(Request));
}
@@ -224,7 +227,7 @@ static Status retry_rpc(std::string_view op_name, const
Request& req, Response*
cntl.set_max_retry(BRPC_RETRY_TIMES);
res->Clear();
(stub.get()->*method)(&cntl, &req, res, nullptr);
- if (UNLIKELY(cntl.Failed())) {
+ if (cntl.Failed()) [[unlikely]] {
error_msg = cntl.ErrorText();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
@@ -271,28 +274,339 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id,
TabletMetaSharedPtr* tab
}
Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool
warmup_delta_data) {
- return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not
implemented");
+ using namespace std::chrono;
+
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets",
Status::OK(), tablet);
+
+ std::shared_ptr<MetaService_Stub> stub;
+ RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+ int tried = 0;
+ while (true) {
+ brpc::Controller cntl;
+ cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
+ GetRowsetRequest req;
+ GetRowsetResponse resp;
+
+ int64_t tablet_id = tablet->tablet_id();
+ int64_t table_id = tablet->table_id();
+ int64_t index_id = tablet->index_id();
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ auto* idx = req.mutable_idx();
+ idx->set_tablet_id(tablet_id);
+ idx->set_table_id(table_id);
+ idx->set_index_id(index_id);
+ idx->set_partition_id(tablet->partition_id());
+ {
+ std::shared_lock rlock(tablet->get_header_lock());
+ req.set_start_version(tablet->local_max_version() + 1);
+ req.set_base_compaction_cnt(tablet->base_compaction_cnt());
+
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
+ req.set_cumulative_point(tablet->cumulative_layer_point());
+ }
+ req.set_end_version(-1);
+ VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
+
+ stub->get_rowset(&cntl, &req, &resp, nullptr);
+ int64_t latency = cntl.latency_us();
+ g_get_rowset_latency << latency;
+ int retry_times = config::meta_service_rpc_retry_times;
+ if (cntl.Failed()) {
+ if (tried++ < retry_times) {
+ auto rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(20, 200);
+ std::uniform_int_distribution<uint32_t> u1(500, 1000);
+ uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
+ std::this_thread::sleep_for(milliseconds(duration_ms));
+ LOG_INFO("failed to get rowset meta")
+ .tag("reason", cntl.ErrorText())
+ .tag("tablet_id", tablet_id)
+ .tag("table_id", table_id)
+ .tag("index_id", index_id)
+ .tag("partition_id", tablet->partition_id())
+ .tag("tried", tried)
+ .tag("sleep", duration_ms);
+ continue;
+ }
+ return Status::RpcError("failed to get rowset meta: {}",
cntl.ErrorText());
+ }
+ if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
+ return Status::NotFound("failed to get rowset meta: {}",
resp.status().msg());
+ }
+ if (resp.status().code() != MetaServiceCode::OK) {
+ return Status::InternalError("failed to get rowset meta: {}",
resp.status().msg());
+ }
+ if (latency > 100 * 1000) { // 100ms
+ LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" <<
resp.rowset_meta().size()
+ << ", latency=" << latency << "us";
+ } else {
+ LOG_EVERY_N(INFO, 100)
+ << "finish get_rowset rpc. rowset_meta.size()=" <<
resp.rowset_meta().size()
+ << ", latency=" << latency << "us";
+ }
+
+ int64_t now =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+ tablet->last_sync_time_s = now;
+
+ if (tablet->enable_unique_key_merge_on_write()) {
+ DeleteBitmap delete_bitmap(tablet_id);
+ int64_t old_max_version = req.start_version() - 1;
+ auto st = sync_tablet_delete_bitmap(tablet, old_max_version,
resp.rowset_meta(),
+ resp.stats(), req.idx(),
&delete_bitmap);
+ if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
+ LOG_WARNING("rowset meta is expired, need to retry")
+ .tag("tablet", tablet->tablet_id())
+ .tag("tried", tried)
+ .error(st);
+ continue;
+ }
+ if (!st.ok()) {
+ LOG_WARNING("failed to get delete bimtap")
+ .tag("tablet", tablet->tablet_id())
+ .error(st);
+ return st;
+ }
+ tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
+ }
+ {
+ const auto& stats = resp.stats();
+ std::unique_lock wlock(tablet->get_header_lock());
+
+ // ATTN: we are facing following data race
+ //
+ //
resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8
+ //
+ // BE-compaction-thread meta-service
BE-query-thread
+ // | |
|
+ // local | commit cumu-compaction |
|
+ // cc_cnt=0 | ---------------------------> | sync rowset
(long rpc, local cc_cnt=0 ) | local
+ // | |
<----------------------------------------- | cc_cnt=0
+ // | | -.
|
+ // local | done cc_cnt=1 | \
|
+ // cc_cnt=1 | <--------------------------- | \
|
+ // | | \ returned
with resp cc_cnt=0 (snapshot) |
+ // | |
'------------------------------------> | local
+ // | |
| cc_cnt=1
+ // | |
|
+ // | |
| CHECK FAIL
+ // | |
| need retry
+ // To get rid of just retry syncing tablet
+ if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() ||
+ stats.cumulative_compaction_cnt() <
tablet->cumulative_compaction_cnt())
+ [[unlikely]] {
+ // stale request, ignore
+ LOG_WARNING("stale get rowset meta request")
+ .tag("resp_base_compaction_cnt",
stats.base_compaction_cnt())
+ .tag("base_compaction_cnt",
tablet->base_compaction_cnt())
+ .tag("resp_cumulative_compaction_cnt",
stats.cumulative_compaction_cnt())
+ .tag("cumulative_compaction_cnt",
tablet->cumulative_compaction_cnt())
+ .tag("tried", tried);
+ if (tried++ < 10) continue;
+ return Status::OK();
+ }
+ std::vector<RowsetSharedPtr> rowsets;
+ rowsets.reserve(resp.rowset_meta().size());
+ for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
+ VLOG_DEBUG << "get rowset meta, tablet_id=" <<
cloud_rs_meta_pb.tablet_id()
+ << ", version=[" <<
cloud_rs_meta_pb.start_version() << '-'
+ << cloud_rs_meta_pb.end_version() << ']';
+ auto existed_rowset = tablet->get_rowset_by_version(
+ {cloud_rs_meta_pb.start_version(),
cloud_rs_meta_pb.end_version()});
+ if (existed_rowset &&
+ existed_rowset->rowset_id().to_string() ==
cloud_rs_meta_pb.rowset_id_v2()) {
+ continue; // Same rowset, skip it
+ }
+ RowsetMetaPB meta_pb =
cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->init_from_pb(meta_pb);
+ RowsetSharedPtr rowset;
+ // schema is nullptr implies using RowsetMeta.tablet_schema
+ Status s = RowsetFactory::create_rowset(nullptr,
tablet->tablet_path(), rs_meta,
+ &rowset);
+ if (!s.ok()) {
+ LOG_WARNING("create rowset").tag("status", s);
+ return s;
+ }
+ rowsets.push_back(std::move(rowset));
+ }
+ if (!rowsets.empty()) {
+ // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE
compaction. e.g.:
+ // BE has [0-1][2-11][12-12], [12-12] is delete predicate,
cp is 2;
+ // after doing EMPTY_CUMULATIVE compaction, MS cp is 13,
get_rowset will return [2-11][12-12].
+ bool version_overlap =
+ tablet->local_max_version() >=
rowsets.front()->start_version();
+ tablet->add_rowsets(std::move(rowsets), version_overlap,
wlock, warmup_delta_data);
+ }
+ tablet->last_base_compaction_success_time_ms =
stats.last_base_compaction_time_ms();
+ tablet->last_cumu_compaction_success_time_ms =
stats.last_cumu_compaction_time_ms();
+ tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
+
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+ tablet->set_cumulative_layer_point(stats.cumulative_point());
+ tablet->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
+ stats.num_rows(),
stats.data_size());
+ }
+ return Status::OK();
+ }
}
Status CloudMetaMgr::sync_tablet_delete_bitmap(
CloudTablet* tablet, int64_t old_max_version,
- const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+ const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap*
delete_bitmap) {
- return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is
not implemented");
+ if (rs_metas.empty()) {
+ return Status::OK();
+ }
+
+ std::shared_ptr<MetaService_Stub> stub;
+ RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+ int64_t new_max_version = std::max(old_max_version,
rs_metas.rbegin()->end_version());
+ brpc::Controller cntl;
+ // When there are many delete bitmaps that need to be synchronized, it
+ // may take a longer time, especially when loading the tablet for the
+ // first time, so set a relatively long timeout time.
+ cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
+ GetDeleteBitmapRequest req;
+ GetDeleteBitmapResponse res;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_tablet_id(tablet->tablet_id());
+ req.set_base_compaction_cnt(stats.base_compaction_cnt());
+ req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+ req.set_cumulative_point(stats.cumulative_point());
+ *(req.mutable_idx()) = idx;
+ // New rowset sync all versions of delete bitmap
+ for (const auto& rs_meta : rs_metas) {
+ req.add_rowset_ids(rs_meta.rowset_id_v2());
+ req.add_begin_versions(0);
+ req.add_end_versions(new_max_version);
+ }
+
+ // old rowset sync incremental versions of delete bitmap
+ if (old_max_version > 0 && old_max_version < new_max_version) {
+ RowsetIdUnorderedSet all_rs_ids;
+ RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
+ for (const auto& rs_id : all_rs_ids) {
+ req.add_rowset_ids(rs_id.to_string());
+ req.add_begin_versions(old_max_version + 1);
+ req.add_end_versions(new_max_version);
+ }
+ }
+
+ VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
+ stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
+ if (cntl.Failed()) {
+ return Status::RpcError("failed to get delete bitmap: {}",
cntl.ErrorText());
+ }
+ if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
+ return Status::NotFound("failed to get delete bitmap: {}",
res.status().msg());
+ }
+ // The delete bitmap of stale rowsets will be removed when commit
compaction job,
+ // then delete bitmap of stale rowsets cannot be obtained. But the rowsets
obtained
+ // by sync_tablet_rowsets may include these stale rowsets. When this case
happend, the
+ // error code of ROWSETS_EXPIRED will be returned, we need to retry sync
rowsets again.
+ //
+ // Be query thread meta-service Be compaction thread
+ // | | |
+ // | get rowset | |
+ // |--------------------------->| |
+ // | return get rowset | |
+ // |<---------------------------| |
+ // | | commit job |
+ // | |<------------------------|
+ // | | return commit job |
+ // | |------------------------>|
+ // | get delete bitmap | |
+ // |--------------------------->| |
+ // | return get delete bitmap | |
+ // |<---------------------------| |
+ // | | |
+ if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) {
+ return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get
delete bitmap: {}",
+
res.status().msg());
+ }
+ if (res.status().code() != MetaServiceCode::OK) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get
delete bitmap: {}",
+
res.status().msg());
+ }
+ const auto& rowset_ids = res.rowset_ids();
+ const auto& segment_ids = res.segment_ids();
+ const auto& vers = res.versions();
+ const auto& delete_bitmaps = res.segment_delete_bitmaps();
+ for (size_t i = 0; i < rowset_ids.size(); i++) {
+ RowsetId rst_id;
+ rst_id.init(rowset_ids[i]);
+ delete_bitmap->merge({rst_id, segment_ids[i], vers[i]},
+ roaring::Roaring::read(delete_bitmaps[i].data()));
+ }
+ return Status::OK();
}
Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
RowsetMetaSharedPtr* existed_rs_meta) {
- return Status::NotSupported("CloudMetaMgr::prepare_rowset is not
implemented");
+ VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
+ << ", rowset_id: " << rs_meta.rowset_id() << ", is_tmp: " <<
is_tmp;
+
+ CreateRowsetRequest req;
+ CreateRowsetResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_temporary(is_tmp);
+
+ RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true);
+ rs_meta.to_rowset_pb(&doris_rs_meta, true);
+ doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(doris_rs_meta));
+
+ Status st = retry_rpc("prepare rowset", req, &resp,
&MetaService_Stub::prepare_rowset);
+ if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
+ if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
+ RowsetMetaPB doris_rs_meta =
+
cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
+ *existed_rs_meta = std::make_shared<RowsetMeta>();
+ (*existed_rs_meta)->init_from_pb(doris_rs_meta);
+ }
+ return Status::AlreadyExist("failed to prepare rowset: {}",
resp.status().msg());
+ }
+ return st;
}
Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, bool is_tmp,
RowsetMetaSharedPtr* existed_rs_meta) {
- return Status::NotSupported("CloudMetaMgr::commit_rowset is not
implemented");
+ VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
+ << ", rowset_id: " << rs_meta.rowset_id() << ", is_tmp: " <<
is_tmp;
+ CreateRowsetRequest req;
+ CreateRowsetResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_temporary(is_tmp);
+
+ RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
+ doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(rs_meta_pb));
+ Status st = retry_rpc("commit rowset", req, &resp,
&MetaService_Stub::commit_rowset);
+ if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
+ if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
+ RowsetMetaPB doris_rs_meta =
+
cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
+ *existed_rs_meta = std::make_shared<RowsetMeta>();
+ (*existed_rs_meta)->init_from_pb(doris_rs_meta);
+ }
+ return Status::AlreadyExist("failed to commit rowset: {}",
resp.status().msg());
+ }
+ return st;
}
Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
- return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not
implemented");
+ VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
+ << ", rowset_id: " << rs_meta.rowset_id();
+ CreateRowsetRequest req;
+ CreateRowsetResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+
+ RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(true);
+ doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(rs_meta_pb));
+ Status st =
+ retry_rpc("update committed rowset", req, &resp,
&MetaService_Stub::update_tmp_rowset);
+ if (!st.ok() && resp.status().code() ==
MetaServiceCode::ROWSET_META_NOT_FOUND) {
+ return Status::InternalError("failed to update committed rowset: {}",
resp.status().msg());
+ }
+ return st;
}
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
@@ -436,15 +750,14 @@ Status CloudMetaMgr::update_delete_bitmap(const
CloudTablet& tablet, int64_t loc
req.set_tablet_id(tablet.tablet_id());
req.set_lock_id(lock_id);
req.set_initiator(initiator);
- for (auto iter = delete_bitmap->delete_bitmap.begin();
- iter != delete_bitmap->delete_bitmap.end(); ++iter) {
- req.add_rowset_ids(std::get<0>(iter->first).to_string());
- req.add_segment_ids(std::get<1>(iter->first));
- req.add_versions(std::get<2>(iter->first));
+ for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
+ req.add_rowset_ids(std::get<0>(key).to_string());
+ req.add_segment_ids(std::get<1>(key));
+ req.add_versions(std::get<2>(key));
// To save space, convert array and bitmap containers to run containers
- iter->second.runOptimize();
- std::string bitmap_data(iter->second.getSizeInBytes(), '\0');
- iter->second.write(bitmap_data.data());
+ bitmap.runOptimize();
+ std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
+ bitmap.write(bitmap_data.data());
*(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
}
auto st = retry_rpc("update delete bitmap", req, &res,
&MetaService_Stub::update_delete_bitmap);
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index af5b048b2f0..59f11c17152 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -88,7 +88,7 @@ public:
private:
Status sync_tablet_delete_bitmap(
CloudTablet* tablet, int64_t old_max_version,
- const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+ const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>&
rs_metas,
const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap*
delete_bitmap);
};
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 6beb1e45d94..c83731fe83f 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -53,7 +53,7 @@ Status CloudTablet::capture_rs_readers(const Version&
spec_version,
if (!st.ok()) {
rlock.unlock(); // avoid logging in lock range
// Check no missed versions or req version is merged
- auto missed_versions = calc_missed_versions(spec_version.second);
+ auto missed_versions = get_missed_versions(spec_version.second);
if (missed_versions.empty()) {
st.set_code(VERSION_ALREADY_MERGED); // Reset error code
}
@@ -67,50 +67,6 @@ Status CloudTablet::capture_rs_readers(const Version&
spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}
-// for example:
-// [0-4][5-5][8-8][9-9][13-13]
-// if spec_version = 12, it will return [6-7],[10-12]
-Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
- DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
-
- Versions missed_versions;
- Versions existing_versions;
- {
- std::shared_lock rdlock(_meta_lock);
- for (const auto& rs : _tablet_meta->all_rs_metas()) {
- existing_versions.emplace_back(rs->version());
- }
- }
-
- // sort the existing versions in ascending order
- std::sort(existing_versions.begin(), existing_versions.end(),
- [](const Version& a, const Version& b) {
- // simple because 2 versions are certainly not overlapping
- return a.first < b.first;
- });
-
- auto min_version = existing_versions.front().first;
- if (min_version > 0) {
- missed_versions.emplace_back(0, std::min(spec_version, min_version -
1));
- }
- for (auto it = existing_versions.begin(); it != existing_versions.end() -
1; ++it) {
- auto prev_v = it->second;
- if (prev_v >= spec_version) {
- return missed_versions;
- }
- auto next_v = (it + 1)->first;
- if (next_v > prev_v + 1) {
- // there is a hole between versions
- missed_versions.emplace_back(prev_v + 1, std::min(spec_version,
next_v - 1));
- }
- }
- auto max_version = existing_versions.back().second;
- if (max_version < spec_version) {
- missed_versions.emplace_back(max_version + 1, spec_version);
- }
- return missed_versions;
-}
-
Status CloudTablet::sync_meta() {
// TODO(lightman): FileCache
return Status::NotSupported("CloudTablet::sync_meta is not implemented");
@@ -451,4 +407,12 @@ void CloudTablet::get_compaction_status(std::string*
json_result) {
*json_result = std::string(strbuf.GetString());
}
+inline void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
+ // cumulative point should only be reset to -1, or be increased
+ CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >=
_cumulative_point)
+ << "Unexpected cumulative point: " << new_point
+ << ", origin: " << _cumulative_point.load();
+ _cumulative_point = new_point;
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index bf8db3c9451..bfd22d2f54c 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -20,7 +20,6 @@
#include <atomic>
#include "olap/base_tablet.h"
-#include "olap/version_graph.h"
namespace doris {
@@ -64,7 +63,7 @@ public:
// If tablet state is not `TABLET_RUNNING`, sync tablet meta and all
visible rowsets.
// If `query_version` > 0 and local max_version of the tablet >=
`query_version`, do nothing.
// If 'need_download_data_async' is true, it means that we need to
download the new version
- // rowsets datas async.
+ // rowsets datum async.
Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data =
false);
// Synchronize the tablet meta from meta service.
@@ -74,7 +73,7 @@ public:
// If 'warmup_delta_data' is true, download the new version rowset data in
background.
// MUST hold EXCLUSIVE `_meta_lock`.
// If 'need_download_data_async' is true, it means that we need to
download the new version
- // rowsets datas async.
+ // rowsets datum async.
void add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
std::unique_lock<std::shared_mutex>& meta_lock,
bool warmup_delta_data = false);
@@ -98,6 +97,17 @@ public:
int64_t get_cloud_base_compaction_score() const;
int64_t get_cloud_cumu_compaction_score() const;
+ int64_t local_max_version() const { return _max_version; }
+ int64_t base_compaction_cnt() const { return _base_compaction_cnt; }
+ int64_t cumulative_compaction_cnt() const { return
_cumulative_compaction_cnt; }
+ int64_t cumulative_layer_point() const {
+ return _cumulative_point.load(std::memory_order_relaxed);
+ }
+
+ void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; }
+ void set_cumulative_compaction_cnt(int64_t cnt) {
_cumulative_compaction_cnt = cnt; }
+ void set_cumulative_layer_point(int64_t new_point);
+
int64_t last_sync_time_s = 0;
int64_t last_load_time_ms = 0;
int64_t last_base_compaction_success_time_ms = 0;
@@ -105,8 +115,6 @@ public:
int64_t last_cumu_no_suitable_version_ms = 0;
private:
- Versions calc_missed_versions(int64_t spec_version);
-
// FIXME(plat1ko): No need to record base size if rowsets are ordered by
version
void update_base_size(const Rowset& rs);
@@ -126,8 +134,8 @@ private:
// Number of sorted arrays (e.g. for rowset with N segments, if rowset is
overlapping, delta is N, otherwise 1) after cumu point
std::atomic<int64_t> _approximate_cumu_num_deltas {-1};
- [[maybe_unused]] int64_t _base_compaction_cnt = 0;
- [[maybe_unused]] int64_t _cumulative_compaction_cnt = 0;
+ int64_t _base_compaction_cnt = 0;
+ int64_t _cumulative_compaction_cnt = 0;
int64_t _max_version = -1;
int64_t _base_size = 0;
};
diff --git a/be/src/common/status.h b/be/src/common/status.h
index a4ab9f60b0f..3fcd0410090 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -276,7 +276,8 @@ namespace ErrorCode {
E(KEY_NOT_FOUND, -7000, false); \
E(KEY_ALREADY_EXISTS, -7001, false); \
E(ENTRY_NOT_FOUND, -7002, false); \
- E(INVALID_TABLET_STATE, -7211, false);
+ E(INVALID_TABLET_STATE, -7211, false); \
+ E(ROWSETS_EXPIRED, -7311, false);
// Define constexpr int error_code_name = error_code_value
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 18445cb17a6..8612e4d83bd 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -22,7 +22,6 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet_fwd.h"
-#include "olap/tablet_schema_cache.h"
#include "util/doris_metrics.h"
#include "vec/common/schema_util.h"
@@ -81,7 +80,7 @@ Status BaseTablet::update_by_least_common_schema(const
TabletSchemaSPtr& update_
return Status::OK();
}
-Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path,
std::vector<RowSetSplits>*
rs_splits) const {
DCHECK(rs_splits != nullptr && rs_splits->empty());
for (auto version : version_path) {
@@ -104,11 +103,143 @@ Status BaseTablet::capture_rs_readers_unlocked(const
std::vector<Version>& versi
return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
"failed to create reader for rowset:{}",
it->second->rowset_id().to_string());
}
- rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
+ rs_splits->emplace_back(std::move(rs_reader));
}
return Status::OK();
}
+// snapshot manager may call this api to check if version exists, so that
+// the version maybe not exist
+RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version,
+ bool find_in_stale) const {
+ auto iter = _rs_version_map.find(version);
+ if (iter == _rs_version_map.end()) {
+ if (find_in_stale) {
+ return get_stale_rowset_by_version(version);
+ }
+ return nullptr;
+ }
+ return iter->second;
+}
+
+RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version&
version) const {
+ auto iter = _stale_rs_version_map.find(version);
+ if (iter == _stale_rs_version_map.end()) {
+ VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " <<
tablet_id();
+ return nullptr;
+ }
+ return iter->second;
+}
+
+// Already under _meta_lock
+RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const {
+ Version max_version = _tablet_meta->max_version();
+ if (max_version.first == -1) {
+ return nullptr;
+ }
+
+ auto iter = _rs_version_map.find(max_version);
+ if (iter == _rs_version_map.end()) {
+ DCHECK(false) << "invalid version:" << max_version;
+ return nullptr;
+ }
+ return iter->second;
+}
+
+Status BaseTablet::get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const {
+ std::shared_lock rlock(_meta_lock);
+ return get_all_rs_id_unlocked(max_version, rowset_ids);
+}
+
+Status BaseTablet::get_all_rs_id_unlocked(int64_t max_version,
+ RowsetIdUnorderedSet* rowset_ids)
const {
+ // Ensure that the obtained versions of rowsets are continuous
+ Version spec_version(0, max_version);
+ Versions version_path;
+ auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
+ if (!st.ok()) [[unlikely]] {
+ return st;
+ }
+
+ for (auto& ver : version_path) {
+ if (ver.second == 1) {
+ // [0-1] rowset is empty for each tablet, skip it
+ continue;
+ }
+ auto it = _rs_version_map.find(ver);
+ if (it == _rs_version_map.end()) {
+ return Status::Error<CAPTURE_ROWSET_ERROR, false>(
+ "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
+ ver.to_string());
+ }
+ rowset_ids->emplace(it->second->rowset_id());
+ }
+ return Status::OK();
+}
+
+Versions BaseTablet::get_missed_versions(int64_t spec_version) const {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ Versions existing_versions;
+ {
+ std::shared_lock rdlock(_meta_lock);
+ for (const auto& rs : _tablet_meta->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+ }
+ return calc_missed_versions(spec_version, existing_versions);
+}
+
+Versions BaseTablet::get_missed_versions_unlocked(int64_t spec_version) const {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ Versions existing_versions;
+ for (const auto& rs : _tablet_meta->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+ return calc_missed_versions(spec_version, existing_versions);
+}
+
+Versions BaseTablet::calc_missed_versions(int64_t spec_version, Versions
existing_versions) {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ // sort the existing versions in ascending order
+ std::sort(existing_versions.begin(), existing_versions.end(),
+ [](const Version& a, const Version& b) {
+ // simple because 2 versions are certainly not overlapping
+ return a.first < b.first;
+ });
+
+ // From the first version(=0), find the missing version until spec_version
+ int64_t last_version = -1;
+ Versions missed_versions;
+ for (const Version& version : existing_versions) {
+ if (version.first > last_version + 1) {
+ // there is a hole between versions
+ missed_versions.emplace_back(last_version + 1,
std::min(version.first, spec_version));
+ }
+ last_version = version.second;
+ if (last_version >= spec_version) {
+ break;
+ }
+ }
+ if (last_version < spec_version) {
+ // there is a hole between the last version and the specificed version.
+ missed_versions.emplace_back(last_version + 1, spec_version);
+ }
+ return missed_versions;
+}
+
+void BaseTablet::_print_missed_versions(const Versions& missed_versions) const
{
+ std::stringstream ss;
+ ss << tablet_id() << " has " << missed_versions.size() << " missed
version:";
+ // print at most 10 version
+ for (int i = 0; i < 10 && i < missed_versions.size(); ++i) {
+ ss << missed_versions[i] << ",";
+ }
+ LOG(WARNING) << ss.str();
+}
+
bool BaseTablet::_reconstruct_version_tracker_if_necessary() {
double orphan_vertex_ratio =
_timestamped_version_tracker.get_orphan_vertex_ratio();
if (orphan_vertex_ratio >=
config::tablet_version_graph_orphan_vertex_ratio) {
@@ -119,4 +250,4 @@ bool
BaseTablet::_reconstruct_version_tracker_if_necessary() {
return false;
}
-} /* namespace doris */
+} // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index bb327b39532..25c54c47cda 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -44,6 +44,7 @@ public:
TabletState tablet_state() const { return _tablet_meta->tablet_state(); }
Status set_tablet_state(TabletState state);
int64_t table_id() const { return _tablet_meta->table_id(); }
+ int64_t index_id() const { return _tablet_meta->index_id(); }
int64_t partition_id() const { return _tablet_meta->partition_id(); }
int64_t tablet_id() const { return _tablet_meta->tablet_id(); }
int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
@@ -85,16 +86,37 @@ public:
virtual size_t tablet_footprint() = 0;
// MUST hold shared meta lock
- Status capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+ Status capture_rs_readers_unlocked(const Versions& version_path,
std::vector<RowSetSplits>* rs_splits)
const;
+ // _rs_version_map and _stale_rs_version_map should be protected by
_meta_lock
+ // The caller must call hold _meta_lock when call this three function.
+ RowsetSharedPtr get_rowset_by_version(const Version& version, bool
find_is_stale = false) const;
+ RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const;
+ RowsetSharedPtr get_rowset_with_max_version() const;
+
+ Status get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const;
+ Status get_all_rs_id_unlocked(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const;
+
+ // Get the missed versions until the spec_version.
+ Versions get_missed_versions(int64_t spec_version) const;
+ Versions get_missed_versions_unlocked(int64_t spec_version) const;
+
protected:
+ // Find the missed versions until the spec_version.
+ //
+ // for example:
+ // [0-4][5-5][8-8][9-9][14-14]
+ // if spec_version = 12, it will return [6-7],[10-12]
+ static Versions calc_missed_versions(int64_t spec_version, Versions
existing_versions);
+
+ void _print_missed_versions(const Versions& missed_versions) const;
bool _reconstruct_version_tracker_if_necessary();
mutable std::shared_mutex _meta_lock;
TimestampedVersionTracker _timestamped_version_tracker;
// After version 0.13, all newly created rowsets are saved in
_rs_version_map.
- // And if rowset being compacted, the old rowsetis will be saved in
_stale_rs_version_map;
+ // And if rowset being compacted, the old rowsets will be saved in
_stale_rs_version_map;
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_rs_version_map;
// This variable _stale_rs_version_map is used to record these rowsets
which are be compacted.
// These _stale rowsets are been removed when rowsets' pathVersion is
expired,
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index bc84bbcb84e..53d68c56647 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -560,18 +560,16 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_tablet->set_last_full_compaction_success_time(now);
}
- int64_t current_max_version;
+ int64_t current_max_version = -1;
{
std::shared_lock rdlock(_tablet->get_header_lock());
- RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version();
- if (max_rowset == nullptr) {
- current_max_version = -1;
- } else {
- current_max_version =
_tablet->rowset_with_max_version()->end_version();
+ current_max_version = -1;
+ if (RowsetSharedPtr max_rowset =
_tablet->get_rowset_with_max_version()) {
+ current_max_version = max_rowset->end_version();
}
}
- auto cumu_policy = _tablet->cumulative_compaction_policy();
+ auto* cumu_policy = _tablet->cumulative_compaction_policy();
DCHECK(cumu_policy);
LOG(INFO) << "succeed to do " << compaction_name() << " is_vertical=" <<
vertical_compaction
<< ". tablet=" << _tablet->tablet_id() << ", output_version=" <<
_output_version
diff --git a/be/src/olap/rowset/rowset_meta.cpp
b/be/src/olap/rowset/rowset_meta.cpp
index 7f4798f97e9..d762a438bf9 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -102,18 +102,20 @@ void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
_fs = std::move(fs);
}
-void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema)
const {
*rs_meta_pb = _rowset_meta_pb;
- if (_schema) {
- _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+ if (_schema) [[likely]] {
+ rs_meta_pb->set_schema_version(_schema->schema_version());
+ if (!skip_schema) {
+ // For cloud, separate tablet schema from rowset meta to reduce
persistent size.
+ _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+ }
}
}
-RowsetMetaPB RowsetMeta::get_rowset_pb() {
- RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
- if (_schema) {
- _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
- }
+RowsetMetaPB RowsetMeta::get_rowset_pb(bool skip_schema) const {
+ RowsetMetaPB rowset_meta_pb;
+ to_rowset_pb(&rowset_meta_pb, skip_schema);
return rowset_meta_pb;
}
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 7e1dfaa57c3..d2180a46329 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -197,9 +197,11 @@ public:
void set_num_segments(int64_t num_segments) {
_rowset_meta_pb.set_num_segments(num_segments); }
- void to_rowset_pb(RowsetMetaPB* rs_meta_pb) const;
+ // Convert to RowsetMetaPB, skip_schema is only used by cloud to separate
schema from rowset meta.
+ void to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false)
const;
- RowsetMetaPB get_rowset_pb();
+ // Convert to RowsetMetaPB, skip_schema is only used by cloud to separate
schema from rowset meta.
+ RowsetMetaPB get_rowset_pb(bool skip_schema = false) const;
inline DeletePredicatePB* mutable_delete_pred_pb() {
return _rowset_meta_pb.mutable_delete_predicate();
@@ -302,7 +304,7 @@ public:
void set_tablet_schema(const TabletSchemaSPtr& tablet_schema);
void set_tablet_schema(const TabletSchemaPB& tablet_schema);
- const TabletSchemaSPtr& tablet_schema() { return _schema; }
+ const TabletSchemaSPtr& tablet_schema() const { return _schema; }
// Because the member field '_handle' is a raw pointer, use member func
'init' to replace copy ctor
RowsetMeta(const RowsetMeta&) = delete;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index e7fb520c0bf..0c951405b28 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -128,7 +128,7 @@ Status
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
}
_rowset_ids.clear();
} else {
- RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids));
+ RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version,
&_rowset_ids));
}
_delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
mow_context =
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 327afd6a8c4..982a91c38e0 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -979,7 +979,7 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t
tablet_id) {
Status SchemaChangeHandler::_get_versions_to_be_changed(
TabletSharedPtr base_tablet, std::vector<Version>*
versions_to_be_changed,
RowsetSharedPtr* max_rowset) {
- RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
+ RowsetSharedPtr rowset = base_tablet->get_rowset_with_max_version();
if (rowset == nullptr) {
return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no
version. base_tablet={}",
base_tablet->tablet_id());
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index 72a0cdfc01c..7d78ae1006b 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -157,14 +157,11 @@ Status
SingleReplicaCompaction::_do_single_replica_compaction_impl() {
_tablet->set_last_full_compaction_success_time(UnixMillis());
}
- int64_t current_max_version;
+ int64_t current_max_version = -1;
{
std::shared_lock rdlock(_tablet->get_header_lock());
- RowsetSharedPtr max_rowset = _tablet->rowset_with_max_version();
- if (max_rowset == nullptr) {
- current_max_version = -1;
- } else {
- current_max_version =
_tablet->rowset_with_max_version()->end_version();
+ if (RowsetSharedPtr max_rowset =
_tablet->get_rowset_with_max_version()) {
+ current_max_version = max_rowset->end_version();
}
}
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index eb42ad4ac72..93b8c30ff54 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -492,7 +492,7 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
consistent_rowsets.clear(); // reset vector
// get latest version
- const RowsetSharedPtr last_version =
ref_tablet->rowset_with_max_version();
+ const RowsetSharedPtr last_version =
ref_tablet->get_rowset_with_max_version();
if (last_version == nullptr) {
res = Status::InternalError("tablet has not any version.
path={}",
ref_tablet->tablet_id());
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 50ec2304096..96f00ccd380 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -601,44 +601,6 @@ void Tablet::delete_rowsets(const
std::vector<RowsetSharedPtr>& to_delete, bool
}
}
-// snapshot manager may call this api to check if version exists, so that
-// the version maybe not exist
-const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version,
- bool find_in_stale) const {
- auto iter = _rs_version_map.find(version);
- if (iter == _rs_version_map.end()) {
- if (find_in_stale) {
- return get_stale_rowset_by_version(version);
- }
- return nullptr;
- }
- return iter->second;
-}
-
-const RowsetSharedPtr Tablet::get_stale_rowset_by_version(const Version&
version) const {
- auto iter = _stale_rs_version_map.find(version);
- if (iter == _stale_rs_version_map.end()) {
- VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " <<
tablet_id();
- return nullptr;
- }
- return iter->second;
-}
-
-// Already under _meta_lock
-const RowsetSharedPtr Tablet::rowset_with_max_version() const {
- Version max_version = _tablet_meta->max_version();
- if (max_version.first == -1) {
- return nullptr;
- }
-
- auto iter = _rs_version_map.find(max_version);
- if (iter == _rs_version_map.end()) {
- DCHECK(false) << "invalid version:" << max_version;
- return nullptr;
- }
- return iter->second;
-}
-
TabletSchemaSPtr Tablet::tablet_schema_with_merged_max_schema_version(
const std::vector<RowsetMetaSharedPtr>& rowset_metas) {
RowsetMetaSharedPtr max_schema_version_rs = *std::max_element(
@@ -729,16 +691,14 @@ void Tablet::delete_expired_stale_rowset() {
return;
}
- const RowsetSharedPtr lastest_delta = rowset_with_max_version();
+ const RowsetSharedPtr lastest_delta = get_rowset_with_max_version();
if (lastest_delta == nullptr) {
LOG(WARNING) << "lastest_delta is null " << tablet_id();
return;
}
// fetch missing version before delete
- std::vector<Version> missed_versions;
- calc_missed_versions_unlocked(lastest_delta->end_version(),
&missed_versions);
-
+ Versions missed_versions =
get_missed_versions_unlocked(lastest_delta->end_version());
if (!missed_versions.empty()) {
LOG(WARNING) << "tablet:" << tablet_id()
<< ", missed version for version:" <<
lastest_delta->end_version();
@@ -762,8 +722,8 @@ void Tablet::delete_expired_stale_rowset() {
// 1. When there is no consistent versions, we must reconstruct
the tracker.
if (!status.ok()) {
// 2. fetch missing version after delete
- std::vector<Version> after_missed_versions;
- calc_missed_versions_unlocked(lastest_delta->end_version(),
&after_missed_versions);
+ Versions after_missed_versions =
+
get_missed_versions_unlocked(lastest_delta->end_version());
// 2.1 check whether missed_versions and after_missed_versions
are the same.
// when they are the same, it means we can delete the path
securely.
@@ -788,9 +748,8 @@ void Tablet::delete_expired_stale_rowset() {
// 4. double check the consistent versions
// fetch missing version after recover
- std::vector<Version> recover_missed_versions;
- calc_missed_versions_unlocked(lastest_delta->end_version(),
- &recover_missed_versions);
+ Versions recover_missed_versions =
+
get_missed_versions_unlocked(lastest_delta->end_version());
// 4.1 check whether missed_versions and
recover_missed_versions are the same.
// when they are the same, it means we recover
successfully.
@@ -869,13 +828,12 @@ void Tablet::delete_expired_stale_rowset() {
}
Status Tablet::capture_consistent_versions_unlocked(const Version&
spec_version,
- std::vector<Version>*
version_path,
+ Versions* version_path,
bool skip_missing_version,
bool quiet) const {
Status status =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
version_path);
if (!status.ok() && !quiet) {
- std::vector<Version> missed_versions;
- calc_missed_versions_unlocked(spec_version.second, &missed_versions);
+ Versions missed_versions =
get_missed_versions_unlocked(spec_version.second);
if (missed_versions.empty()) {
// if version_path is null, it may be a compaction check logic.
// so to avoid print too many logs.
@@ -1084,46 +1042,6 @@ uint32_t Tablet::_calc_base_compaction_score() const {
return base_rowset_exist ? score : 0;
}
-void Tablet::calc_missed_versions(int64_t spec_version, std::vector<Version>*
missed_versions) {
- std::shared_lock rdlock(_meta_lock);
- calc_missed_versions_unlocked(spec_version, missed_versions);
-}
-
-// for example:
-// [0-4][5-5][8-8][9-9]
-// if spec_version = 6, we still return {7} other than {6, 7}
-void Tablet::calc_missed_versions_unlocked(int64_t spec_version,
- std::vector<Version>*
missed_versions) const {
- DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
- std::list<Version> existing_versions;
- for (auto& rs : _tablet_meta->all_rs_metas()) {
- existing_versions.emplace_back(rs->version());
- }
-
- // sort the existing versions in ascending order
- existing_versions.sort([](const Version& a, const Version& b) {
- // simple because 2 versions are certainly not overlapping
- return a.first < b.first;
- });
-
- // From the first version(=0), find the missing version until spec_version
- int64_t last_version = -1;
- for (const Version& version : existing_versions) {
- if (version.first > last_version + 1) {
- for (int64_t i = last_version + 1; i < version.first && i <=
spec_version; ++i) {
- missed_versions->emplace_back(Version(i, i));
- }
- }
- last_version = version.second;
- if (last_version >= spec_version) {
- break;
- }
- }
- for (int64_t i = last_version + 1; i <= spec_version; ++i) {
- missed_versions->emplace_back(Version(i, i));
- }
-}
-
void Tablet::max_continuous_version_from_beginning(Version* version, Version*
max_version) {
bool has_version_cross;
std::shared_lock rdlock(_meta_lock);
@@ -1224,16 +1142,6 @@ bool Tablet::check_path(const std::string&
path_to_check) const {
return false;
}
-void Tablet::_print_missed_versions(const std::vector<Version>&
missed_versions) const {
- std::stringstream ss;
- ss << tablet_id() << " has " << missed_versions.size() << " missed
version:";
- // print at most 10 version
- for (int i = 0; i < 10 && i < missed_versions.size(); ++i) {
- ss << missed_versions[i] << ",";
- }
- LOG(WARNING) << ss.str();
-}
-
Status Tablet::_contains_version(const Version& version) {
// check if there exist a rowset contains the added rowset
for (auto& it : _rs_version_map) {
@@ -1984,13 +1892,12 @@ Status
Tablet::create_transient_rowset_writer(RowsetWriterContext& context,
void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
context.tablet_uid = tablet_uid();
-
context.tablet_id = tablet_id();
context.partition_id = partition_id();
context.tablet_schema_hash = schema_hash();
context.rowset_type = tablet_meta()->preferred_rowset_type();
// Alpha Rowset will be removed in the future, so that if the tablet's
default rowset type is
- // alpah rowset, then set the newly created rowset to storage engine's
default rowset.
+ // alpha rowset, then set the newly created rowset to storage engine's
default rowset.
if (context.rowset_type == ALPHA_ROWSET) {
context.rowset_type = _engine.default_rowset_type();
}
@@ -3132,7 +3039,7 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
return Status::OK();
}
RowsetIdUnorderedSet cur_rowset_ids;
- RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
+ RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments,
delete_bitmap));
@@ -3181,7 +3088,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
{
std::shared_lock meta_rlock(_meta_lock);
cur_version = max_version_unlocked().second;
- RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids));
+ RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version, &cur_rowset_ids));
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids,
&rowset_ids_to_add,
&rowset_ids_to_del);
specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
@@ -3227,7 +3134,7 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
<< tablet_id();
return Status::OK();
}
- RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
+ RETURN_IF_ERROR(get_all_rs_id_unlocked(cur_version - 1,
&cur_rowset_ids));
}
auto t2 = watch.get_elapse_time_us();
@@ -3394,27 +3301,6 @@ Status Tablet::check_rowid_conversion(
return Status::OK();
}
-Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const {
- // Ensure that the obtained versions of rowsets are continuous
- std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0,
max_version), &version_path,
- false, false));
- for (auto& ver : version_path) {
- if (ver.second == 1) {
- // [0-1] rowset is empty for each tablet, skip it
- continue;
- }
- auto it = _rs_version_map.find(ver);
- if (it == _rs_version_map.end()) {
- return Status::Error<CAPTURE_ROWSET_ERROR, false>(
- "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
- ver.to_string());
- }
- rowset_ids->emplace(it->second->rowset_id());
- }
- return Status::OK();
-}
-
bool Tablet::check_all_rowset_segment() {
std::shared_lock rdlock(_meta_lock);
for (auto& version_rowset : _rs_version_map) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d953d8fce4f..e17c8b8ff85 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -149,14 +149,6 @@ public:
Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete, bool
check_delete = false);
- // _rs_version_map and _stale_rs_version_map should be protected by
_meta_lock
- // The caller must call hold _meta_lock when call this two function.
- const RowsetSharedPtr get_rowset_by_version(const Version& version,
- bool find_is_stale = false)
const;
- const RowsetSharedPtr get_stale_rowset_by_version(const Version& version)
const;
-
- const RowsetSharedPtr rowset_with_max_version() const;
-
static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version(
const std::vector<RowsetMetaSharedPtr>& rowset_metas);
@@ -170,9 +162,9 @@ public:
// Given spec_version, find a continuous version path and store it in
version_path.
// If quiet is true, then only "does this path exist" is returned.
// If skip_missing_version is true, return ok even there are missing
versions.
- Status capture_consistent_versions_unlocked(const Version& spec_version,
- std::vector<Version>*
version_path,
+ Status capture_consistent_versions_unlocked(const Version& spec_version,
Versions* version_path,
bool skip_missing_version,
bool quiet) const;
+
// if quiet is true, no error log will be printed if there are missing
versions
Status check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
@@ -204,11 +196,6 @@ public:
CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy);
- // operation for clone
- void calc_missed_versions(int64_t spec_version, std::vector<Version>*
missed_versions);
- void calc_missed_versions_unlocked(int64_t spec_version,
- std::vector<Version>* missed_versions)
const;
-
// This function to find max continuous version from the beginning.
// For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet,
then 3 is target.
// 3 will be saved in "version", and 7 will be saved in "max_version", if
max_version != nullptr
@@ -483,7 +470,6 @@ public:
RowsetSharedPtr dst_rowset,
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation,
RowLocation>>>&
location_map);
- Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids)
const;
void sort_block(vectorized::Block& in_block, vectorized::Block&
output_block);
bool check_all_rowset_segment();
@@ -538,7 +524,7 @@ public:
int64_t get_table_id() { return _tablet_meta->table_id(); }
- // binlog releated functions
+ // binlog related functions
bool is_enable_binlog();
bool is_binlog_enabled() { return
_tablet_meta->binlog_config().is_enable(); }
int64_t binlog_ttl_ms() const { return
_tablet_meta->binlog_config().ttl_seconds(); }
@@ -560,7 +546,6 @@ public:
private:
Status _init_once_action();
- void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
bool _contains_rowset(const RowsetId rowset_id);
Status _contains_version(const Version& version);
@@ -758,7 +743,7 @@ inline Version Tablet::max_version() const {
inline uint64_t Tablet::segment_count() const {
std::shared_lock rdlock(_meta_lock);
uint64_t segment_nums = 0;
- for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
+ for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
segment_nums += rs_meta->num_segments();
}
return segment_nums;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 19cfc6fa964..df9a45b03c8 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -146,8 +146,8 @@ Status TabletManager::_add_tablet_unlocked(TTabletId
tablet_id, const TabletShar
int32_t old_version, new_version;
{
std::shared_lock rdlock(existed_tablet->get_header_lock());
- const RowsetSharedPtr old_rowset =
existed_tablet->rowset_with_max_version();
- const RowsetSharedPtr new_rowset = tablet->rowset_with_max_version();
+ const RowsetSharedPtr old_rowset =
existed_tablet->get_rowset_with_max_version();
+ const RowsetSharedPtr new_rowset =
tablet->get_rowset_with_max_version();
// If new tablet is empty, it is a newly created schema change tablet.
// the old tablet is dropped before add tablet. it should not exist
old tablet
if (new_rowset == nullptr) {
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 64bf7d4c198..2d131afa870 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -295,6 +295,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t
partition_id, int64_t tablet_id
TabletMeta::TabletMeta(const TabletMeta& b)
: _table_id(b._table_id),
+ _index_id(b._index_id),
_partition_id(b._partition_id),
_tablet_id(b._tablet_id),
_replica_id(b._replica_id),
@@ -500,6 +501,7 @@ Status TabletMeta::deserialize(const string& meta_binary) {
void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
_table_id = tablet_meta_pb.table_id();
+ _index_id = tablet_meta_pb.index_id();
_partition_id = tablet_meta_pb.partition_id();
_tablet_id = tablet_meta_pb.tablet_id();
_replica_id = tablet_meta_pb.replica_id();
@@ -614,6 +616,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
tablet_meta_pb->set_table_id(table_id());
+ tablet_meta_pb->set_index_id(index_id());
tablet_meta_pb->set_partition_id(partition_id());
tablet_meta_pb->set_tablet_id(tablet_id());
tablet_meta_pb->set_replica_id(replica_id());
@@ -857,6 +860,7 @@ Status TabletMeta::set_partition_id(int64_t partition_id) {
bool operator==(const TabletMeta& a, const TabletMeta& b) {
if (a._table_id != b._table_id) return false;
+ if (a._index_id != b._index_id) return false;
if (a._partition_id != b._partition_id) return false;
if (a._tablet_id != b._tablet_id) return false;
if (a._replica_id != b._replica_id) return false;
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 094bb21507d..d5d1322d341 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -141,6 +141,7 @@ public:
TabletUid tablet_uid() const;
void set_tablet_uid(TabletUid uid) { _tablet_uid = uid; }
int64_t table_id() const;
+ int64_t index_id() const;
int64_t partition_id() const;
int64_t tablet_id() const;
int64_t replica_id() const;
@@ -272,6 +273,7 @@ private:
private:
int64_t _table_id = 0;
+ int64_t _index_id = 0;
int64_t _partition_id = 0;
int64_t _tablet_id = 0;
int64_t _replica_id = 0;
@@ -526,6 +528,10 @@ inline int64_t TabletMeta::table_id() const {
return _table_id;
}
+inline int64_t TabletMeta::index_id() const {
+ return _index_id;
+}
+
inline int64_t TabletMeta::partition_id() const {
return _partition_id;
}
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index d8c7a54cb74..8d1d9c966ba 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -188,7 +188,7 @@ Status EngineCloneTask::_do_clone() {
}
bool is_new_tablet = tablet == nullptr;
// try to incremental clone
- std::vector<Version> missed_versions;
+ Versions missed_versions;
// try to repair a tablet with missing version
if (tablet != nullptr) {
std::shared_lock migration_rlock(tablet->get_migration_lock(),
std::try_to_lock);
@@ -218,7 +218,7 @@ Status EngineCloneTask::_do_clone() {
}
}
- tablet->calc_missed_versions(specified_version, &missed_versions);
+ missed_versions = tablet->get_missed_versions(specified_version);
// if missed version size is 0, then it is useless to clone from
remote be, it means local data is
// completed. Or remote be will just return header not the rowset
files. clone will failed.
@@ -740,8 +740,7 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet*
tablet,
/// Get missing versions again from local tablet.
/// We got it before outside the lock, so it has to be got again.
- std::vector<Version> missed_versions;
- tablet->calc_missed_versions_unlocked(version, &missed_versions);
+ Versions missed_versions = tablet->get_missed_versions_unlocked(version);
VLOG_NOTICE << "get missed versions again when finish incremental clone. "
<< "tablet=" << tablet->tablet_id() << ", clone version=" <<
version
<< ", missed_versions_size=" << missed_versions.size();
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index efa85406c69..37857efe4df 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -69,7 +69,7 @@ Status EngineStorageMigrationTask::_get_versions(int32_t
start_version, int32_t*
return Status::NotSupported(
"currently not support migrate tablet with cooldowned remote
data");
}
- const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+ const RowsetSharedPtr last_version =
_tablet->get_rowset_with_max_version();
if (last_version == nullptr) {
return Status::InternalError("failed to get rowset with max version,
tablet={}",
_tablet->tablet_id());
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index c48ebf98a48..4e6553975fa 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -658,8 +658,8 @@ TEST_F(TestDeltaWriter, vec_write) {
std::cout << "before publish, tablet row nums:" << tablet->num_rows() <<
std::endl;
OlapMeta* meta = tablet->data_dir()->get_meta();
Version version;
- version.first = tablet->rowset_with_max_version()->end_version() + 1;
- version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
std::cout << "start to add rowset version:" << version.first << "-" <<
version.second
<< std::endl;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -751,8 +751,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
std::cout << "before publish, tablet row nums:" << tablet->num_rows() <<
std::endl;
OlapMeta* meta = tablet->data_dir()->get_meta();
Version version;
- version.first = tablet->rowset_with_max_version()->end_version() + 1;
- version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
std::cout << "start to add rowset version:" << version.first << "-" <<
version.second
<< std::endl;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -899,8 +899,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
// publish version on delta writer 1 success
{
Version version;
- version.first = tablet->rowset_with_max_version()->end_version() + 1;
- version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ version.first = tablet->get_rowset_with_max_version()->end_version() +
1;
+ version.second = tablet->get_rowset_with_max_version()->end_version()
+ 1;
std::cout << "start to add rowset version:" << version.first << "-" <<
version.second
<< std::endl;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -950,8 +950,8 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
ASSERT_TRUE(res.ok());
Version version;
- version.first = tablet->rowset_with_max_version()->end_version() + 1;
- version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ version.first = tablet->get_rowset_with_max_version()->end_version() +
1;
+ version.second = tablet->get_rowset_with_max_version()->end_version()
+ 1;
std::cout << "start to add rowset version:" << version.first << "-" <<
version.second
<< std::endl;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
@@ -982,7 +982,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
ASSERT_EQ(1, segments.size());
}
- auto cur_version = tablet->rowset_with_max_version()->end_version();
+ auto cur_version = tablet->get_rowset_with_max_version()->end_version();
// read data from rowset 1, verify the data correct
{
OlapReaderStatistics stats;
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
index c65340522c6..46b4f18c28e 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -218,8 +218,8 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration)
{
TabletSharedPtr tablet =
k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
OlapMeta* meta = tablet->data_dir()->get_meta();
Version version;
- version.first = tablet->rowset_with_max_version()->end_version() + 1;
- version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
diff --git a/be/test/olap/remote_rowset_gc_test.cpp
b/be/test/olap/remote_rowset_gc_test.cpp
index fe8ab3b0f10..19b02dca9c4 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -211,8 +211,8 @@ TEST_F(RemoteRowsetGcTest, normal) {
k_engine->tablet_manager()->get_tablet(write_req.tablet_id,
write_req.schema_hash);
OlapMeta* meta = tablet->data_dir()->get_meta();
Version version;
- version.first = tablet->rowset_with_max_version()->end_version() + 1;
- version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index d83ea3eb016..5f307611abe 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -384,8 +384,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t
replica_id, int32_t schema_ha
*tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id,
write_req.schema_hash);
OlapMeta* meta = (*tablet)->data_dir()->get_meta();
Version version;
- version.first = (*tablet)->rowset_with_max_version()->end_version() + 1;
- version.second = (*tablet)->rowset_with_max_version()->end_version() + 1;
+ version.first = (*tablet)->get_rowset_with_max_version()->end_version() +
1;
+ version.second = (*tablet)->get_rowset_with_max_version()->end_version() +
1;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
k_engine->txn_manager()->get_txn_related_tablets(write_req.txn_id,
write_req.partition_id,
&tablet_related_rs);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]