This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0e477b6c8dc [opt](rowset) Remote fetch rowsets to avoid -230 error
when capturing rowsets (#52440)
0e477b6c8dc is described below
commit 0e477b6c8dc5ddf699bc86c32d9ab646871a0f40
Author: Siyang Tang <[email protected]>
AuthorDate: Mon Jul 7 11:04:12 2025 +0800
[opt](rowset) Remote fetch rowsets to avoid -230 error when capturing
rowsets (#52440)
### What problem does this PR solve?
Problem Summary:
Make stale rowsets accessible across BEs to avoid E-230 (versions
already merged) in Read-Write Splitting scenario.
---
be/src/cloud/cloud_base_compaction.cpp | 2 +-
be/src/cloud/cloud_cumulative_compaction.cpp | 10 +-
be/src/cloud/cloud_full_compaction.cpp | 10 +-
be/src/cloud/cloud_meta_mgr.cpp | 2 +-
be/src/cloud/cloud_schema_change_job.cpp | 26 +-
be/src/cloud/cloud_tablet.cpp | 45 +-
be/src/cloud/cloud_tablet.h | 3 -
be/src/cloud/cloud_tablet_mgr.cpp | 20 +-
be/src/cloud/cloud_tablet_mgr.h | 2 +-
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/http/action/delete_bitmap_action.cpp | 6 +-
be/src/olap/base_tablet.cpp | 47 +--
be/src/olap/base_tablet.h | 43 +-
be/src/olap/compaction.cpp | 6 +-
be/src/olap/data_dir.cpp | 6 +-
be/src/olap/full_compaction.cpp | 8 +-
be/src/olap/merger.cpp | 8 +-
be/src/olap/parallel_scanner_builder.cpp | 21 +-
be/src/olap/parallel_scanner_builder.h | 8 +-
be/src/olap/rowset/rowset_reader_context.h | 2 +-
be/src/olap/rowset_version_mgr.cpp | 452 +++++++++++++++++++++
be/src/olap/schema_change.cpp | 22 +-
be/src/olap/snapshot_manager.cpp | 21 +-
be/src/olap/tablet.cpp | 94 +----
be/src/olap/tablet.h | 9 -
be/src/olap/tablet_manager.cpp | 2 +-
be/src/olap/tablet_meta.cpp | 54 ++-
be/src/olap/tablet_meta.h | 14 +-
be/src/olap/tablet_reader.cpp | 15 +-
be/src/olap/tablet_reader.h | 14 +-
be/src/olap/task/engine_checksum_task.cpp | 13 +-
be/src/olap/task/engine_clone_task.cpp | 2 +-
be/src/olap/task/engine_storage_migration_task.cpp | 10 +-
be/src/olap/task/index_builder.cpp | 4 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 9 +-
be/src/pipeline/exec/olap_scan_operator.h | 2 +-
be/src/service/internal_service.cpp | 67 +++
be/src/service/internal_service.h | 5 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 24 +-
be/src/vec/exec/scan/new_olap_scanner.h | 2 +-
be/test/olap/delta_writer_test.cpp | 4 +-
be/test/olap/segcompaction_mow_test.cpp | 2 +-
be/test/olap/tablet_meta_test.cpp | 18 +-
be/test/olap/test_data/rowset_meta.json | 4 +
.../apache/doris/cloud/catalog/CloudReplica.java | 16 +
.../apache/doris/service/FrontendServiceImpl.java | 28 +-
gensrc/proto/internal_service.proto | 16 +
.../cloud/test_cloud_version_already_merged.out | Bin 0 -> 199 bytes
regression-test/pipeline/p0/conf/be.conf | 2 +
.../cloud/test_cloud_version_already_merged.groovy | 126 ++++++
51 files changed, 1010 insertions(+), 318 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.cpp
b/be/src/cloud/cloud_base_compaction.cpp
index 3b861e1c944..a1f57d8ce3d 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -419,7 +419,7 @@ Status CloudBaseCompaction::modify_rowsets() {
// the tablet to be unable to synchronize the rowset meta changes
generated by cumu compaction.
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
if (output_rowset_delete_bitmap) {
-
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
+
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
}
if (stats.cumulative_compaction_cnt() >=
cloud_tablet()->cumulative_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 39954097324..71234470298 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -376,7 +376,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
cloud_tablet()->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (output_rowset_delete_bitmap) {
-
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
+
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
}
if (stats.base_compaction_cnt() >=
cloud_tablet()->base_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
@@ -416,7 +416,7 @@ Status
CloudCumulativeCompaction::process_old_version_delete_bitmap() {
rowset->rowset_id().to_string();
DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0};
DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id,
pre_max_version};
- auto d = _tablet->tablet_meta()->delete_bitmap().get_agg(
+ auto d = _tablet->tablet_meta()->delete_bitmap()->get_agg(
{rowset->rowset_id(), seg_id, pre_max_version});
to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end));
if (d->isEmpty()) {
@@ -440,10 +440,10 @@ Status
CloudCumulativeCompaction::process_old_version_delete_bitmap() {
_input_rowsets.back()->end_version());
for (auto it = new_delete_bitmap->delete_bitmap.begin();
it != new_delete_bitmap->delete_bitmap.end(); it++) {
- _tablet->tablet_meta()->delete_bitmap().set(it->first,
it->second);
+ _tablet->tablet_meta()->delete_bitmap()->set(it->first,
it->second);
}
-
_tablet->tablet_meta()->delete_bitmap().add_to_remove_queue(version.to_string(),
-
to_remove_vec);
+
_tablet->tablet_meta()->delete_bitmap()->add_to_remove_queue(version.to_string(),
+
to_remove_vec);
DBUG_EXECUTE_IF(
"CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets",
{
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index 6bfab2ec698..7358f6d1915 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -26,6 +26,7 @@
#include "common/status.h"
#include "cpp/sync_point.h"
#include "gen_cpp/cloud.pb.h"
+#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/tablet_meta.h"
@@ -273,7 +274,7 @@ Status CloudFullCompaction::modify_rowsets() {
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (output_rowset_delete_bitmap) {
-
_tablet->tablet_meta()->delete_bitmap().merge(*output_rowset_delete_bitmap);
+
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
}
if (stats.cumulative_compaction_cnt() >=
cloud_tablet()->cumulative_compaction_cnt()) {
cloud_tablet()->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
@@ -340,8 +341,9 @@ Status
CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
int64_t max_version = cloud_tablet()->max_version().second;
DCHECK(max_version >= _output_rowset->version().second);
if (max_version > _output_rowset->version().second) {
- RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked(
- {_output_rowset->version().second + 1, max_version},
&tmp_rowsets));
+ auto ret =
DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked(
+ {_output_rowset->version().second + 1, max_version},
CaptureRowsetOps {}));
+ tmp_rowsets = std::move(ret.rowsets);
}
for (const auto& it : tmp_rowsets) {
int64_t cur_version = it->rowset_meta()->start_version();
@@ -372,7 +374,7 @@ Status
CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
.tag("input_segments", _input_segments)
.tag("input_rowsets_total_size", _input_rowsets_total_size)
.tag("update_bitmap_size", delete_bitmap->delete_bitmap.size());
- _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
+ _tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap);
return Status::OK();
}
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index dff85b3f639..684d89cb01b 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -577,7 +577,7 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
.error(st);
return st;
}
- tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
+ tablet->tablet_meta()->delete_bitmap()->merge(delete_bitmap);
if (config::enable_mow_verbose_log && !resp.rowset_meta().empty()
&&
delete_bitmap.cardinality() > 0) {
std::vector<std::string> new_rowset_msgs;
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index a5707e51bb6..405dcbe1a0d 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -27,6 +27,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/status.h"
+#include "olap/base_tablet.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
@@ -186,7 +187,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
- reader_context.delete_bitmap =
&_base_tablet->tablet_meta()->delete_bitmap();
+ reader_context.delete_bitmap =
_base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, start_resp.alter_version());
for (auto& split : rs_splits) {
@@ -457,7 +458,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t
alter_version,
.tag("alter_version", alter_version);
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
initiator));
TabletMetaSharedPtr tmp_meta =
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
- tmp_meta->delete_bitmap().delete_bitmap.clear();
+ tmp_meta->delete_bitmap()->delete_bitmap.clear();
std::shared_ptr<CloudTablet> tmp_tablet =
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
@@ -466,22 +467,21 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}
// step 1, process incremental rowset without delete bitmap update lock
- std::vector<RowsetSharedPtr> incremental_rowsets;
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets without lock, version: " <<
start_calc_delete_bitmap_version
<< "-" << max_version << " new_table_id: " <<
_new_tablet->tablet_id();
if (max_version >= start_calc_delete_bitmap_version) {
- RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
- {start_calc_delete_bitmap_version, max_version},
&incremental_rowsets));
+ auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
+ {start_calc_delete_bitmap_version, max_version},
CaptureRowsetOps {}));
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
DBUG_BLOCK);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
- for (auto rowset : incremental_rowsets) {
+ for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
}
@@ -497,15 +497,14 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets with lock, version: " << max_version + 1
<< "-"
<< new_max_version << " new_tablet_id: " <<
_new_tablet->tablet_id();
- std::vector<RowsetSharedPtr> new_incremental_rowsets;
if (new_max_version > max_version) {
- RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
- {max_version + 1, new_max_version}, &new_incremental_rowsets));
+ auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
+ {max_version + 1, new_max_version}, CaptureRowsetOps {}));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
- for (auto rowset : new_incremental_rowsets) {
+ for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
}
@@ -522,13 +521,14 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}
});
- auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
+ auto delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
// step4, store delete bitmap
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
- *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator,
&delete_bitmap));
+ *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator,
delete_bitmap.get()));
- _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
+ auto original_dbm = _new_tablet->tablet_meta()->delete_bitmap();
+ *original_dbm = std::move(*delete_bitmap);
return Status::OK();
}
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 78663de6ed8..85b60e1588a 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -37,6 +37,7 @@
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
+#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
@@ -69,23 +70,6 @@ bool CloudTablet::exceed_version_limit(int32_t limit) {
return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
}
-Status CloudTablet::capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets)
const {
- Versions version_path;
- auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
- if (!st.ok()) {
- // Check no missed versions or req version is merged
- auto missed_versions = get_missed_versions(spec_version.second);
- if (missed_versions.empty()) {
- st.set_code(VERSION_ALREADY_MERGED); // Reset error code
- }
- st.append(" tablet_id=" + std::to_string(tablet_id()));
- return st;
- }
- VLOG_DEBUG << "capture consitent versions: " << version_path;
- return _capture_consistent_rowsets_unlocked(version_path, rowsets);
-}
-
std::string CloudTablet::tablet_path() const {
return "";
}
@@ -97,25 +81,10 @@ Status CloudTablet::capture_rs_readers(const Version&
spec_version,
LOG_WARNING("CloudTablet.capture_rs_readers.return
e-230").tag("tablet_id", tablet_id());
return Status::Error<false>(-230, "injected error");
});
- Versions version_path;
std::shared_lock rlock(_meta_lock);
- auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
- if (!st.ok()) {
- rlock.unlock(); // avoid logging in lock range
- // Check no missed versions or req version is merged
- auto missed_versions = get_missed_versions(spec_version.second);
- if (missed_versions.empty()) {
- st.set_code(VERSION_ALREADY_MERGED); // Reset error code
- st.append(" versions are already compacted, ");
- }
- st.append(" tablet_id=" + std::to_string(tablet_id()));
- // clang-format off
- LOG(WARNING) << st << '\n' << [this]() { std::string json;
get_compaction_status(&json); return json; }();
- // clang-format on
- return st;
- }
- VLOG_DEBUG << "capture consitent versions: " << version_path;
- return capture_rs_readers_unlocked(version_path, rs_splits);
+ *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
+ spec_version, CaptureRowsetOps {.skip_missing_versions =
skip_missing_version}));
+ return Status::OK();
}
Status CloudTablet::merge_rowsets_schema() {
@@ -469,7 +438,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}
_reconstruct_version_tracker_if_necessary();
}
-
_tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);
+
_tablet_meta->delete_bitmap()->remove_stale_delete_bitmap_from_queue(version_to_delete);
recycle_cached_data(expired_rowsets);
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}",
tablet_id());
@@ -977,7 +946,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
std::size_t missed_rows_size = 0;
calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, 0, version.second + 1,
missed_rows.get(),
- location_map.get(), tablet_meta()->delete_bitmap(),
output_rowset_delete_bitmap.get());
+ location_map.get(), *tablet_meta()->delete_bitmap(),
output_rowset_delete_bitmap.get());
if (missed_rows) {
missed_rows_size = missed_rows->size();
if (!allow_delete_in_cumu_compaction) {
@@ -1013,7 +982,7 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
calc_compaction_output_rowset_delete_bitmap(
input_rowsets, rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(),
- location_map.get(), tablet_meta()->delete_bitmap(),
output_rowset_delete_bitmap.get());
+ location_map.get(), *tablet_meta()->delete_bitmap(),
output_rowset_delete_bitmap.get());
int64_t t4 = MonotonicMicros();
if (location_map) {
RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 2dd1d3c4425..dc357eb7249 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -55,9 +55,6 @@ public:
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
- Status capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const override;
-
size_t tablet_footprint() override {
return _approximate_data_size.load(std::memory_order_relaxed);
}
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index 25f50f7ef4b..1f155635c65 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -25,6 +25,7 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
+#include "common/logging.h"
#include "common/status.h"
#include "olap/lru_cache.h"
#include "runtime/memory/cache_policy.h"
@@ -158,7 +159,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t
tablet_id, bool warmup_data,
bool
sync_delete_bitmap,
SyncRowsetStats* sync_stats,
- bool
force_use_cache) {
+ bool
local_only) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than
`CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
@@ -176,12 +177,17 @@ Result<std::shared_ptr<CloudTablet>>
CloudTabletMgr::get_tablet(int64_t tablet_i
CacheKey key(tablet_id_str);
auto* handle = _cache->lookup(key);
- if (handle == nullptr && force_use_cache) {
- return ResultError(
- Status::InternalError("failed to get cloud tablet from cache
{}", tablet_id));
- }
-
if (handle == nullptr) {
+ if (local_only) {
+ LOG(INFO) << "tablet=" << tablet_id
+ << "does not exists in local tablet cache, because param
local_only=true, "
+ "treat it as an error";
+ return ResultError(Status::InternalError(
+ "tablet={} does not exists in local tablet cache, because
param "
+ "local_only=true, "
+ "treat it as an error",
+ tablet_id));
+ }
if (sync_stats) {
++sync_stats->tablet_meta_cache_miss;
}
@@ -485,7 +491,7 @@ void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
auto t = tablet_wk.lock();
if (!t) return;
uint64_t delete_bitmap_count =
-
t.get()->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
+
t.get()->tablet_meta()->delete_bitmap()->get_delete_bitmap_count();
total_delete_map_count += delete_bitmap_count;
if (delete_bitmap_count > *max_delete_bitmap_score) {
max_delete_bitmap_score_tablet_id = t->tablet_id();
diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h
index a1ce6d2b8cf..2e0ea79f424 100644
--- a/be/src/cloud/cloud_tablet_mgr.h
+++ b/be/src/cloud/cloud_tablet_mgr.h
@@ -47,7 +47,7 @@ public:
Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool
warmup_data = false,
bool sync_delete_bitmap =
true,
SyncRowsetStats*
sync_stats = nullptr,
- bool force_use_cache =
false);
+ bool local_only = false);
void erase_tablet(int64_t tablet_id);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0f9a60aec88..dc81ca48373 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1515,6 +1515,7 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory,
"true");
DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");
+DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade
and backup recovery will no longer be supported.
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index af916f1f7bd..a1820494725 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1589,6 +1589,7 @@ DECLARE_mBool(enable_compaction_pause_on_high_memory);
DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);
+DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas);
// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade
and backup recovery will no longer be supported.
DECLARE_mInt32(segments_key_bounds_truncation_threshold);
diff --git a/be/src/http/action/delete_bitmap_action.cpp
b/be/src/http/action/delete_bitmap_action.cpp
index 59783d1c055..5fc4d0f4388 100644
--- a/be/src/http/action/delete_bitmap_action.cpp
+++ b/be/src/http/action/delete_bitmap_action.cpp
@@ -159,7 +159,7 @@ Status
DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* r
if (tablet == nullptr) {
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}
- auto dm = tablet->tablet_meta()->delete_bitmap().snapshot();
+ auto dm = tablet->tablet_meta()->delete_bitmap()->snapshot();
_show_delete_bitmap(dm, verbose, json_result);
return Status::OK();
}
@@ -183,7 +183,7 @@ Status
DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
return st;
}
- auto dm = tablet->tablet_meta()->delete_bitmap().snapshot();
+ auto dm = tablet->tablet_meta()->delete_bitmap()->snapshot();
_show_delete_bitmap(dm, verbose, json_result);
return Status::OK();
}
@@ -210,4 +210,4 @@ void DeleteBitmapAction::handle(HttpRequest* req) {
}
#include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 1aa74582d84..56ffc2d100e 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -516,7 +516,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
TabletSchema* latest
if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
return s;
}
- if (s.ok() &&
_tablet_meta->delete_bitmap().contains_agg_without_cache(
+ if (s.ok() &&
_tablet_meta->delete_bitmap()->contains_agg_without_cache(
{loc.rowset_id, loc.segment_id, version},
loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id
of
// all rowsets, util we find an existing key.
@@ -1114,37 +1114,6 @@ void BaseTablet::_rowset_ids_difference(const
RowsetIdUnorderedSet& cur,
}
}
-Status BaseTablet::_capture_consistent_rowsets_unlocked(
- const std::vector<Version>& version_path,
std::vector<RowsetSharedPtr>* rowsets) const {
- DCHECK(rowsets != nullptr);
- rowsets->reserve(version_path.size());
- for (const auto& version : version_path) {
- bool is_find = false;
- do {
- auto it = _rs_version_map.find(version);
- if (it != _rs_version_map.end()) {
- is_find = true;
- rowsets->push_back(it->second);
- break;
- }
-
- auto it_expired = _stale_rs_version_map.find(version);
- if (it_expired != _stale_rs_version_map.end()) {
- is_find = true;
- rowsets->push_back(it_expired->second);
- break;
- }
- } while (false);
-
- if (!is_find) {
- return Status::Error<CAPTURE_ROWSET_ERROR>(
- "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
- version.to_string());
- }
- }
- return Status::OK();
-}
-
Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr
delete_bitmap,
int64_t max_version,
int64_t txn_id,
const RowsetIdUnorderedSet&
rowset_ids,
@@ -1564,7 +1533,7 @@ Status BaseTablet::update_delete_bitmap_without_lock(
delete_bitmap->remove_sentinel_marks();
}
for (auto& iter : delete_bitmap->delete_bitmap) {
- self->_tablet_meta->delete_bitmap().merge(
+ self->_tablet_meta->delete_bitmap()->merge(
{std::get<0>(iter.first), std::get<1>(iter.first),
cur_version}, iter.second);
}
@@ -1723,7 +1692,7 @@ void BaseTablet::get_base_rowset_delete_bitmap_count(
}
base_found = true;
uint64_t base_rowset_delete_bitmap_count =
- this->tablet_meta()->delete_bitmap().get_count_with_range(
+ this->tablet_meta()->delete_bitmap()->get_count_with_range(
{rowset->rowset_id(), 0, 0},
{rowset->rowset_id(), UINT32_MAX, UINT64_MAX});
if (base_rowset_delete_bitmap_count >
*max_base_rowset_delete_bitmap_score) {
@@ -1737,6 +1706,16 @@ void BaseTablet::get_base_rowset_delete_bitmap_count(
}
}
+void TabletReadSource::fill_delete_predicates() {
+ DCHECK_EQ(delete_predicates.size(), 0);
+ auto delete_pred_view =
+ rs_splits | std::views::transform([](auto&& split) {
+ return split.rs_reader->rowset()->rowset_meta();
+ }) |
+ std::views::filter([](const auto& rs_meta) { return
rs_meta->has_delete_predicate(); });
+ delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()};
+}
+
int32_t BaseTablet::max_version_config() {
int32_t max_version = tablet_meta()->compaction_policy() ==
CUMULATIVE_TIME_SERIES_POLICY
? config::time_series_max_tablet_version_num
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 9dd69d0bd9a..6c797e0478b 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -42,6 +42,9 @@ class SegmentCacheHandle;
class RowIdConversion;
struct PartialUpdateInfo;
class PartialUpdateReadPlan;
+struct CaptureRowsetOps;
+struct CaptureRowsetResult;
+struct TabletReadSource;
struct TabletWithVersion {
BaseTabletSPtr tablet;
@@ -107,9 +110,6 @@ public:
virtual Result<std::unique_ptr<RowsetWriter>>
create_rowset_writer(RowsetWriterContext& context,
bool
vertical) = 0;
- virtual Status capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const = 0;
-
virtual Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) = 0;
@@ -309,6 +309,20 @@ public:
return Status::OK();
}
+ [[nodiscard]] Result<CaptureRowsetResult>
capture_consistent_rowsets_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const;
+
+ [[nodiscard]] Result<std::vector<Version>>
capture_consistent_versions_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const;
+
+ [[nodiscard]] Result<std::vector<RowSetSplits>>
capture_rs_readers_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const;
+
+ [[nodiscard]] Result<TabletReadSource> capture_read_source(const Version&
version_range,
+ const
CaptureRowsetOps& options);
+
+ Result<CaptureRowsetResult> _remote_capture_rowsets(const Version&
version_range) const;
+
protected:
// Find the missed versions until the spec_version.
//
@@ -326,9 +340,6 @@ protected:
const RowsetIdUnorderedSet& pre,
RowsetIdUnorderedSet* to_add,
RowsetIdUnorderedSet* to_del);
- Status _capture_consistent_rowsets_unlocked(const std::vector<Version>&
version_path,
- std::vector<RowsetSharedPtr>*
rowsets) const;
-
Status sort_block(vectorized::Block& in_block, vectorized::Block&
output_block);
mutable std::shared_mutex _meta_lock;
@@ -369,4 +380,24 @@ public:
Status last_compaction_status = Status::OK();
};
+struct CaptureRowsetOps {
+ bool skip_missing_versions = false;
+ bool quiet = false;
+ bool include_stale_rowsets = true;
+ bool enable_fetch_rowsets_from_peers = false;
+};
+
+struct CaptureRowsetResult {
+ std::vector<RowsetSharedPtr> rowsets;
+ std::shared_ptr<DeleteBitmap> delete_bitmap;
+};
+
+struct TabletReadSource {
+ std::vector<RowSetSplits> rs_splits;
+ std::vector<RowsetMetaSharedPtr> delete_predicates;
+ std::shared_ptr<DeleteBitmap> delete_bitmap;
+ // Fill delete predicates with `rs_splits`
+ void fill_delete_predicates();
+};
+
} /* namespace doris */
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 7e312c5847f..77af2b30fe1 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1069,7 +1069,7 @@ Status CompactionMixin::modify_rowsets() {
std::size_t missed_rows_size = 0;
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, *_rowid_conversion, 0, version.second + 1,
missed_rows.get(),
- location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
+ location_map.get(), *_tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (missed_rows) {
missed_rows_size = missed_rows->size();
@@ -1112,7 +1112,7 @@ Status CompactionMixin::modify_rowsets() {
ss << ", debug info: ";
DeleteBitmap subset_map(_tablet->tablet_id());
for (auto rs : _input_rowsets) {
- _tablet->tablet_meta()->delete_bitmap().subset(
+ _tablet->tablet_meta()->delete_bitmap()->subset(
{rs->rowset_id(), 0, 0},
{rs->rowset_id(), rs->num_segments(),
version.second + 1},
&subset_map);
@@ -1187,7 +1187,7 @@ Status CompactionMixin::modify_rowsets() {
// incremental data.
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, *_rowid_conversion, version.second,
UINT64_MAX,
- missed_rows.get(), location_map.get(),
_tablet->tablet_meta()->delete_bitmap(),
+ missed_rows.get(), location_map.get(),
*_tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (missed_rows) {
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 5cb2a6105d9..6389f1ad133 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -635,14 +635,14 @@ Status DataDir::load() {
}
++dbm_cnt;
auto seg_id = delete_bitmap_pb.segment_ids(i);
- auto iter =
tablet->tablet_meta()->delete_bitmap().delete_bitmap.find(
+ auto iter =
tablet->tablet_meta()->delete_bitmap()->delete_bitmap.find(
{rst_id, seg_id, version});
// This version of delete bitmap already exists
- if (iter !=
tablet->tablet_meta()->delete_bitmap().delete_bitmap.end()) {
+ if (iter !=
tablet->tablet_meta()->delete_bitmap()->delete_bitmap.end()) {
continue;
}
auto bitmap = delete_bitmap_pb.segment_delete_bitmaps(i).data();
- tablet->tablet_meta()->delete_bitmap().delete_bitmap[{rst_id,
seg_id, version}] =
+ tablet->tablet_meta()->delete_bitmap()->delete_bitmap[{rst_id,
seg_id, version}] =
roaring::Roaring::read(bitmap);
}
return true;
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 06641e7a22c..7d3c497f2a2 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -27,6 +27,7 @@
#include "common/config.h"
#include "common/status.h"
+#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/olap_common.h"
@@ -143,8 +144,9 @@ Status FullCompaction::modify_rowsets() {
int64_t max_version = tablet()->max_version().second;
DCHECK(max_version >= _output_rowset->version().second);
if (max_version > _output_rowset->version().second) {
- RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked(
- {_output_rowset->version().second + 1, max_version},
&tmp_rowsets));
+ auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
+ {_output_rowset->version().second + 1, max_version},
CaptureRowsetOps {}));
+ tmp_rowsets = std::move(ret.rowsets);
}
for (const auto& it : tmp_rowsets) {
@@ -219,7 +221,7 @@ Status
FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
- _tablet->tablet_meta()->delete_bitmap().merge(
+ _tablet->tablet_meta()->delete_bitmap()->merge(
{std::get<0>(k), std::get<1>(k), cur_version}, v);
}
}
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 01f36d77dc2..c3f59872f62 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -68,7 +68,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet,
ReaderType reader_type,
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- TabletReader::ReadSource read_source;
+ TabletReadSource read_source;
read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
read_source.rs_splits.emplace_back(rs_reader);
@@ -87,7 +87,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet,
ReaderType reader_type,
}
reader_params.tablet_schema = merge_tablet_schema;
if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
- reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+ reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap();
}
if (stats_output && stats_output->rowid_conversion) {
@@ -249,7 +249,7 @@ Status Merger::vertical_compact_one_group(
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- TabletReader::ReadSource read_source;
+ TabletReadSource read_source;
read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
read_source.rs_splits.emplace_back(rs_reader);
@@ -268,7 +268,7 @@ Status Merger::vertical_compact_one_group(
reader_params.tablet_schema = merge_tablet_schema;
if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
- reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+ reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap();
}
if (is_key && stats_output && stats_output->rowid_conversion) {
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index 103e6341d7c..f4cd210bbd2 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -57,7 +57,7 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
// `rs_splits` in `entire read source` will be devided into several
partitial read sources
// to build several parallel scanners, based on segment rows number.
All the partitial read sources
// share the same delete predicates from their corresponding entire
read source.
- TabletReader::ReadSource partitial_read_source;
+ TabletReadSource partitial_read_source;
int64_t rows_collected = 0;
for (auto& rs_split : entire_read_source.rs_splits) {
auto reader = rs_split.rs_reader;
@@ -106,10 +106,11 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
partitial_read_source.rs_splits.emplace_back(std::move(split));
- scanners.emplace_back(
- _build_scanner(tablet, version, _key_ranges,
-
{std::move(partitial_read_source.rs_splits),
-
entire_read_source.delete_predicates}));
+ scanners.emplace_back(_build_scanner(
+ tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
partitial_read_source = {};
split = RowSetSplits(reader->clone());
@@ -150,9 +151,11 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
split.segment_offsets.second -
split.segment_offsets.first);
}
#endif
- scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
-
{std::move(partitial_read_source.rs_splits),
-
entire_read_source.delete_predicates}));
+ scanners.emplace_back(
+ _build_scanner(tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
}
}
@@ -199,7 +202,7 @@ Status ParallelScannerBuilder::_load() {
std::shared_ptr<NewOlapScanner> ParallelScannerBuilder::_build_scanner(
BaseTabletSPtr tablet, int64_t version, const
std::vector<OlapScanRange*>& key_ranges,
- TabletReader::ReadSource&& read_source) {
+ TabletReadSource&& read_source) {
NewOlapScanner::Params params {_state, _scanner_profile.get(),
key_ranges, std::move(tablet),
version, std::move(read_source), _limit,
_is_preaggregation};
return NewOlapScanner::create_shared(_parent, std::move(params));
diff --git a/be/src/olap/parallel_scanner_builder.h
b/be/src/olap/parallel_scanner_builder.h
index 1f371e3129a..a746ff5ba5d 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -44,7 +44,7 @@ class ParallelScannerBuilder {
public:
ParallelScannerBuilder(pipeline::OlapScanLocalState* parent,
const std::vector<TabletWithVersion>& tablets,
- std::vector<TabletReader::ReadSource>& read_sources,
+ std::vector<TabletReadSource>& read_sources,
const std::shared_ptr<RuntimeProfile>& profile,
const std::vector<OlapScanRange*>& key_ranges,
RuntimeState* state,
int64_t limit, bool is_dup_mow_key, bool
is_preaggregation)
@@ -71,7 +71,7 @@ private:
std::shared_ptr<vectorized::NewOlapScanner> _build_scanner(
BaseTabletSPtr tablet, int64_t version, const
std::vector<OlapScanRange*>& key_ranges,
- TabletReader::ReadSource&& read_source);
+ TabletReadSource&& read_source);
pipeline::OlapScanLocalState* _parent;
@@ -94,8 +94,8 @@ private:
bool _is_preaggregation;
std::vector<TabletWithVersion> _tablets;
std::vector<OlapScanRange*> _key_ranges;
- std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
- std::vector<TabletReader::ReadSource>& _read_sources;
+ std::unordered_map<int64_t, TabletReadSource> _all_read_sources;
+ std::vector<TabletReadSource>& _read_sources;
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index fd3b4fed56f..2faed632349 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -74,7 +74,7 @@ struct RowsetReaderContext {
uint64_t* merged_rows = nullptr;
// for unique key merge on write
bool enable_unique_key_merge_on_write = false;
- const DeleteBitmap* delete_bitmap = nullptr;
+ std::shared_ptr<DeleteBitmap> delete_bitmap = nullptr;
bool record_rowids = false;
RowIdConversion* rowid_conversion;
bool is_vertical_compaction = false;
diff --git a/be/src/olap/rowset_version_mgr.cpp
b/be/src/olap/rowset_version_mgr.cpp
new file mode 100644
index 00000000000..27df2ede4b2
--- /dev/null
+++ b/be/src/olap/rowset_version_mgr.cpp
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/controller.h>
+#include <bthread/bthread.h>
+#include <bthread/countdown_event.h>
+#include <bthread/mutex.h>
+#include <bthread/types.h>
+#include <bvar/latency_recorder.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <glog/logging.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <ranges>
+#include <sstream>
+#include <utility>
+
+#include "cloud/config.h"
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "olap/base_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader.h"
+#include "runtime/client_cache.h"
+#include "service/backend_options.h"
+#include "service/internal_service.h"
+#include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+
+namespace doris {
+
+using namespace ErrorCode;
+using namespace std::ranges;
+
+static bvar::LatencyRecorder
g_remote_fetch_tablet_rowsets_single_request_latency(
+ "remote_fetch_rowsets_single_rpc");
+static bvar::LatencyRecorder
g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets");
+
+/// Resolves `version_range` into a consistent path of versions by asking the
+/// timestamped version tracker. This function takes no lock itself.
+///
+/// @param version_range  the [first, second] version range to cover.
+/// @param options        `quiet` suppresses the diagnostics (logging and the
+///                       missed-version lookup) but NOT the failure itself;
+///                       `skip_missing_versions` lets the call continue and
+///                       return whatever path the tracker filled in when some
+///                       versions are missing.
+/// @return the version path, or an error: VERSION_ALREADY_MERGED when the
+///         tracker fails although no version is missing, otherwise the
+///         tracker's own status.
+[[nodiscard]] Result<std::vector<Version>> BaseTablet::capture_consistent_versions_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    std::vector<Version> version_path;
+    auto st =
+            _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path);
+    if (!st) {
+        if (options.quiet) {
+            // `quiet` only silences the logs below. The failure still has to reach
+            // the caller, otherwise an incomplete path is reported as success.
+            return ResultError(std::move(st));
+        }
+        auto missed_versions = get_missed_versions_unlocked(version_range.second);
+        if (missed_versions.empty()) {
+            LOG(WARNING) << fmt::format(
+                    "version already has been merged. version_range={}, max_version={}, "
+                    "tablet_id={}",
+                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id());
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
+                    "missed versions is empty, version_range={}, max_version={}, tablet_id={}",
+                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id()));
+        }
+        LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}",
+                                    version_range.to_string(), tablet_id(), st);
+        _print_missed_versions(missed_versions);
+        if (!options.skip_missing_versions) {
+            return ResultError(std::move(st));
+        }
+        LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id();
+    }
+    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
+        auto tablet_id = dp->param<int64>("tablet_id", -1);
+        auto skip_by_option = dp->param<bool>("skip_by_option", false);
+        if (skip_by_option && !options.enable_fetch_rowsets_from_peers) {
+            return version_path;
+        }
+        // tablet_id == -2 injects the failure for every tablet.
+        if ((tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) || tablet_id == -2) {
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>("version already merged"));
+        }
+    });
+    return version_path;
+}
+
+// Captures the rowsets that cover `version_range`.
+//
+// First resolves a consistent version path locally and maps every version on
+// it to a rowset. If no local path can be resolved, and the process runs in
+// cloud mode with `options.enable_fetch_rowsets_from_peers` set, the rowsets
+// are requested from peer replicas via `_remote_capture_rowsets`.
+// Takes no lock itself.
+[[nodiscard]] Result<CaptureRowsetResult>
BaseTablet::capture_consistent_rowsets_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options) const {
+ CaptureRowsetResult result;
+ auto& rowsets = result.rowsets;
+ auto maybe_versions = capture_consistent_versions_unlocked(version_range,
options);
+ // Local path: a consistent version path was found on this tablet.
+ if (maybe_versions) {
+ const auto& version_paths = maybe_versions.value();
+ rowsets.reserve(version_paths.size());
+
+ // Looks a version up in `_rs_version_map`, then (only when
+ // `include_stale` is set) in `_stale_rs_version_map`; returns
+ // CAPTURE_ROWSET_ERROR when neither map has it.
+ auto rowset_for_version = [&](const Version& version,
+ bool include_stale) ->
Result<RowsetSharedPtr> {
+ if (auto it = _rs_version_map.find(version); it !=
_rs_version_map.end()) {
+ return it->second;
+ } else {
+ VLOG_NOTICE << "fail to find Rowset in rs_version for version.
tablet="
+ << tablet_id() << ", version='" << version.first
<< "-"
+ << version.second;
+ }
+ if (include_stale) {
+ if (auto it = _stale_rs_version_map.find(version);
+ it != _stale_rs_version_map.end()) {
+ return it->second;
+ } else {
+ LOG(WARNING) << fmt::format(
+ "fail to find Rowset in stale_rs_version for
version. tablet={}, "
+ "version={}-{}",
+ tablet_id(), version.first, version.second);
+ }
+ }
+ return ResultError(Status::Error<CAPTURE_ROWSET_ERROR>(
+ "failed to find rowset for version={}",
version.to_string()));
+ };
+
+ // Any version on the path that has no rowset fails the whole capture.
+ for (const auto& version : version_paths) {
+ auto ret = rowset_for_version(version,
options.include_stale_rowsets);
+ if (!ret) {
+ return ResultError(std::move(ret.error()));
+ }
+
+ rowsets.push_back(std::move(ret.value()));
+ }
+ // Unique-key merge-on-write tablets also hand out the tablet meta's
+ // delete bitmap together with the rowsets.
+ if (keys_type() == KeysType::UNIQUE_KEYS &&
enable_unique_key_merge_on_write()) {
+ result.delete_bitmap = _tablet_meta->delete_bitmap();
+ }
+ return result;
+ }
+
+ // Remote fallback is attempted only in cloud mode and only when the caller
+ // opted in; otherwise the local error is returned unchanged.
+ if (!config::is_cloud_mode() || !options.enable_fetch_rowsets_from_peers) {
+ return ResultError(std::move(maybe_versions.error()));
+ }
+ auto ret = _remote_capture_rowsets(version_range);
+ if (!ret) {
+ // A failed remote fetch is reported as VERSION_ALREADY_MERGED, with the
+ // remote error embedded in the message.
+ auto st = Status::Error<VERSION_ALREADY_MERGED>(
+ "version already merged, meet error during remote capturing
rowsets, "
+ "error={}, version_range={}",
+ ret.error().to_string(), version_range.to_string());
+ return ResultError(std::move(st));
+ }
+ return ret;
+}
+
+/// Captures the rowsets covering `version_range` and creates one reader per
+/// rowset. Takes no lock itself.
+///
+/// @return one `RowSetSplits` per captured rowset, in capture order; the
+///         capture error if the rowsets cannot be captured; or
+///         CAPTURE_ROWSET_READER_ERROR if a reader cannot be created.
+[[nodiscard]] Result<std::vector<RowSetSplits>> BaseTablet::capture_rs_readers_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    auto captured = capture_consistent_rowsets_unlocked(version_range, options);
+    if (!captured.has_value()) {
+        return ResultError(std::move(captured.error()));
+    }
+
+    const auto& source_rowsets = captured.value().rowsets;
+    std::vector<RowSetSplits> splits;
+    splits.reserve(source_rowsets.size());
+    for (const auto& rowset : source_rowsets) {
+        RowsetReaderSharedPtr reader;
+        // The first reader that cannot be created aborts the whole capture.
+        if (auto create_st = rowset->create_reader(&reader); !create_st) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset={}, reason={}",
+                    rowset->rowset_id().to_string(), create_st.to_string()));
+        }
+        splits.emplace_back(std::move(reader));
+    }
+    return splits;
+}
+
+/// Builds a `TabletReadSource` for `version_range` while holding the tablet
+/// header lock in shared mode.
+///
+/// The read source carries the delete bitmap returned by the capture and one
+/// reader per captured rowset.
+///
+/// @return the read source; the capture error if the rowsets cannot be
+///         captured; or CAPTURE_ROWSET_READER_ERROR if a reader cannot be
+///         created.
+[[nodiscard]] Result<TabletReadSource> BaseTablet::capture_read_source(
+        const Version& version_range, const CaptureRowsetOps& options) {
+    std::shared_lock header_rdlock(get_header_lock());
+
+    auto maybe_captured = capture_consistent_rowsets_unlocked(version_range, options);
+    if (!maybe_captured.has_value()) {
+        return ResultError(std::move(maybe_captured.error()));
+    }
+    CaptureRowsetResult captured = std::move(maybe_captured.value());
+
+    TabletReadSource source;
+    source.delete_bitmap = std::move(captured.delete_bitmap);
+    source.rs_splits.reserve(captured.rowsets.size());
+    for (const auto& rowset : captured.rowsets) {
+        RowsetReaderSharedPtr reader;
+        // The first reader that cannot be created aborts the whole capture.
+        if (auto create_st = rowset->create_reader(&reader); !create_st) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset={}, reason={}",
+                    rowset->rowset_id().to_string(), create_st.to_string()));
+        }
+        source.rs_splits.emplace_back(std::move(reader));
+    }
+    return source;
+}
+
+/// Runs `fn(args...)` on a new background bthread.
+///
+/// `fn` and `args` are copied into a heap-allocated closure. The bthread entry
+/// point invokes the closure and then deletes it.
+///
+/// @param th    receives the id of the started bthread.
+/// @param attr  bthread attributes passed through to `bthread_start_background`.
+/// @return true if the bthread was started; false otherwise, in which case
+///         `fn` is never invoked.
+template <typename Fn, typename... Args>
+bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) {
+    auto p_wrap_fn = new auto([=] { fn(args...); });
+    auto call_back = [](void* ar) -> void* {
+        auto f = static_cast<decltype(p_wrap_fn)>(ar);
+        (*f)();
+        delete f;
+        return nullptr;
+    };
+    if (bthread_start_background(&th, attr, call_back, p_wrap_fn) != 0) {
+        // The entry point will never run, so nobody else frees the closure.
+        delete p_wrap_fn;
+        return false;
+    }
+    return true;
+}
+
+/// Coordinates the `get_tablet_rowsets` requests sent to peer replicas and
+/// hands the outcome to a single waiter.
+///
+/// Usage: fill in the public fields, call `start_req_bg()`, then
+/// `wait_for_ret()`. The first successful response (or a response that cannot
+/// be decoded) completes the controller; if every request fails, the last
+/// failure completes it with an error. Must be owned by a `std::shared_ptr`,
+/// because each request task keeps the controller alive via
+/// `shared_from_this()`.
+struct GetRowsetsCntl : std::enable_shared_from_this<GetRowsetsCntl> {
+    /// Rowset metas (and optionally a delete bitmap) returned by one peer.
+    struct RemoteGetRowsetResult {
+        std::vector<RowsetMetaSharedPtr> rowsets;
+        std::unique_ptr<DeleteBitmap> delete_bitmap;
+    };
+
+    /// Issues one request per address in `req_addrs`, each on its own bthread
+    /// that is joined before the next one is started.
+    ///
+    /// @return OK once every task has been started and joined; InternalError
+    ///         if a bthread cannot be created or joined.
+    Status start_req_bg() {
+        task_cnt = req_addrs.size();
+        for (const auto& [ip, port] : req_addrs) {
+            bthread_t tid;
+            bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+
+            // `ip` refers to an element of `req_addrs`, which `self` keeps alive.
+            bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() {
+                LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip;
+                Defer defer_log {[&ip, port]() {
+                    LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip
+                              << ", port=" << port;
+                }};
+
+                PGetTabletRowsetsRequest req;
+                req.set_tablet_id(self->tablet_id);
+                req.set_version_start(self->version_range.first);
+                req.set_version_end(self->version_range.second);
+                if (self->delete_bitmap_keys.has_value()) {
+                    req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value());
+                }
+                brpc::Controller cntl;
+                cntl.set_timeout_ms(60000);
+                cntl.set_max_retry(3);
+                PGetTabletRowsetsResponse response;
+                // Non-OK when the request could not even be sent. It is handled
+                // below exactly like a failed RPC, so that `task_cnt` is still
+                // decremented and the waiter is still signalled.
+                Status send_st = Status::OK();
+                auto start_tm_us = MonotonicMicros();
+#ifndef BE_TEST
+                std::shared_ptr<PBackendService_Stub> stub =
+                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port);
+                if (stub == nullptr) {
+                    send_st = Status::InternalError(
+                            "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port);
+                } else {
+                    stub->get_tablet_rowsets(&cntl, &req, &response, nullptr);
+                }
+#else
+                TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response);
+#endif
+                if (send_st.ok()) {
+                    g_remote_fetch_tablet_rowsets_single_request_latency
+                            << MonotonicMicros() - start_tm_us;
+                }
+
+                std::unique_lock l(self->butex);
+                if (self->done) {
+                    // Another task already produced the final result.
+                    return;
+                }
+                --self->task_cnt;
+                Status resp_st = send_st;
+                if (resp_st.ok()) {
+                    resp_st = Status::create(response.status());
+                }
+                DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure",
+                                { resp_st = Status::InternalError("inject error"); });
+                if (cntl.Failed() || !resp_st) {
+                    if (self->task_cnt != 0) {
+                        // Other peers are still outstanding; let them try.
+                        return;
+                    }
+                    std::stringstream reason;
+                    reason << "failed to get rowsets from all replicas, tablet_id="
+                           << self->tablet_id;
+                    if (cntl.Failed()) {
+                        reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText();
+                    } else {
+                        reason << ", reason=" << resp_st.to_string();
+                    }
+                    self->result = ResultError(Status::InternalError(reason.str()));
+                    self->done = true;
+                    self->event.signal();
+                    return;
+                }
+
+                // From here on this task decides the final result, success or not.
+                Defer done_cb {[&]() {
+                    self->done = true;
+                    self->event.signal();
+                }};
+                std::vector<RowsetMetaSharedPtr> rs_metas;
+                for (auto&& rs_pb : response.rowsets()) {
+                    auto rs_meta = std::make_shared<RowsetMeta>();
+                    if (!rs_meta->init_from_pb(rs_pb)) {
+                        self->result =
+                                ResultError(Status::InternalError("failed to init rowset from pb"));
+                        return;
+                    }
+                    rs_metas.push_back(std::move(rs_meta));
+                }
+                self->result->rowsets = std::move(rs_metas);
+
+                if (response.has_delete_bitmap()) {
+                    self->result->delete_bitmap = std::make_unique<DeleteBitmap>(
+                            DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id));
+                }
+            });
+
+            if (!succ) {
+                return Status::InternalError(
+                        "failed to create bthread when request rowsets for tablet={}", tablet_id);
+            }
+            if (bthread_join(tid, nullptr) != 0) {
+                return Status::InternalError("failed to join bthread tid={}", tid);
+            }
+        }
+        return Status::OK();
+    }
+
+    /// Blocks until a request task has signalled completion, then moves the
+    /// stored result out.
+    Result<RemoteGetRowsetResult> wait_for_ret() {
+        event.wait();
+        return std::move(result);
+    }
+
+    int64_t tablet_id;
+    // (host, brpc port) of every peer to ask.
+    std::vector<std::pair<std::string, int32_t>> req_addrs;
+    Version version_range;
+    // Sent along with the request when set.
+    std::optional<DeleteBitmapPB> delete_bitmap_keys = std::nullopt;
+
+private:
+    // Number of requests that have not reported back yet.
+    size_t task_cnt;
+
+    // Guards `task_cnt`, `done` and `result` inside the request tasks.
+    bthread::Mutex butex;
+    bthread::CountdownEvent event {1};
+    bool done = false;
+
+    Result<RemoteGetRowsetResult> result;
+};
+
+/// Asks the master FE for the replicas of `tablet_id` and returns the
+/// (host, brpc port) of every replica that is not on this backend.
+///
+/// The debug point `get_peer_replicas_address.enable_local_host` keeps the
+/// local replica in the result.
+///
+/// @return the peer addresses (possibly empty), or InternalError if the RPC
+///         fails or the FE response has no entry for `tablet_id`.
+Result<std::vector<std::pair<std::string, int32_t>>> get_peer_replicas_addresses(
+        const int64_t tablet_id) {
+    auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
+    DCHECK_NE(cluster_info, nullptr);
+    auto master_addr = cluster_info->master_fe_addr;
+    TGetTabletReplicaInfosRequest req;
+    req.tablet_ids.push_back(tablet_id);
+    TGetTabletReplicaInfosResult resp;
+    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); });
+    if (!st) {
+        return ResultError(Status::InternalError(
+                "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(),
+                tablet_id));
+    }
+
+    auto it = resp.tablet_replica_infos.find(tablet_id);
+    if (it == resp.tablet_replica_infos.end()) {
+        return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id));
+    }
+    auto local_host = BackendOptions::get_localhost();
+    bool include_local_host = false;
+    DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; });
+
+    std::vector<std::pair<std::string, int32_t>> peers;
+    peers.reserve(it->second.size());
+    for (auto& replica : it->second) {
+        // Compare whole host strings. A substring test would also drop a peer
+        // whose address is a prefix of ours (e.g. 10.0.0.1 vs. 10.0.0.11).
+        if (include_local_host || replica.host != local_host) {
+            // `resp` is local to this function, so its strings can be moved out.
+            peers.emplace_back(std::move(replica.host), replica.brpc_port);
+        }
+    }
+    return peers;
+}
+
+/// Fetches the rowsets covering `version_range` from peer replicas.
+///
+/// Looks up the peer addresses, requests the rowset metas through
+/// `GetRowsetsCntl`, and turns each returned meta into a rowset using this
+/// tablet's schema. For unique-key merge-on-write tablets the keys of a local
+/// delete bitmap snapshot are sent with the request, and the delete bitmap
+/// returned by the peer is merged into that snapshot.
+///
+/// @return the rowsets (plus the merged delete bitmap for merge-on-write
+///         tablets), or an error if no peer is available, the fetch fails, a
+///         rowset cannot be created, or a merge-on-write response carries no
+///         delete bitmap.
+Result<CaptureRowsetResult> BaseTablet::_remote_capture_rowsets(
+        const Version& version_range) const {
+    auto start_tm_us = MonotonicMicros();
+    Defer defer {
+            [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }};
+#ifndef BE_TEST
+    auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id());
+#else
+    Result<std::vector<std::pair<std::string, int32_t>>> maybe_be_addresses;
+    TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses);
+#endif
+    DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail",
+                    { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); });
+    if (!maybe_be_addresses) {
+        return ResultError(std::move(maybe_be_addresses.error()));
+    }
+    auto be_addresses = std::move(maybe_be_addresses.value());
+    if (be_addresses.empty()) {
+        LOG(WARNING) << "no peers replica for tablet=" << tablet_id();
+        return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id()));
+    }
+
+    auto cntl = std::make_shared<GetRowsetsCntl>();
+    cntl->tablet_id = tablet_id();
+    cntl->req_addrs = std::move(be_addresses);
+    cntl->version_range = version_range;
+    bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
+    CaptureRowsetResult result;
+    if (is_mow) {
+        // Work on a private snapshot so the tablet's own bitmap is not modified
+        // by the merge below.
+        result.delete_bitmap =
+                std::make_unique<DeleteBitmap>(_tablet_meta->delete_bitmap()->snapshot());
+        DeleteBitmapPB delete_bitmap_keys;
+        auto keyset = result.delete_bitmap->delete_bitmap |
+                      std::views::transform([](const auto& kv) { return kv.first; });
+        for (const auto& key : keyset) {
+            const auto& [rs_id, seg_id, version] = key;
+            delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string());
+            delete_bitmap_keys.mutable_segment_ids()->Add(seg_id);
+            delete_bitmap_keys.mutable_versions()->Add(version);
+        }
+        cntl->delete_bitmap_keys = std::move(delete_bitmap_keys);
+    }
+
+    RETURN_IF_ERROR_RESULT(cntl->start_req_bg());
+    auto maybe_meta = cntl->wait_for_ret();
+    if (!maybe_meta) {
+        auto err = Status::InternalError(
+                "tried to get rowsets from peer replicas and failed, "
+                "reason={}",
+                maybe_meta.error());
+        return ResultError(std::move(err));
+    }
+
+    auto& remote_meta = maybe_meta.value();
+    const auto& rs_metas = remote_meta.rowsets;
+    result.rowsets.reserve(rs_metas.size());
+    for (const auto& rs_meta : rs_metas) {
+        RowsetSharedPtr rs;
+        auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs);
+        if (!st) {
+            return ResultError(std::move(st));
+        }
+        result.rowsets.push_back(std::move(rs));
+    }
+    if (is_mow) {
+        DCHECK_NE(result.delete_bitmap, nullptr);
+        // The peer's bitmap is only set when the response carries one. Fail the
+        // fetch rather than dereference a null pointer or return rowsets
+        // without their delete marks.
+        if (remote_meta.delete_bitmap == nullptr) {
+            return ResultError(Status::InternalError(
+                    "peer replica returned no delete bitmap for merge-on-write tablet={}",
+                    tablet_id()));
+        }
+        result.delete_bitmap->merge(*remote_meta.delete_bitmap);
+    }
+    return result;
+}
+
+} // namespace doris
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ae285a94aff..7fd6e96730e 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -496,12 +496,12 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr
rowset_reader, RowsetWr
// copy delete bitmap to new tablet.
if (new_tablet->keys_type() == UNIQUE_KEYS &&
new_tablet->enable_unique_key_merge_on_write()) {
DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
- base_tablet->tablet_meta()->delete_bitmap().subset(
+ base_tablet->tablet_meta()->delete_bitmap()->subset(
{rowset_reader->rowset()->rowset_id(), 0, 0},
{rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX},
&origin_delete_bitmap);
for (auto& iter : origin_delete_bitmap.delete_bitmap) {
- int ret = new_tablet->tablet_meta()->delete_bitmap().set(
+ int ret = new_tablet->tablet_meta()->delete_bitmap()->set(
{rowset_writer->rowset_id(), std::get<1>(iter.first),
std::get<2>(iter.first)},
iter.second);
DCHECK(ret == 1);
@@ -958,7 +958,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const
TAlterTabletReqV2& reques
reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() ==
UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
- reader_context.delete_bitmap =
&_base_tablet->tablet_meta()->delete_bitmap();
+ reader_context.delete_bitmap =
_base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, end_version);
for (auto& rs_split : rs_splits) {
res = rs_split.rs_reader->init(&reader_context);
@@ -1073,10 +1073,8 @@ Status
SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versio
_base_tablet->tablet_id());
}
*max_rowset = rowset;
-
- RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked(
- Version(0, rowset->version().second), versions_to_be_changed,
false, false));
-
+ *versions_to_be_changed =
DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked(
+ Version(0, rowset->version().second), {}));
return Status::OK();
}
@@ -1478,8 +1476,9 @@ Status
SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
<< "double write rowsets for version: " << alter_version + 1
<< "-" << max_version
<< " new_tablet=" << _new_tablet->tablet_id();
std::shared_lock rlock(_new_tablet->get_header_lock());
- RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
- {alter_version + 1, max_version}, &rowsets));
+ auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
+ {alter_version + 1, max_version}, CaptureRowsetOps {}));
+ rowsets = std::move(ret.rowsets);
}
for (auto rowset_ptr : rowsets) {
std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
@@ -1497,8 +1496,9 @@ Status
SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
<< "incremental rowsets for version: " << max_version + 1 <<
"-"
<< new_max_version << " new_tablet=" <<
_new_tablet->tablet_id();
- RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
- {max_version + 1, new_max_version}, &rowsets));
+ auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
+ {max_version + 1, new_max_version}, CaptureRowsetOps {}));
+ rowsets = std::move(ret.rowsets);
}
for (auto&& rowset_ptr : rowsets) {
RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet,
rowset_ptr));
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index a59ed36bb82..6d2c956ac04 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -40,6 +40,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/local_file_system.h"
+#include "olap/base_tablet.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -567,13 +568,23 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
res = check_version_continuity(consistent_rowsets);
if (res.ok() && max_cooldowned_version < version) {
// Pick consistent rowsets of remaining required
version
- res = ref_tablet->capture_consistent_rowsets_unlocked(
- {max_cooldowned_version + 1, version},
&consistent_rowsets);
+ auto ret =
ref_tablet->capture_consistent_rowsets_unlocked(
+ {max_cooldowned_version + 1, version},
CaptureRowsetOps {});
+ if (ret) {
+ consistent_rowsets = std::move(ret->rowsets);
+ } else {
+ res = std::move(ret.error());
+ }
}
} else {
// get shortest version path
- res =
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
-
&consistent_rowsets);
+ auto ret =
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
+
CaptureRowsetOps {});
+ if (ret) {
+ consistent_rowsets = std::move(ret->rowsets);
+ } else {
+ res = std::move(ret.error());
+ }
}
if (!res.ok()) {
LOG(WARNING) << "fail to select versions to span. res=" <<
res;
@@ -594,7 +605,7 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
if (ref_tablet->keys_type() == UNIQUE_KEYS &&
ref_tablet->enable_unique_key_merge_on_write()) {
delete_bitmap_snapshot =
-
ref_tablet->tablet_meta()->delete_bitmap().snapshot(version);
+
ref_tablet->tablet_meta()->delete_bitmap()->snapshot(version);
}
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5c4770e3a33..403438f1dad 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -391,12 +391,14 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetSharedPtr>& to_add,
}
if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) {
- calc_bm_status =
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
-
&calc_delete_bitmap_rowsets);
- if (!calc_bm_status.ok()) {
- LOG(WARNING) << "fail to capture_consistent_rowsets, res: " <<
calc_bm_status;
+ auto ret =
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
+ CaptureRowsetOps
{});
+ if (!ret) {
+ LOG(WARNING) << "fail to capture_consistent_rowsets, res: " <<
ret.error();
+ calc_bm_status = std::move(ret.error());
break;
}
+ calc_delete_bitmap_rowsets = std::move(ret->rowsets);
// FIXME(plat1ko): Use `const TabletSharedPtr&` as parameter
auto self = _engine.tablet_manager()->get_tablet(tablet_id());
CHECK(self);
@@ -451,17 +453,16 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetSharedPtr>& to_add,
// that we can capture by version
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
Version full_version = Version(0, max_version_unlocked());
- std::vector<RowsetSharedPtr> expected_rowsets;
- auto st = capture_consistent_rowsets_unlocked(full_version,
&expected_rowsets);
- DCHECK(st.ok()) << st;
- DCHECK_EQ(base_rowsets_for_full_clone.size(),
expected_rowsets.size());
- if (st.ok() && base_rowsets_for_full_clone.size() !=
expected_rowsets.size())
- [[unlikely]] {
+ auto ret = capture_consistent_rowsets_unlocked(full_version,
CaptureRowsetOps {});
+ DCHECK(ret) << ret.error();
+ DCHECK_EQ(base_rowsets_for_full_clone.size(), ret->rowsets.size());
+
+ if (ret && base_rowsets_for_full_clone.size() !=
ret->rowsets.size()) [[unlikely]] {
LOG(WARNING) << "full clone succeeded, but the count("
<< base_rowsets_for_full_clone.size()
<< ") of base rowsets used for delete bitmap
calculation is not match "
"expect count("
- << expected_rowsets.size() << ") we capture from
tablet meta";
+ << ret->rowsets.size() << ") we capture from
tablet meta";
}
}
}
@@ -747,10 +748,9 @@ void Tablet::delete_expired_stale_rowset() {
Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;
- Status status =
- capture_consistent_versions_unlocked(test_version,
nullptr, false, false);
+ auto ret = capture_consistent_versions_unlocked(test_version, {});
// 1. When there is no consistent versions, we must reconstruct
the tracker.
- if (!status.ok()) {
+ if (!ret) {
// 2. fetch missing version after delete
Versions after_missed_versions =
get_missed_versions_unlocked(lastest_delta->end_version());
@@ -865,51 +865,11 @@ void Tablet::delete_expired_stale_rowset() {
{ _engine.start_delete_unused_rowset(); });
}
-Status Tablet::capture_consistent_versions_unlocked(const Version&
spec_version,
- Versions* version_path,
- bool skip_missing_version,
bool quiet) const {
- Status status =
-
_timestamped_version_tracker.capture_consistent_versions(spec_version,
version_path);
- if (!status.ok() && !quiet) {
- Versions missed_versions =
get_missed_versions_unlocked(spec_version.second);
- if (missed_versions.empty()) {
- // if version_path is null, it may be a compaction check logic.
- // so to avoid print too many logs.
- if (version_path != nullptr) {
- LOG(WARNING) << "tablet:" << tablet_id()
- << ", version already has been merged.
spec_version: " << spec_version
- << ", max_version: " << max_version_unlocked();
- }
- status = Status::Error<VERSION_ALREADY_MERGED, false>(
- "versions are already compacted, spec_version "
- "{}, max_version {}, tablet_id {}",
- spec_version.second, max_version_unlocked(), tablet_id());
- } else {
- if (version_path != nullptr) {
- LOG(WARNING) << "status:" << status << ", tablet:" <<
tablet_id()
- << ", missed version for version:" <<
spec_version;
- _print_missed_versions(missed_versions);
- if (skip_missing_version) {
- LOG(WARNING) << "force skipping missing version for
tablet:" << tablet_id();
- return Status::OK();
- }
- }
- }
- }
-
- DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", {
- auto tablet_id = dp->param<int64>("tablet_id", -1);
- if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) {
- status = Status::Error<VERSION_ALREADY_MERGED>("version already
merged");
- }
- });
-
- return status;
-}
-
Status Tablet::check_version_integrity(const Version& version, bool quiet) {
std::shared_lock rdlock(_meta_lock);
- return capture_consistent_versions_unlocked(version, nullptr, false,
quiet);
+ [[maybe_unused]] auto _versions = DORIS_TRY(
+ capture_consistent_versions_unlocked(version, CaptureRowsetOps
{.quiet = quiet}));
+ return Status::OK();
}
bool Tablet::exceed_version_limit(int32_t limit) {
@@ -939,22 +899,12 @@ void Tablet::acquire_version_and_rowsets(
}
}
-Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version,
-
std::vector<RowsetSharedPtr>* rowsets) const {
- std::vector<Version> version_path;
- RETURN_IF_ERROR(
- capture_consistent_versions_unlocked(spec_version, &version_path,
false, false));
- RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path,
rowsets));
- return Status::OK();
-}
-
Status Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) {
std::shared_lock rlock(_meta_lock);
std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version,
&version_path,
- skip_missing_version,
false));
- RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits));
+ *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
+ spec_version, CaptureRowsetOps {.skip_missing_versions =
skip_missing_version}));
return Status::OK();
}
@@ -2514,8 +2464,8 @@ Status Tablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t txn_id,
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
// skip sentinel mark, which is used for delete bitmap correctness
check
if (std::get<1>(key) != DeleteBitmap::INVALID_SEGMENT_ID) {
- _tablet_meta->delete_bitmap().merge({std::get<0>(key),
std::get<1>(key), cur_version},
- bitmap);
+ _tablet_meta->delete_bitmap()->merge({std::get<0>(key),
std::get<1>(key), cur_version},
+ bitmap);
}
}
@@ -2523,7 +2473,7 @@ Status Tablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t txn_id,
}
void Tablet::merge_delete_bitmap(const DeleteBitmap& delete_bitmap) {
- _tablet_meta->delete_bitmap().merge(delete_bitmap);
+ _tablet_meta->delete_bitmap()->merge(delete_bitmap);
}
bool Tablet::check_all_rowset_segment() {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 467fc51f985..a8c9df89ff0 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -180,21 +180,12 @@ public:
/// need to delete flag.
void delete_expired_stale_rowset();
- // Given spec_version, find a continuous version path and store it in
version_path.
- // If quiet is true, then only "does this path exist" is returned.
- // If skip_missing_version is true, return ok even there are missing
versions.
- Status capture_consistent_versions_unlocked(const Version& spec_version,
Versions* version_path,
- bool skip_missing_version,
bool quiet) const;
-
// if quiet is true, no error log will be printed if there are missing
versions
Status check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
void acquire_version_and_rowsets(
std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets)
const;
- Status capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const override;
-
// If skip_missing_version is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f74cf2bf1f6..3e7d48e44af 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1764,7 +1764,7 @@ void TabletManager::get_topn_tablet_delete_bitmap_score(
buf.reserve(n + 1);
auto handler = [&](const TabletSharedPtr& tablet) {
uint64_t delete_bitmap_count =
-
tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
+
tablet->tablet_meta()->delete_bitmap()->get_delete_bitmap_count();
total_delete_map_count += delete_bitmap_count;
if (delete_bitmap_count > *max_delete_bitmap_score) {
max_delete_bitmap_score_tablet_id = tablet->tablet_id();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 3a0ff3419ee..c1dbc8a9394 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -29,6 +29,7 @@
#include <cstdint>
#include <memory>
#include <random>
+#include <ranges>
#include <set>
#include <utility>
@@ -455,12 +456,12 @@ void TabletMeta::init_column_from_tcolumn(uint32_t
unique_id, const TColumn& tco
void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const
Version& version) {
if (_enable_unique_key_merge_on_write) {
- delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0});
+ delete_bitmap()->remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0});
if (config::enable_mow_verbose_log) {
LOG_INFO("delete rowset delete bitmap. tablet={}, rowset={},
version={}", tablet_id(),
rowset_id.to_string(), version.to_string());
}
- size_t rowset_cache_version_size =
delete_bitmap().remove_rowset_cache_version(rowset_id);
+ size_t rowset_cache_version_size =
delete_bitmap()->remove_rowset_cache_version(rowset_id);
_check_mow_rowset_cache_version_size(rowset_cache_version_size);
}
}
@@ -705,7 +706,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i);
auto ver = tablet_meta_pb.delete_bitmap().versions(i);
auto bitmap =
tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data();
- delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] =
roaring::Roaring::read(bitmap);
+ delete_bitmap()->delete_bitmap[{rst_id, seg_id, ver}] =
roaring::Roaring::read(bitmap);
}
}
@@ -789,7 +790,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
stale_rs_ids.insert(rowset->rowset_id());
}
DeleteBitmapPB* delete_bitmap_pb =
tablet_meta_pb->mutable_delete_bitmap();
- for (auto& [id, bitmap] : delete_bitmap().snapshot().delete_bitmap) {
+ for (auto& [id, bitmap] : delete_bitmap()->snapshot().delete_bitmap) {
auto& [rowset_id, segment_id, ver] = id;
if (stale_rs_ids.count(rowset_id) != 0) {
continue;
@@ -1121,6 +1122,35 @@ DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) {
return *this;
}
+DeleteBitmap DeleteBitmap::from_pb(const DeleteBitmapPB& pb, int64_t
tablet_id) {
+ size_t len = pb.rowset_ids().size();
+ DCHECK_EQ(len, pb.segment_ids().size());
+ DCHECK_EQ(len, pb.versions().size());
+ DeleteBitmap delete_bitmap(tablet_id);
+ for (int32_t i = 0; i < len; ++i) {
+ RowsetId rs_id;
+ rs_id.init(pb.rowset_ids(i));
+ BitmapKey key = {rs_id, pb.segment_ids(i), pb.versions(i)};
+ delete_bitmap.delete_bitmap[key] =
+ roaring::Roaring::read(pb.segment_delete_bitmaps(i).data());
+ }
+ return delete_bitmap;
+}
+
+DeleteBitmapPB DeleteBitmap::to_pb() {
+ std::shared_lock l(lock);
+ DeleteBitmapPB ret;
+ for (const auto& [k, v] : delete_bitmap) {
+ ret.mutable_rowset_ids()->Add(std::get<0>(k).to_string());
+ ret.mutable_segment_ids()->Add(std::get<1>(k));
+ ret.mutable_versions()->Add(std::get<2>(k));
+ std::string bitmap_data(v.getSizeInBytes(), '\0');
+ v.write(bitmap_data.data());
+ ret.mutable_segment_delete_bitmaps()->Add(std::move(bitmap_data));
+ }
+ return ret;
+}
+
DeleteBitmap DeleteBitmap::snapshot() const {
std::shared_lock l(lock);
return DeleteBitmap(*this);
@@ -1508,6 +1538,22 @@ std::shared_ptr<roaring::Roaring>
DeleteBitmap::get_agg_without_cache(const Bitm
return bitmap;
}
+DeleteBitmap DeleteBitmap::diffset(const std::set<BitmapKey>& key_set) const {
+ std::shared_lock l(lock);
+ auto diff_key_set_view =
+ delete_bitmap | std::ranges::views::transform([](const auto& kv) {
return kv.first; }) |
+ std::ranges::views::filter(
+ [&key_set](const auto& key) { return
!key_set.contains(key); });
+
+ DeleteBitmap dbm(_tablet_id);
+ for (const auto& key : diff_key_set_view) {
+ const auto* bitmap = get(key);
+ DCHECK_NE(bitmap, nullptr);
+ dbm.delete_bitmap[key] = *bitmap;
+ }
+ return dbm;
+}
+
std::atomic<DeleteBitmap::AggCachePolicy*> DeleteBitmap::AggCache::s_repr
{nullptr};
std::string tablet_state_name(TabletState state) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 388ddc439dc..63ad0124f86 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -241,7 +241,7 @@ public:
static void init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column);
- DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
+ std::shared_ptr<DeleteBitmap> delete_bitmap() { return _delete_bitmap; }
void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version&
version);
bool enable_unique_key_merge_on_write() const { return
_enable_unique_key_merge_on_write; }
@@ -417,6 +417,10 @@ public:
DeleteBitmap(DeleteBitmap&& r);
DeleteBitmap& operator=(DeleteBitmap&& r);
+ static DeleteBitmap from_pb(const DeleteBitmapPB& pb, int64_t tablet_id);
+
+ DeleteBitmapPB to_pb();
+
/**
* Makes a snapshot of delete bitmap, read lock will be acquired in this
* process
@@ -566,6 +570,14 @@ public:
std::set<RowsetId> get_rowset_cache_version();
+ /**
+ * Calculate diffset with given `key_set`. All entries with keys contained
in this delete bitmap but not
+ * in given key_set will be added to the output delete bitmap.
+ *
+ * @return DeleteBitmap containing all entries in the diffset
+ */
+ DeleteBitmap diffset(const std::set<BitmapKey>& key_set) const;
+
class AggCachePolicy : public LRUCachePolicy {
public:
AggCachePolicy(size_t capacity)
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index a453634ca83..7654a37de16 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -100,16 +100,6 @@ std::string TabletReader::KeysParam::to_string() const {
return ss.str();
}
-void TabletReader::ReadSource::fill_delete_predicates() {
- DCHECK_EQ(delete_predicates.size(), 0);
- for (auto&& split : rs_splits) {
- auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
- if (rs_meta->has_delete_predicate()) {
- delete_predicates.push_back(rs_meta);
- }
- }
-}
-
TabletReader::~TabletReader() {
for (auto* pred : _col_predicates) {
delete pred;
@@ -658,7 +648,7 @@ Status TabletReader::init_reader_params_and_create_block(
reader_params->version =
Version(input_rowsets.front()->start_version(),
input_rowsets.back()->end_version());
- ReadSource read_source;
+ TabletReadSource read_source;
for (const auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
@@ -680,9 +670,6 @@ Status TabletReader::init_reader_params_and_create_block(
merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
}
reader_params->tablet_schema = merge_tablet_schema;
- if (tablet->enable_unique_key_merge_on_write()) {
- reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
- }
reader_params->return_columns.resize(read_tablet_schema->num_columns());
std::iota(reader_params->return_columns.begin(),
reader_params->return_columns.end(), 0);
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 87af3bb08eb..d5aac0b89b5 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -34,6 +34,7 @@
#include "exprs/function_filter.h"
#include "gutil/strings/substitute.h"
#include "io/io_common.h"
+#include "olap/base_tablet.h"
#include "olap/delete_handler.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
@@ -91,12 +92,6 @@ class TabletReader {
};
public:
- struct ReadSource {
- std::vector<RowSetSplits> rs_splits;
- std::vector<RowsetMetaSharedPtr> delete_predicates;
- // Fill delete predicates with `rs_splits`
- void fill_delete_predicates();
- };
// Params for Reader,
// mainly include tablet, data version and fetch range.
struct ReaderParams {
@@ -117,9 +112,12 @@ public:
return BeExecVersionManager::get_newest_version();
}
- void set_read_source(ReadSource read_source) {
+ void set_read_source(TabletReadSource read_source, bool
skip_delete_bitmap = false) {
rs_splits = std::move(read_source.rs_splits);
delete_predicates = std::move(read_source.delete_predicates);
+ if (tablet->enable_unique_key_merge_on_write() &&
!skip_delete_bitmap) {
+ delete_bitmap = std::move(read_source.delete_bitmap);
+ }
}
BaseTabletSPtr tablet;
@@ -148,7 +146,7 @@ public:
std::vector<RowSetSplits> rs_splits;
// For unique key table with merge-on-write
- DeleteBitmap* delete_bitmap = nullptr;
+ std::shared_ptr<DeleteBitmap> delete_bitmap = nullptr;
// return_columns is init from query schema
std::vector<uint32_t> return_columns;
diff --git a/be/src/olap/task/engine_checksum_task.cpp
b/be/src/olap/task/engine_checksum_task.cpp
index 05ecfc0401b..c6cf69d54d9 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -24,6 +24,7 @@
#include <vector>
#include "io/io_common.h"
+#include "olap/base_tablet.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
@@ -81,13 +82,15 @@ Status EngineChecksumTask::_compute_checksum() {
vectorized::Block block;
{
std::shared_lock rdlock(tablet->get_header_lock());
- Status acquire_reader_st =
- tablet->capture_consistent_rowsets_unlocked(version,
&input_rowsets);
- if (!acquire_reader_st.ok()) {
+ auto ret = tablet->capture_consistent_rowsets_unlocked(version,
CaptureRowsetOps {});
+ if (ret) {
+ input_rowsets = std::move(ret->rowsets);
+ } else {
LOG(WARNING) << "fail to captute consistent rowsets. tablet=" <<
tablet->tablet_id()
- << "res=" << acquire_reader_st;
- return acquire_reader_st;
+ << "res=" << ret.error();
+ return std::move(ret.error());
}
+
RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block(
tablet, ReaderType::READER_CHECKSUM, input_rowsets,
&reader_params, &block));
}
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index ecf1bdfc6d5..6a9e66f1d38 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -984,7 +984,7 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
}
}
if (tablet->enable_unique_key_merge_on_write()) {
-
tablet->tablet_meta()->delete_bitmap().merge(cloned_tablet_meta->delete_bitmap());
+
tablet->tablet_meta()->delete_bitmap()->merge(*cloned_tablet_meta->delete_bitmap());
}
return tablet->revise_tablet_meta(to_add, to_delete, false);
// TODO(plat1ko): write cooldown meta to remote if this replica is
cooldown replica
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 210aa6a8c56..d9352a0ea9d 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -32,8 +32,10 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/status.h"
#include "gutil/strings/numbers.h"
#include "io/fs/local_file_system.h"
+#include "olap/base_tablet.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -87,8 +89,10 @@ Status EngineStorageMigrationTask::_get_versions(int32_t
start_version, int32_t*
<< ", start_version=" << start_version << ", end_version="
<< *end_version;
return Status::OK();
}
- return _tablet->capture_consistent_rowsets_unlocked(Version(start_version,
*end_version),
- consistent_rowsets);
+ auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
+ Version(start_version, *end_version), CaptureRowsetOps {}));
+ *consistent_rowsets = std::move(ret.rowsets);
+ return Status::OK();
}
bool EngineStorageMigrationTask::_is_timeout() {
@@ -354,7 +358,7 @@ void EngineStorageMigrationTask::_generate_new_header(
}
new_tablet_meta->revise_rs_metas(std::move(rs_metas));
if (_tablet->keys_type() == UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
- DeleteBitmap bm =
_tablet->tablet_meta()->delete_bitmap().snapshot(end_version);
+ DeleteBitmap bm =
_tablet->tablet_meta()->delete_bitmap()->snapshot(end_version);
new_tablet_meta->revise_delete_bitmap_unlocked(bm);
}
new_tablet_meta->set_shard_id(new_shard);
diff --git a/be/src/olap/task/index_builder.cpp
b/be/src/olap/task/index_builder.cpp
index 9f72056af8d..ffb226bee58 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -822,7 +822,7 @@ Status IndexBuilder::modify_rowsets(const
Merger::Statistics* stats) {
for (auto i = 0; i < _input_rowsets.size(); ++i) {
RowsetId input_rowset_id = _input_rowsets[i]->rowset_id();
RowsetId output_rowset_id = _output_rowsets[i]->rowset_id();
- for (const auto& [k, v] :
_tablet->tablet_meta()->delete_bitmap().delete_bitmap) {
+ for (const auto& [k, v] :
_tablet->tablet_meta()->delete_bitmap()->delete_bitmap) {
RowsetId rs_id = std::get<0>(k);
if (rs_id == input_rowset_id) {
DeleteBitmap::BitmapKey output_rs_key = {output_rowset_id,
std::get<1>(k),
@@ -832,7 +832,7 @@ Status IndexBuilder::modify_rowsets(const
Merger::Statistics* stats) {
}
}
}
- _tablet->tablet_meta()->delete_bitmap().merge(*delete_bitmap);
+ _tablet->tablet_meta()->delete_bitmap()->merge(*delete_bitmap);
// modify_rowsets will remove the delete_bitmap for input rowsets,
// should call it after merge delete_bitmap
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 53a7d8cd0f3..13d2044cc2a 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -27,6 +27,7 @@
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/config.h"
+#include "common/config.h"
#include "olap/parallel_scanner_builder.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
@@ -528,9 +529,11 @@ Status OlapScanLocalState::hold_tablets() {
}
for (size_t i = 0; i < _scan_ranges.size(); i++) {
- RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers(
- {0, _tablets[i].version}, &_read_sources[i].rs_splits,
- RuntimeFilterConsumer::_state->skip_missing_version()));
+ _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source(
+ {0, _tablets[i].version},
+ {.skip_missing_versions =
RuntimeFilterConsumer::_state->skip_missing_version(),
+ .enable_fetch_rowsets_from_peers =
+ config::enable_fetch_rowsets_from_peer_replicas}));
if (!PipelineXLocalState<>::_state->skip_delete_predicate()) {
_read_sources[i].fill_delete_predicates();
}
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 24a1b1b876a..e1eea0c7822 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -229,7 +229,7 @@ private:
std::mutex _profile_mtx;
std::vector<TabletWithVersion> _tablets;
- std::vector<TabletReader::ReadSource> _read_sources;
+ std::vector<TabletReadSource> _read_sources;
};
class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 423c62c6c4b..6d5d9055fc3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -53,6 +53,9 @@
#include <utility>
#include <vector>
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
#include "common/exception.h"
@@ -162,6 +165,8 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit:
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads,
MetricUnit::NOUNIT);
+static bvar::LatencyRecorder
g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets");
+
bthread_key_t btls_key;
static void thread_context_deleter(void* d) {
@@ -2200,4 +2205,66 @@ void
PInternalService::get_be_resource(google::protobuf::RpcController* controll
}
}
+void PInternalService::get_tablet_rowsets(google::protobuf::RpcController*
controller,
+ const PGetTabletRowsetsRequest*
request,
+ PGetTabletRowsetsResponse* response,
+ google::protobuf::Closure* done) {
+ DCHECK(config::is_cloud_mode());
+ auto start_time = GetMonoTimeMicros();
+ Defer defer {
+ [&]() { g_process_remote_fetch_rowsets_latency <<
GetMonoTimeMicros() - start_time; }};
+ brpc::ClosureGuard closure_guard(done);
+ LOG(INFO) << "process get tablet rowsets, request=" <<
request->ShortDebugString();
+ if (!request->has_tablet_id() || !request->has_version_start() ||
!request->has_version_end()) {
+ Status::InvalidArgument("missing params
tablet/version_start/version_end")
+ .to_protobuf(response->mutable_status());
+ return;
+ }
+ CloudStorageEngine& storage =
ExecEnv::GetInstance()->storage_engine().to_cloud();
+
+ auto maybe_tablet =
+ storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup
data*/ false,
+ /*syn_delete_bitmap*/ false,
/*delete_bitmap*/ nullptr,
+ /*local_only*/ true);
+ if (!maybe_tablet) {
+ maybe_tablet.error().to_protobuf(response->mutable_status());
+ return;
+ }
+ auto tablet = maybe_tablet.value();
+ Result<CaptureRowsetResult> ret;
+ {
+ std::shared_lock l(tablet->get_header_lock());
+ ret = tablet->capture_consistent_rowsets_unlocked(
+ {request->version_start(), request->version_end()},
+ CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false});
+ }
+ if (!ret) {
+ ret.error().to_protobuf(response->mutable_status());
+ return;
+ }
+ auto rowsets = std::move(ret.value().rowsets);
+ for (const auto& rs : rowsets) {
+ RowsetMetaPB meta;
+ rs->rowset_meta()->to_rowset_pb(&meta);
+ response->mutable_rowsets()->Add(std::move(meta));
+ }
+ if (request->has_delete_bitmap_keys()) {
+ DCHECK(tablet->enable_unique_key_merge_on_write());
+ auto delete_bitmap = std::move(ret.value().delete_bitmap);
+ auto keys_pb = request->delete_bitmap_keys();
+ size_t len = keys_pb.rowset_ids().size();
+ DCHECK_EQ(len, keys_pb.segment_ids().size());
+ DCHECK_EQ(len, keys_pb.versions().size());
+ std::set<DeleteBitmap::BitmapKey> keys;
+ for (size_t i = 0; i < len; ++i) {
+ RowsetId rs_id;
+ rs_id.init(keys_pb.rowset_ids(i));
+ keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i));
+ }
+ auto diffset = delete_bitmap->diffset(keys).to_pb();
+ *response->mutable_delete_bitmap() = std::move(diffset);
+ }
+ Status::OK().to_protobuf(response->mutable_status());
+}
+
} // namespace doris
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 66a0f867393..7262f4eed2d 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -240,6 +240,11 @@ public:
const PGetBeResourceRequest* request,
PGetBeResourceResponse* response,
google::protobuf::Closure* done) override;
+ void get_tablet_rowsets(google::protobuf::RpcController* controller,
+ const PGetTabletRowsetsRequest* request,
+ PGetTabletRowsetsResponse* response,
+ google::protobuf::Closure* done) override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index e9c199074ec..c8008c98525 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -68,7 +68,7 @@
namespace doris::vectorized {
-using ReadSource = TabletReader::ReadSource;
+using ReadSource = TabletReadSource;
NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent,
NewOlapScanner::Params&& params)
@@ -97,7 +97,8 @@ NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase*
parent,
.filter_block_conjuncts {},
.key_group_cluster_key_idxes {},
}) {
- _tablet_reader_params.set_read_source(std::move(params.read_source));
+ _tablet_reader_params.set_read_source(std::move(params.read_source),
+ _state->skip_delete_bitmap());
_is_init = false;
}
@@ -193,12 +194,14 @@ Status NewOlapScanner::init() {
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
}
- auto st = tablet->capture_rs_readers(_tablet_reader_params.version,
- &read_source.rs_splits,
-
_state->skip_missing_version());
- if (!st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" << st;
- return st;
+ auto maybe_read_source = tablet->capture_read_source(
+ _tablet_reader_params.version,
+ {.skip_missing_versions = _state->skip_missing_version(),
+ .enable_fetch_rowsets_from_peers =
+ config::enable_fetch_rowsets_from_peer_replicas});
+ if (!maybe_read_source) {
+ LOG(WARNING) << "fail to init reader. res=" <<
maybe_read_source.error();
+ return maybe_read_source.error();
}
if (config::enable_mow_verbose_log &&
tablet->enable_unique_key_merge_on_write()) {
LOG_INFO("finish capture_rs_readers for tablet={},
query_id={}",
@@ -309,7 +312,6 @@ Status NewOlapScanner::_init_tablet_reader_params(
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
- auto& tablet = _tablet_reader_params.tablet;
auto& tablet_schema = _tablet_reader_params.tablet_schema;
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
for (auto& del_pred : _tablet_reader_params.delete_predicates) {
@@ -369,10 +371,6 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.use_page_cache = _state->enable_page_cache();
- if (tablet->enable_unique_key_merge_on_write() &&
!_state->skip_delete_bitmap()) {
- _tablet_reader_params.delete_bitmap =
&tablet->tablet_meta()->delete_bitmap();
- }
-
DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block",
DBUG_BLOCK);
if (!_state->skip_storage_engine_merge()) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index fd1246b120b..a997ae2adf5 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -62,7 +62,7 @@ public:
std::vector<OlapScanRange*> key_ranges;
BaseTabletSPtr tablet;
int64_t version;
- TabletReader::ReadSource read_source;
+ TabletReadSource read_source;
int64_t limit;
bool aggregation;
};
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index 7f6aadd6070..d3e9115baf1 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -996,7 +996,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
StorageReadOptions opts;
opts.stats = &stats;
opts.tablet_schema = rowset1->tablet_schema();
- opts.delete_bitmap.emplace(0,
tablet->tablet_meta()->delete_bitmap().get_agg(
+ opts.delete_bitmap.emplace(0,
tablet->tablet_meta()->delete_bitmap()->get_agg(
{rowset1->rowset_id(), 0,
cur_version}));
std::unique_ptr<RowwiseIterator> iter;
std::shared_ptr<Schema> schema =
std::make_shared<Schema>(rowset1->tablet_schema());
@@ -1024,7 +1024,7 @@ TEST_F(TestDeltaWriter,
vec_sequence_col_concurrent_write) {
StorageReadOptions opts;
opts.stats = &stats;
opts.tablet_schema = rowset2->tablet_schema();
- opts.delete_bitmap.emplace(0,
tablet->tablet_meta()->delete_bitmap().get_agg(
+ opts.delete_bitmap.emplace(0,
tablet->tablet_meta()->delete_bitmap()->get_agg(
{rowset2->rowset_id(), 0,
cur_version}));
std::unique_ptr<RowwiseIterator> iter;
std::shared_ptr<Schema> schema =
std::make_shared<Schema>(rowset2->tablet_schema());
diff --git a/be/test/olap/segcompaction_mow_test.cpp
b/be/test/olap/segcompaction_mow_test.cpp
index 62a3232889d..efe40dcb859 100644
--- a/be/test/olap/segcompaction_mow_test.cpp
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -237,7 +237,7 @@ protected:
std::vector<uint32_t> return_columns = {0, 1, 2};
reader_context.return_columns = &return_columns;
reader_context.stats = &_stats;
- reader_context.delete_bitmap = delete_bitmap.get();
+ reader_context.delete_bitmap = delete_bitmap;
std::vector<uint32_t> segment_num_rows;
Status s;
diff --git a/be/test/olap/tablet_meta_test.cpp
b/be/test/olap/tablet_meta_test.cpp
index b85c63ef714..b350e5e0061 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -73,23 +73,23 @@ TEST(TabletMetaTest, TestReviseMeta) {
}
ASSERT_EQ(4, tablet_meta.all_rs_metas().size());
- tablet_meta.delete_bitmap().add({rsids[0], 1, 1}, 1);
- tablet_meta.delete_bitmap().add({rsids[1], 0, 2}, 2);
- tablet_meta.delete_bitmap().add({rsids[2], 1, 1}, 1);
- tablet_meta.delete_bitmap().add({rsids[3], 0, 2}, 3);
- tablet_meta.delete_bitmap().add({rsids[3], 0, 4}, 4);
- ASSERT_EQ(5, tablet_meta.delete_bitmap().delete_bitmap.size());
+ tablet_meta.delete_bitmap()->add({rsids[0], 1, 1}, 1);
+ tablet_meta.delete_bitmap()->add({rsids[1], 0, 2}, 2);
+ tablet_meta.delete_bitmap()->add({rsids[2], 1, 1}, 1);
+ tablet_meta.delete_bitmap()->add({rsids[3], 0, 2}, 3);
+ tablet_meta.delete_bitmap()->add({rsids[3], 0, 4}, 4);
+ ASSERT_EQ(5, tablet_meta.delete_bitmap()->delete_bitmap.size());
std::vector<RowsetMetaSharedPtr> new_rowsets;
new_rowsets.push_back(src_rowsets[2]->rowset_meta());
new_rowsets.push_back(src_rowsets[3]->rowset_meta());
tablet_meta.revise_rs_metas(std::move(new_rowsets));
// Take a snapshot with max_version=3.
- DeleteBitmap snap = tablet_meta.delete_bitmap().snapshot(3);
+ DeleteBitmap snap = tablet_meta.delete_bitmap()->snapshot(3);
tablet_meta.revise_delete_bitmap_unlocked(snap);
ASSERT_EQ(2, tablet_meta.all_rs_metas().size());
- ASSERT_EQ(2, tablet_meta.delete_bitmap().delete_bitmap.size());
- for (auto entry : tablet_meta.delete_bitmap().delete_bitmap) {
+ ASSERT_EQ(2, tablet_meta.delete_bitmap()->delete_bitmap.size());
+ for (auto entry : tablet_meta.delete_bitmap()->delete_bitmap) {
RowsetId rsid = std::get<0>(entry.first);
ASSERT_TRUE(rsid == rsids[2] || rsid == rsids[3]);
int64_t version = std::get<2>(entry.first);
diff --git a/be/test/olap/test_data/rowset_meta.json
b/be/test/olap/test_data/rowset_meta.json
index d446e2a34e9..4fe585978aa 100644
--- a/be/test/olap/test_data/rowset_meta.json
+++ b/be/test/olap/test_data/rowset_meta.json
@@ -12,6 +12,10 @@
"data_disk_size": 0,
"index_disk_size": 0,
"empty": true,
+ "load_id": {
+ "hi": 0,
+ "lo": 0
+ },
"creation_time": 1552911435,
"tablet_uid": {
"hi": 10,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 358fc1023b2..fce12193e35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -602,6 +602,22 @@ public class CloudReplica extends Replica {
secondaryClusterToBackends.remove(cluster);
}
+ public List<Backend> getAllPrimaryBes() { // Returns, for each cluster id in primaryClusterToBackends, the Backend named by the first id in that cluster's list.
+ List<Backend> result = new ArrayList<Backend>();
+ primaryClusterToBackends.keySet().forEach(clusterId -> {
+ List<Long> backendIds = primaryClusterToBackends.get(clusterId);
+ if (backendIds == null || backendIds.isEmpty()) { // nothing recorded for this cluster id
+ return; // exits only this lambda invocation; forEach continues with the next cluster id
+ }
+ Long beId = backendIds.get(0); // only the first id of the list is consulted
+ if (beId != -1) { // -1 is skipped; presumably a "no backend assigned" sentinel, TODO confirm
+ Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+ result.add(backend); // NOTE(review): added without a null check, so the list may contain null if getBackend() can return null (confirm); the FrontendServiceImpl hunk of this patch null-checks each element
+ }
+ });
+ return result;
+ }
+
// ATTN: This func is only used by redundant tablet report clean in bes.
// Only the master node will do the diff logic,
// so just only need to clean up secondaryClusterToBackends on the master
node.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 92cfe98e535..ddb49a0ee0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.catalog.View;
import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
import org.apache.doris.cluster.ClusterNamespace;
@@ -2756,15 +2757,24 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.warn("replica {} not normal", replica.getId());
continue;
}
- Backend backend =
Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
- if (backend != null) {
- TReplicaInfo replicaInfo = new TReplicaInfo();
- replicaInfo.setHost(backend.getHost());
- replicaInfo.setBePort(backend.getBePort());
- replicaInfo.setHttpPort(backend.getHttpPort());
- replicaInfo.setBrpcPort(backend.getBrpcPort());
- replicaInfo.setReplicaId(replica.getId());
- replicaInfos.add(replicaInfo);
+ List<Backend> backends;
+ if (Config.isCloudMode()) {
+ CloudReplica cloudReplica = (CloudReplica) replica;
+ backends = cloudReplica.getAllPrimaryBes();
+ } else {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
+ backends = Lists.newArrayList(backend);
+ }
+ for (Backend backend : backends) {
+ if (backend != null) {
+ TReplicaInfo replicaInfo = new TReplicaInfo();
+ replicaInfo.setHost(backend.getHost());
+ replicaInfo.setBePort(backend.getBePort());
+ replicaInfo.setHttpPort(backend.getHttpPort());
+ replicaInfo.setBrpcPort(backend.getBrpcPort());
+ replicaInfo.setReplicaId(replica.getId());
+ replicaInfos.add(replicaInfo);
+ }
}
}
tabletReplicaInfos.put(tabletId, replicaInfos);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 150c07fcf9a..20de386b3eb 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -989,6 +989,21 @@ message PGetBeResourceResponse {
optional PGlobalResourceUsage global_be_resource_usage = 2;
}
+message PGetTabletRowsetsRequest { // Request type of the get_tablet_rowsets RPC declared in PBackendService.
+ optional int64 tablet_id = 1; // presumably the tablet whose rowsets are requested, TODO confirm
+ optional int64 version_start = 2; // presumably the lower bound of the wanted version range; inclusiveness is not shown here, TODO confirm
+ optional int64 version_end = 3; // presumably the upper bound of the wanted version range; inclusiveness is not shown here, TODO confirm
+
+ optional DeleteBitmapPB delete_bitmap_keys = 4; // reuses DeleteBitmapPB; presumably selects which delete-bitmap entries to return, TODO confirm against the handler
+}
+
+message PGetTabletRowsetsResponse { // Response type of the get_tablet_rowsets RPC declared in PBackendService.
+ required PStatus status = 1; // the only required field of this message
+ repeated RowsetMetaPB rowsets = 2; // zero or more rowset metas; presumably those matching the request, TODO confirm
+
+ optional DeleteBitmapPB delete_bitmap = 3; // optional; presumably the delete-bitmap entries matching the request's delete_bitmap_keys, TODO confirm
+}
+
service PBackendService {
rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -1041,5 +1056,6 @@ service PBackendService {
rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns
(PJdbcTestConnectionResult);
rpc alter_vault_sync(PAlterVaultSyncRequest) returns
(PAlterVaultSyncResponse);
rpc get_be_resource(PGetBeResourceRequest) returns
(PGetBeResourceResponse);
+ rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns
(PGetTabletRowsetsResponse);
};
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
new file mode 100644
index 00000000000..78964812ebf
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
differ
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index 5627f048905..9b0bc0f8e71 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -80,3 +80,5 @@ large_cumu_compaction_task_min_thread_num=3
# This feature has bug, so by default is false, only open it in pipeline to
observe
enable_parquet_page_index=true
+
+enable_fetch_rowsets_from_peer_replicas = true
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
new file mode 100644
index 00000000000..e1c6669b11b
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.NodeType
+
+suite("test_cloud_version_already_merged", "nonConcurrent") { // Runs one SELECT under four debug-point setups: the first must succeed, the other three must fail with the "version already merged" error.
+ if (!isCloudMode()) { // outside cloud mode the suite does nothing
+ return
+ }
+ def tblName = "test_cloud_version_already_merged"
+ sql """ DROP TABLE IF EXISTS ${tblName} FORCE; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tblName} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1");
+ """
+
+ sql "insert into ${tblName} values(1,-1,-1,-1);" // first round: keys 1..5 with negative values, one statement per row
+ sql "insert into ${tblName} values(2,-2,-2,-2);"
+ sql "insert into ${tblName} values(3,-3,-3,-3);"
+ sql "insert into ${tblName} values(4,-4,-4,-4)"
+ sql "insert into ${tblName} values(5,-5,-5,-5)"
+ sql "insert into ${tblName} values(1,1,1,1);" // second round: the same keys again with positive values
+ sql "insert into ${tblName} values(2,2,2,2);"
+ sql "insert into ${tblName} values(3,3,3,3);"
+ sql "insert into ${tblName} values(4,4,4,4)"
+ sql "insert into ${tblName} values(5,5,5,5)"
+
+
+ sql "sync;"
+ qt_sql "select * from ${tblName} order by k1;" // baseline read before any debug point is enabled
+
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from ${tblName};")
+ assert tabletStats.size() == 1 // the table was created with BUCKETS 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) { // find the 'show backends' row whose BackendId matches the tablet's
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+
+ GetDebugPoint().clearDebugPointsForAllFEs() // start from a clean debug-point state
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ try { // case 1: capture failure injected on the tablet's own BE only (skip_by_option: true); the query below is still expected to succeed
+ GetDebugPoint().enableDebugPoint(tabletBackend.Host,
tabletBackend.HttpPort as int, NodeType.BE,
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId,
skip_by_option: true])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
+ qt_sql """ SELECT * from ${tblName} ORDER BY k1 """
+
+ } finally { // always clear debug points so later cases and suites are unaffected
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ try { // case 2: capture failure injected on every BE, without skip_by_option; the query must fail
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure",
[tablet_id: tabletId])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
+ test { // expects the statement to raise an error with the message given below
+ sql """ SELECT * from ${tblName} ORDER BY k1 """
+ exception "version already merged, meet error during remote
capturing rowsets"
+ }
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ try { // case 3: same setup as case 1 plus GetRowsetCntl::start_req_bg.inject_failure on all BEs; the query must fail
+ GetDebugPoint().enableDebugPoint(tabletBackend.Host,
tabletBackend.HttpPort as int, NodeType.BE,
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId,
skip_by_option: true])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
GetDebugPoint().enableDebugPointForAllBEs("GetRowsetCntl::start_req_bg.inject_failure");
+
+ test {
+ sql """ SELECT * from ${tblName} ORDER BY k1 """
+ exception "version already merged, meet error during remote
capturing rowsets"
+ }
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ try { // case 4: same setup as case 1 plus Tablet::_remote_get_rowsets_meta.inject_replica_address_fail on all BEs; the query must fail
+ GetDebugPoint().enableDebugPoint(tabletBackend.Host,
tabletBackend.HttpPort as int, NodeType.BE,
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId,
skip_by_option: true])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail");
+
+ test {
+ sql """ SELECT * from ${tblName} ORDER BY k1 """
+ exception "version already merged, meet error during remote
capturing rowsets"
+ }
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]