This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75d45605d59 [opt](rowset) Remote fetch rowsets to avoid -230 error
when capturing rowsets (#52995)
75d45605d59 is described below
commit 75d45605d590aff22e5901e1f01170381573f649
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Oct 23 15:26:45 2025 +0800
[opt](rowset) Remote fetch rowsets to avoid -230 error when capturing
rowsets (#52995)
Related PR: #52440
In read-write splitting scenarios, some BE (Backend) nodes may have
already merged certain rowset versions, while another BE still attempts
to capture or access those rowsets.
When this happens, the BE reports error E-230 (versions already merged),
causing data access or synchronization to fail.
This PR introduces a remote rowset fetching mechanism, allowing a BE
that lacks the required rowset to fetch it from other BE nodes, instead
of failing with E-230.
- Added a remote fetch mechanism in the rowset management layer: when a BE
detects that a requested rowset is missing locally and its versions have
already been merged, it tries to fetch the rowset from other BE nodes.
- Updated version and state checking logic to correctly identify the
“merged but missing” condition.
- Adjusted the rowset access path to trigger remote fetch rather than
throwing an immediate error.
- Added tests (unit/integration) to cover the new logic where
applicable.
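- Ensured backward compatibility: if the BE already has the rowset
locally or read-write splitting is not enabled, the behavior remains
unchanged. The change also adds a BE config,
`enable_fetch_rowsets_from_peer_replicas`, whose default value is `false`.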
### Release note
Introduce a remote rowset fetching mechanism to prevent E-230 (“versions
already merged”) errors in read-write splitting scenarios.
This improves BE fault tolerance when some nodes have merged versions
that others have not yet synchronized.
---
be/src/cloud/cloud_full_compaction.cpp | 5 +-
be/src/cloud/cloud_schema_change_job.cpp | 18 +-
be/src/cloud/cloud_tablet.cpp | 107 ++---
be/src/cloud/cloud_tablet.h | 28 +-
be/src/cloud/cloud_tablet_mgr.cpp | 17 +-
be/src/cloud/cloud_tablet_mgr.h | 2 +-
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/olap/base_tablet.cpp | 41 +-
be/src/olap/base_tablet.h | 85 ++--
be/src/olap/full_compaction.cpp | 5 +-
be/src/olap/merger.cpp | 8 +-
be/src/olap/parallel_scanner_builder.cpp | 43 +-
be/src/olap/parallel_scanner_builder.h | 9 +-
be/src/olap/rowset/rowset_reader_context.h | 2 +-
be/src/olap/rowset_version_mgr.cpp | 449 +++++++++++++++++++++
be/src/olap/schema_change.cpp | 17 +-
be/src/olap/snapshot_manager.cpp | 19 +-
be/src/olap/tablet.cpp | 99 ++---
be/src/olap/tablet.h | 13 +-
be/src/olap/tablet_meta.cpp | 45 +++
be/src/olap/tablet_meta.h | 12 +
be/src/olap/tablet_reader.cpp | 15 +-
be/src/olap/tablet_reader.h | 16 +-
be/src/olap/task/engine_checksum_task.cpp | 12 +-
be/src/olap/task/engine_storage_migration_task.cpp | 7 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 17 +-
be/src/pipeline/exec/olap_scan_operator.h | 2 +-
be/src/service/internal_service.cpp | 68 ++++
be/src/service/internal_service.h | 5 +
be/src/vec/exec/scan/olap_scanner.cpp | 41 +-
be/src/vec/exec/scan/olap_scanner.h | 2 +-
.../cloud/cloud_tablet_query_prefer_cache_test.cpp | 14 +-
.../cloud_tablet_query_with_tolerance_test.cpp | 12 +-
be/test/olap/segcompaction_mow_test.cpp | 3 +-
.../apache/doris/cloud/catalog/CloudReplica.java | 16 +
.../apache/doris/service/FrontendServiceImpl.java | 47 ++-
gensrc/proto/internal_service.proto | 16 +
.../cloud/test_cloud_version_already_merged.out | 15 +
regression-test/pipeline/p0/conf/be.conf | 1 +
.../cloud/test_cloud_version_already_merged.groovy | 142 +++++++
41 files changed, 1095 insertions(+), 382 deletions(-)
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index 382360bed3a..15160cfabd4 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -349,8 +349,9 @@ Status
CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
int64_t max_version = cloud_tablet()->max_version().second;
DCHECK(max_version >= _output_rowset->version().second);
if (max_version > _output_rowset->version().second) {
- RETURN_IF_ERROR(cloud_tablet()->capture_consistent_rowsets_unlocked(
- {_output_rowset->version().second + 1, max_version},
&tmp_rowsets));
+ auto ret =
DORIS_TRY(cloud_tablet()->capture_consistent_rowsets_unlocked(
+ {_output_rowset->version().second + 1, max_version},
CaptureRowsetOps {}));
+ tmp_rowsets = std::move(ret.rowsets);
}
for (const auto& it : tmp_rowsets) {
int64_t cur_version = it->rowset_meta()->start_version();
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 7b78033b8e3..df59a78a583 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -150,7 +150,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
// [0-1] is a placeholder rowset, no need to convert
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2,
start_resp.alter_version()},
&rs_splits,
-
{.skip_missing_version = false,
+
{.skip_missing_versions = false,
.enable_prefer_cached_rowset = false,
.query_freshness_tolerance_ms = -1}));
}
@@ -199,7 +199,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
- reader_context.delete_bitmap =
&_base_tablet->tablet_meta()->delete_bitmap();
+ reader_context.delete_bitmap =
_base_tablet->tablet_meta()->delete_bitmap_ptr();
reader_context.version = Version(0, start_resp.alter_version());
std::vector<uint32_t> cluster_key_idxes;
if (!_base_tablet_schema->cluster_key_uids().empty()) {
@@ -509,22 +509,21 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}
// step 1, process incremental rowset without delete bitmap update lock
- std::vector<RowsetSharedPtr> incremental_rowsets;
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
int64_t max_version = tmp_tablet->max_version().second;
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets without lock, version: " <<
start_calc_delete_bitmap_version
<< "-" << max_version << " new_table_id: " <<
_new_tablet->tablet_id();
if (max_version >= start_calc_delete_bitmap_version) {
- RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
- {start_calc_delete_bitmap_version, max_version},
&incremental_rowsets));
+ auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
+ {start_calc_delete_bitmap_version, max_version},
CaptureRowsetOps {}));
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
DBUG_BLOCK);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
- for (auto rowset : incremental_rowsets) {
+ for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
}
@@ -540,15 +539,14 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
<< "incremental rowsets with lock, version: " << max_version + 1
<< "-"
<< new_max_version << " new_tablet_id: " <<
_new_tablet->tablet_id();
- std::vector<RowsetSharedPtr> new_incremental_rowsets;
if (new_max_version > max_version) {
- RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
- {max_version + 1, new_max_version}, &new_incremental_rowsets));
+ auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
+ {max_version + 1, new_max_version}, CaptureRowsetOps {}));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
}
- for (auto rowset : new_incremental_rowsets) {
+ for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
}
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index ba404d9138b..913f8127bd0 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -47,6 +47,7 @@
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
+#include "olap/base_tablet.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
@@ -146,83 +147,53 @@ std::string CloudTablet::tablet_path() const {
return "";
}
-Status CloudTablet::capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets)
const {
- Versions version_path;
- auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
- if (!st.ok()) {
- // Check no missed versions or req version is merged
- auto missed_versions = get_missed_versions(spec_version.second);
- if (missed_versions.empty()) {
- st.set_code(VERSION_ALREADY_MERGED); // Reset error code
- }
- st.append(" tablet_id=" + std::to_string(tablet_id()));
- return st;
- }
- VLOG_DEBUG << "capture consitent versions: " << version_path;
- return _capture_consistent_rowsets_unlocked(version_path, rowsets);
-}
-
Status CloudTablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- const CaptureRsReaderOptions& opts) {
- if (opts.query_freshness_tolerance_ms > 0) {
- return capture_rs_readers_with_freshness_tolerance(spec_version,
rs_splits,
-
opts.query_freshness_tolerance_ms);
- } else if (opts.enable_prefer_cached_rowset &&
!enable_unique_key_merge_on_write()) {
- return capture_rs_readers_prefer_cache(spec_version, rs_splits);
- }
- return capture_rs_readers_internal(spec_version, rs_splits);
-}
-
-Status CloudTablet::capture_rs_readers_internal(const Version& spec_version,
- std::vector<RowSetSplits>*
rs_splits) {
+ const CaptureRowsetOps& opts) {
DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
LOG_WARNING("CloudTablet.capture_rs_readers.return
e-230").tag("tablet_id", tablet_id());
return Status::Error<false>(-230, "injected error");
});
- Versions version_path;
std::shared_lock rlock(_meta_lock);
- auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
- if (!st.ok()) {
- rlock.unlock(); // avoid logging in lock range
- // Check no missed versions or req version is merged
- auto missed_versions = get_missed_versions(spec_version.second);
- if (missed_versions.empty()) {
- st.set_code(VERSION_ALREADY_MERGED); // Reset error code
- st.append(" versions are already compacted, ");
- }
- st.append(" tablet_id=" + std::to_string(tablet_id()));
- // clang-format off
- LOG(WARNING) << st << '\n' << [this]() { std::string json;
get_compaction_status(&json); return json; }();
- // clang-format on
- return st;
+ *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
+ spec_version, CaptureRowsetOps {.skip_missing_versions =
opts.skip_missing_versions}));
+ return Status::OK();
+}
+
+[[nodiscard]] Result<std::vector<Version>>
CloudTablet::capture_consistent_versions_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options) const {
+ if (options.query_freshness_tolerance_ms > 0) {
+ return capture_versions_with_freshness_tolerance(version_range,
options);
+ } else if (options.enable_prefer_cached_rowset &&
!enable_unique_key_merge_on_write()) {
+ return capture_versions_prefer_cache(version_range);
}
- VLOG_DEBUG << "capture consitent versions: " << version_path;
- return capture_rs_readers_unlocked(version_path, rs_splits);
+ return BaseTablet::capture_consistent_versions_unlocked(version_range,
options);
}
-Status CloudTablet::capture_rs_readers_prefer_cache(const Version&
spec_version,
- std::vector<RowSetSplits>*
rs_splits) {
+Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
+ const Version& spec_version) const {
g_capture_prefer_cache_count << 1;
Versions version_path;
std::shared_lock rlock(_meta_lock);
-
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_prefer_cache(
+ auto st =
_timestamped_version_tracker.capture_consistent_versions_prefer_cache(
spec_version, version_path,
- [&](int64_t start, int64_t end) { return
rowset_is_warmed_up_unlocked(start, end); }));
+ [&](int64_t start, int64_t end) { return
rowset_is_warmed_up_unlocked(start, end); });
+ if (!st.ok()) {
+ return ResultError(st);
+ }
int64_t path_max_version = version_path.back().second;
VLOG_DEBUG << fmt::format(
- "[verbose] CloudTablet::capture_rs_readers_prefer_cache, capture
path: {}, "
+ "[verbose] CloudTablet::capture_versions_prefer_cache, capture
path: {}, "
"tablet_id={}, spec_version={}, path_max_version={}",
fmt::join(version_path | std::views::transform([](const auto&
version) {
return fmt::format("{}", version.to_string());
}),
", "),
tablet_id(), spec_version.to_string(), path_max_version);
- return capture_rs_readers_unlocked(version_path, rs_splits);
+ return version_path;
}
-bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t
end_version) {
+bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t
end_version) const {
if (start_version > end_version) {
return false;
}
@@ -247,11 +218,11 @@ bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t
start_version, int64_t en
return is_rowset_warmed_up(rs->rowset_id());
};
-Status CloudTablet::capture_rs_readers_with_freshness_tolerance(
- const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
- int64_t query_freshness_tolerance_ms) {
+Result<std::vector<Version>>
CloudTablet::capture_versions_with_freshness_tolerance(
+ const Version& spec_version, const CaptureRowsetOps& options) const {
g_capture_with_freshness_tolerance_count << 1;
using namespace std::chrono;
+ auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms;
auto freshness_limit_tp = system_clock::now() -
milliseconds(query_freshness_tolerance_ms);
// find a version path where every edge(rowset) has been warmuped
Versions version_path;
@@ -259,15 +230,17 @@ Status
CloudTablet::capture_rs_readers_with_freshness_tolerance(
if (enable_unique_key_merge_on_write()) {
// For merge-on-write table, newly generated delete bitmap marks will
be on the rowsets which are in newest layout.
// So we can ony capture rowsets which are in newest data layout.
Otherwise there may be data correctness issue.
-
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
- spec_version, version_path, [&](int64_t start, int64_t end) {
- return rowset_is_warmed_up_unlocked(start, end);
- }));
+ RETURN_IF_ERROR_RESULT(
+
_timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
+ spec_version, version_path, [&](int64_t start, int64_t
end) {
+ return rowset_is_warmed_up_unlocked(start, end);
+ }));
} else {
-
RETURN_IF_ERROR(_timestamped_version_tracker.capture_consistent_versions_with_validator(
- spec_version, version_path, [&](int64_t start, int64_t end) {
- return rowset_is_warmed_up_unlocked(start, end);
- }));
+ RETURN_IF_ERROR_RESULT(
+
_timestamped_version_tracker.capture_consistent_versions_with_validator(
+ spec_version, version_path, [&](int64_t start, int64_t
end) {
+ return rowset_is_warmed_up_unlocked(start, end);
+ }));
}
int64_t path_max_version = version_path.back().second;
auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) ->
bool {
@@ -309,17 +282,17 @@ Status
CloudTablet::capture_rs_readers_with_freshness_tolerance(
g_capture_with_freshness_tolerance_fallback_count << 1;
// if there exists a rowset which satisfies freshness tolerance and
its start version is larger than the path max version
// but has not been warmuped up yet, fallback to capture rowsets as
usual
- return capture_rs_readers_internal(spec_version, rs_splits);
+ return BaseTablet::capture_consistent_versions_unlocked(spec_version,
options);
}
VLOG_DEBUG << fmt::format(
- "[verbose]
CloudTablet::capture_rs_readers_with_freshness_tolerance, capture path: {}, "
+ "[verbose] CloudTablet::capture_versions_with_freshness_tolerance,
capture path: {}, "
"tablet_id={}, spec_version={}, path_max_version={}",
fmt::join(version_path | std::views::transform([](const auto&
version) {
return fmt::format("{}", version.to_string());
}),
", "),
tablet_id(), spec_version.to_string(), path_max_version);
- return capture_rs_readers_unlocked(version_path, rs_splits);
+ return version_path;
}
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index e485a401221..7b35b7c4d3f 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -81,18 +81,18 @@ public:
bool vertical)
override;
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- const CaptureRsReaderOptions& opts) override;
- Status capture_rs_readers_internal(const Version& spec_version,
- std::vector<RowSetSplits>* rs_splits);
+ const CaptureRowsetOps& opts) override;
- // Capture rowset readers with cache preference optimization.
+ [[nodiscard]] Result<std::vector<Version>>
capture_consistent_versions_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const override;
+
+ // Capture versions with cache preference optimization.
// This method prioritizes using cached/warmed-up rowsets when building
version paths,
// avoiding cold data reads when possible. It uses
capture_consistent_versions_prefer_cache
// to find a consistent version path that prefers already warmed-up
rowsets.
- Status capture_rs_readers_prefer_cache(const Version& spec_version,
- std::vector<RowSetSplits>*
rs_splits);
+ Result<std::vector<Version>> capture_versions_prefer_cache(const Version&
spec_version) const;
- // Capture rowset readers with query freshness tolerance.
+ // Capture versions with query freshness tolerance.
// This method finds a consistent version path where all rowsets are
warmed up,
// but allows fallback to normal capture if there are newer rowsets that
should be
// visible (based on freshness tolerance) but haven't been warmed up yet.
@@ -102,16 +102,12 @@ public:
// data hasn't been warmed up yet. This can cause different tablets in the
same query
// to read from different versions, potentially leading to inconsistent
query results.
//
- // @param query_freshness_tolerance_ms: Time tolerance in milliseconds.
Rowsets that
+ // @param options.query_freshness_tolerance_ms: Time tolerance in
milliseconds. Rowsets that
// became visible within this time range (after current_time -
query_freshness_tolerance_ms)
// can be skipped if not warmed up. However, if older rowsets
(before this time point)
// are not warmed up, the method will fallback to normal capture.
- Status capture_rs_readers_with_freshness_tolerance(const Version&
spec_version,
-
std::vector<RowSetSplits>* rs_splits,
- int64_t
query_freshness_tolerance_ms);
-
- Status capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const override;
+ Result<std::vector<Version>> capture_versions_with_freshness_tolerance(
+ const Version& spec_version, const CaptureRowsetOps& options)
const;
size_t tablet_footprint() override {
return _approximate_data_size.load(std::memory_order_relaxed);
@@ -352,7 +348,7 @@ public:
void add_warmed_up_rowset(const RowsetId& rowset_id);
- std::string rowset_warmup_digest() {
+ std::string rowset_warmup_digest() const {
std::string res;
auto add_log = [&](const RowsetSharedPtr& rs) {
auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(),
rs->version().to_string());
@@ -382,7 +378,7 @@ private:
std::chrono::steady_clock::time_point start_tp =
std::chrono::steady_clock::now());
// used by capture_rs_reader_xxx functions
- bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t
end_version);
+ bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t
end_version) const;
CloudStorageEngine& _engine;
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index d5414b6bac0..28f4b60e1da 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -159,7 +159,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t
tablet_id, bool warmup_data,
bool
sync_delete_bitmap,
SyncRowsetStats* sync_stats,
- bool
force_use_cache) {
+ bool
local_only) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than
`CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
@@ -177,12 +177,17 @@ Result<std::shared_ptr<CloudTablet>>
CloudTabletMgr::get_tablet(int64_t tablet_i
CacheKey key(tablet_id_str);
auto* handle = _cache->lookup(key);
- if (handle == nullptr && force_use_cache) {
- return ResultError(
- Status::InternalError("failed to get cloud tablet from cache
{}", tablet_id));
- }
-
if (handle == nullptr) {
+ if (local_only) {
+ LOG(INFO) << "tablet=" << tablet_id
+ << "does not exists in local tablet cache, because param
local_only=true, "
+ "treat it as an error";
+ return ResultError(Status::InternalError(
+ "tablet={} does not exists in local tablet cache, because
param "
+ "local_only=true, "
+ "treat it as an error",
+ tablet_id));
+ }
if (sync_stats) {
++sync_stats->tablet_meta_cache_miss;
}
diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h
index ee0b807602d..d7dde2134ac 100644
--- a/be/src/cloud/cloud_tablet_mgr.h
+++ b/be/src/cloud/cloud_tablet_mgr.h
@@ -47,7 +47,7 @@ public:
Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool
warmup_data = false,
bool sync_delete_bitmap =
true,
SyncRowsetStats*
sync_stats = nullptr,
- bool force_use_cache =
false);
+ bool local_only = false);
void erase_tablet(int64_t tablet_id);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 97e75d8b327..5a53acbeace 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1574,6 +1574,7 @@
DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");
DEFINE_mBool(enable_update_delete_bitmap_kv_check_core, "false");
+DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade
and backup recovery will no longer be supported.
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4564b01d1ba..3297bfea8eb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1630,6 +1630,7 @@
DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);
DECLARE_mBool(enable_update_delete_bitmap_kv_check_core);
+DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas);
// the max length of segments key bounds, in bytes
// ATTENTION: as long as this conf has ever been enabled, cluster downgrade
and backup recovery will no longer be supported.
DECLARE_mInt32(segments_key_bounds_truncation_threshold);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 00a7db1d982..9cde2bbdf45 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1342,37 +1342,6 @@ void BaseTablet::_rowset_ids_difference(const
RowsetIdUnorderedSet& cur,
}
}
-Status BaseTablet::_capture_consistent_rowsets_unlocked(
- const std::vector<Version>& version_path,
std::vector<RowsetSharedPtr>* rowsets) const {
- DCHECK(rowsets != nullptr);
- rowsets->reserve(version_path.size());
- for (const auto& version : version_path) {
- bool is_find = false;
- do {
- auto it = _rs_version_map.find(version);
- if (it != _rs_version_map.end()) {
- is_find = true;
- rowsets->push_back(it->second);
- break;
- }
-
- auto it_expired = _stale_rs_version_map.find(version);
- if (it_expired != _stale_rs_version_map.end()) {
- is_find = true;
- rowsets->push_back(it_expired->second);
- break;
- }
- } while (false);
-
- if (!is_find) {
- return Status::Error<CAPTURE_ROWSET_ERROR>(
- "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
- version.to_string());
- }
- }
- return Status::OK();
-}
-
Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr
delete_bitmap,
int64_t max_version,
int64_t txn_id,
const RowsetIdUnorderedSet&
rowset_ids,
@@ -2143,6 +2112,16 @@ void BaseTablet::get_base_rowset_delete_bitmap_count(
}
}
+void TabletReadSource::fill_delete_predicates() {
+ DCHECK_EQ(delete_predicates.size(), 0);
+ auto delete_pred_view =
+ rs_splits | std::views::transform([](auto&& split) {
+ return split.rs_reader->rowset()->rowset_meta();
+ }) |
+ std::views::filter([](const auto& rs_meta) { return
rs_meta->has_delete_predicate(); });
+ delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()};
+}
+
int32_t BaseTablet::max_version_config() {
int32_t max_version = tablet_meta()->compaction_policy() ==
CUMULATIVE_TIME_SERIES_POLICY
?
std::max(config::time_series_max_tablet_version_num,
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 3e1dbd82053..d2762a8a53e 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -43,6 +43,10 @@ class CalcDeleteBitmapToken;
class SegmentCacheHandle;
class RowIdConversion;
struct PartialUpdateInfo;
+class PartialUpdateReadPlan;
+struct CaptureRowsetOps;
+struct CaptureRowsetResult;
+struct TabletReadSource;
class FixedReadPlan;
struct TabletWithVersion {
@@ -50,28 +54,6 @@ struct TabletWithVersion {
int64_t version;
};
-struct CaptureRsReaderOptions {
- // Used by local mode only.
- // If true, allows skipping missing versions during rowset capture.
- // This can be useful when some versions are temporarily unavailable.
- bool skip_missing_version {false};
-
- // ======== only take effect in cloud mode ========
-
- // Enable preference for cached/warmed-up rowsets when building version
paths.
- // When enabled, the capture process will prioritize already cached rowsets
- // to avoid cold data reads and improve query performance.
- bool enable_prefer_cached_rowset {false};
-
- // Query freshness tolerance in milliseconds.
- // Defines the time window for considering data as "fresh enough".
- // Rowsets that became visible within this time range can be skipped if
not warmed up,
- // but older rowsets (before current_time - query_freshness_tolerance_ms)
that are
- // not warmed up will trigger fallback to normal capture.
- // Set to -1 to disable freshness tolerance checking.
- int64_t query_freshness_tolerance_ms {-1};
-};
-
enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING };
// Base class for all tablet classes
@@ -130,12 +112,9 @@ public:
virtual Result<std::unique_ptr<RowsetWriter>>
create_rowset_writer(RowsetWriterContext& context,
bool
vertical) = 0;
- virtual Status capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const = 0;
-
virtual Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- const CaptureRsReaderOptions& opts) = 0;
+ const CaptureRowsetOps& opts) = 0;
virtual size_t tablet_footprint() = 0;
@@ -324,7 +303,7 @@ public:
}
void traverse_rowsets_unlocked(std::function<void(const RowsetSharedPtr&)>
visitor,
- bool include_stale = false) {
+ bool include_stale = false) const {
for (auto& [v, rs] : _rs_version_map) {
visitor(rs);
}
@@ -353,6 +332,18 @@ public:
void prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version);
void prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr&
output_rowset);
+ [[nodiscard]] Result<CaptureRowsetResult>
capture_consistent_rowsets_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const;
+
+ [[nodiscard]] virtual Result<std::vector<Version>>
capture_consistent_versions_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const;
+
+ [[nodiscard]] Result<std::vector<RowSetSplits>>
capture_rs_readers_unlocked(
+ const Version& version_range, const CaptureRowsetOps& options)
const;
+
+ [[nodiscard]] Result<TabletReadSource> capture_read_source(const Version&
version_range,
+ const
CaptureRowsetOps& options);
+
protected:
// Find the missed versions until the spec_version.
//
@@ -370,11 +361,10 @@ protected:
const RowsetIdUnorderedSet& pre,
RowsetIdUnorderedSet* to_add,
RowsetIdUnorderedSet* to_del);
- Status _capture_consistent_rowsets_unlocked(const std::vector<Version>&
version_path,
- std::vector<RowsetSharedPtr>*
rowsets) const;
-
Status sort_block(vectorized::Block& in_block, vectorized::Block&
output_block);
+ Result<CaptureRowsetResult> _remote_capture_rowsets(const Version&
version_range) const;
+
mutable std::shared_mutex _meta_lock;
TimestampedVersionTracker _timestamped_version_tracker;
// After version 0.13, all newly created rowsets are saved in
_rs_version_map.
@@ -413,4 +403,39 @@ public:
Status last_compaction_status = Status::OK();
};
+struct CaptureRowsetOps {
+ bool skip_missing_versions = false;
+ bool quiet = false;
+ bool include_stale_rowsets = true;
+ bool enable_fetch_rowsets_from_peers = false;
+
+ // ======== only take effect in cloud mode ========
+
+ // Enable preference for cached/warmed-up rowsets when building version
paths.
+ // When enabled, the capture process will prioritize already cached rowsets
+ // to avoid cold data reads and improve query performance.
+ bool enable_prefer_cached_rowset {false};
+
+ // Query freshness tolerance in milliseconds.
+ // Defines the time window for considering data as "fresh enough".
+ // Rowsets that became visible within this time range can be skipped if
not warmed up,
+ // but older rowsets (before current_time - query_freshness_tolerance_ms)
that are
+ // not warmed up will trigger fallback to normal capture.
+ // Set to -1 to disable freshness tolerance checking.
+ int64_t query_freshness_tolerance_ms {-1};
+};
+
+struct CaptureRowsetResult {
+ std::vector<RowsetSharedPtr> rowsets;
+ std::shared_ptr<DeleteBitmap> delete_bitmap;
+};
+
+struct TabletReadSource {
+ std::vector<RowSetSplits> rs_splits;
+ std::vector<RowsetMetaSharedPtr> delete_predicates;
+ std::shared_ptr<DeleteBitmap> delete_bitmap;
+ // Fill delete predicates with `rs_splits`
+ void fill_delete_predicates();
+};
+
} /* namespace doris */
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 9f2c746ac92..38d2a3fd97a 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -140,8 +140,9 @@ Status FullCompaction::modify_rowsets() {
int64_t max_version = tablet()->max_version().second;
DCHECK(max_version >= _output_rowset->version().second);
if (max_version > _output_rowset->version().second) {
- RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked(
- {_output_rowset->version().second + 1, max_version},
&tmp_rowsets));
+ auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
+ {_output_rowset->version().second + 1, max_version},
CaptureRowsetOps {}));
+ tmp_rowsets = std::move(ret.rowsets);
}
for (const auto& it : tmp_rowsets) {
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 5e72bec59c0..62077b9dd7e 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -73,7 +73,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet,
ReaderType reader_type,
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- TabletReader::ReadSource read_source;
+ TabletReadSource read_source;
read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
read_source.rs_splits.emplace_back(rs_reader);
@@ -92,7 +92,7 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet,
ReaderType reader_type,
}
reader_params.tablet_schema = merge_tablet_schema;
if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
- reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+ reader_params.delete_bitmap =
tablet->tablet_meta()->delete_bitmap_ptr();
}
if (stats_output && stats_output->rowid_conversion) {
@@ -257,7 +257,7 @@ Status Merger::vertical_compact_one_group(
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- TabletReader::ReadSource read_source;
+ TabletReadSource read_source;
read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
read_source.rs_splits.emplace_back(rs_reader);
@@ -277,7 +277,7 @@ Status Merger::vertical_compact_one_group(
reader_params.tablet_schema = merge_tablet_schema;
bool has_cluster_key = false;
if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
- reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+ reader_params.delete_bitmap =
tablet->tablet_meta()->delete_bitmap_ptr();
has_cluster_key = true;
}
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index b0db2b81e17..48fe50469cc 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -23,6 +23,7 @@
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/config.h"
#include "common/status.h"
+#include "olap/base_tablet.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/segment_loader.h"
#include "pipeline/exec/olap_scan_operator.h"
@@ -60,7 +61,7 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
// `rs_splits` in `entire read source` will be devided into several
partitial read sources
// to build several parallel scanners, based on segment rows number.
All the partitial read sources
// share the same delete predicates from their corresponding entire
read source.
- TabletReader::ReadSource partitial_read_source;
+ TabletReadSource partitial_read_source;
int64_t rows_collected = 0;
for (auto& rs_split : entire_read_source.rs_splits) {
auto reader = rs_split.rs_reader;
@@ -109,10 +110,11 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
partitial_read_source.rs_splits.emplace_back(std::move(split));
- scanners.emplace_back(
- _build_scanner(tablet, version, _key_ranges,
-
{std::move(partitial_read_source.rs_splits),
-
entire_read_source.delete_predicates}));
+ scanners.emplace_back(_build_scanner(
+ tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
partitial_read_source = {};
split = RowSetSplits(reader->clone());
@@ -153,9 +155,11 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
split.segment_offsets.second -
split.segment_offsets.first);
}
#endif
- scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
-
{std::move(partitial_read_source.rs_splits),
-
entire_read_source.delete_predicates}));
+ scanners.emplace_back(
+ _build_scanner(tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
}
}
@@ -179,7 +183,7 @@ Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
}
// Collect segments into scanners based on rows count instead of one
scanner per segment
- TabletReader::ReadSource partitial_read_source;
+ TabletReadSource partitial_read_source;
int64_t rows_collected = 0;
for (auto& rs_split : entire_read_source.rs_splits) {
@@ -211,13 +215,14 @@ Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
partitial_read_source.rs_splits.emplace_back(std::move(split));
- scanners.emplace_back(
- _build_scanner(tablet, version, _key_ranges,
-
{std::move(partitial_read_source.rs_splits),
-
entire_read_source.delete_predicates}));
+ scanners.emplace_back(_build_scanner(
+ tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
// Reset for next scanner
- partitial_read_source = TabletReader::ReadSource();
+ partitial_read_source = {};
split = RowSetSplits(reader->clone());
segment_start = i;
rows_collected = 0;
@@ -245,9 +250,11 @@ Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
DCHECK_LT(split.segment_offsets.first,
split.segment_offsets.second);
}
#endif
- scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
-
{std::move(partitial_read_source.rs_splits),
-
entire_read_source.delete_predicates}));
+ scanners.emplace_back(
+ _build_scanner(tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
}
}
@@ -289,7 +296,7 @@ Status ParallelScannerBuilder::_load() {
std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
BaseTabletSPtr tablet, int64_t version, const
std::vector<OlapScanRange*>& key_ranges,
- TabletReader::ReadSource&& read_source) {
+ TabletReadSource&& read_source) {
OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges,
std::move(tablet),
version, std::move(read_source), _limit,
_is_preaggregation};
return OlapScanner::create_shared(_parent, std::move(params));
diff --git a/be/src/olap/parallel_scanner_builder.h
b/be/src/olap/parallel_scanner_builder.h
index 95d7841e34a..7c57711bc70 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -22,6 +22,7 @@
#include <unordered_map>
#include <utility>
+#include "olap/base_tablet.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/segment_loader.h"
@@ -44,7 +45,7 @@ class ParallelScannerBuilder {
public:
ParallelScannerBuilder(pipeline::OlapScanLocalState* parent,
const std::vector<TabletWithVersion>& tablets,
- std::vector<TabletReader::ReadSource>& read_sources,
+ std::vector<TabletReadSource>& read_sources,
const std::shared_ptr<RuntimeProfile>& profile,
const std::vector<OlapScanRange*>& key_ranges,
RuntimeState* state,
int64_t limit, bool is_dup_mow_key, bool
is_preaggregation)
@@ -78,7 +79,7 @@ private:
std::shared_ptr<vectorized::OlapScanner> _build_scanner(
BaseTabletSPtr tablet, int64_t version, const
std::vector<OlapScanRange*>& key_ranges,
- TabletReader::ReadSource&& read_source);
+ TabletReadSource&& read_source);
pipeline::OlapScanLocalState* _parent;
@@ -111,8 +112,8 @@ private:
bool _is_preaggregation;
std::vector<TabletWithVersion> _tablets;
std::vector<OlapScanRange*> _key_ranges;
- std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
- std::vector<TabletReader::ReadSource>& _read_sources;
+ std::unordered_map<int64_t, TabletReadSource> _all_read_sources;
+ std::vector<TabletReadSource>& _read_sources;
};
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index 19650ed30f0..951679f413b 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -76,7 +76,7 @@ struct RowsetReaderContext {
uint64_t* merged_rows = nullptr;
// for unique key merge on write
bool enable_unique_key_merge_on_write = false;
- const DeleteBitmap* delete_bitmap = nullptr;
+ DeleteBitmapPtr delete_bitmap = nullptr;
bool record_rowids = false;
RowIdConversion* rowid_conversion = nullptr;
bool is_key_column_group = false;
diff --git a/be/src/olap/rowset_version_mgr.cpp
b/be/src/olap/rowset_version_mgr.cpp
new file mode 100644
index 00000000000..bb815ce859a
--- /dev/null
+++ b/be/src/olap/rowset_version_mgr.cpp
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <brpc/controller.h>
+#include <bthread/bthread.h>
+#include <bthread/countdown_event.h>
+#include <bthread/mutex.h>
+#include <bthread/types.h>
+#include <bvar/latency_recorder.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <glog/logging.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <ranges>
+#include <sstream>
+#include <utility>
+
+#include "cloud/config.h"
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "olap/base_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader.h"
+#include "runtime/client_cache.h"
+#include "service/backend_options.h"
+#include "service/internal_service.h"
+#include "util/brpc_client_cache.h"
+#include "util/debug_points.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+
+namespace doris {
+
+using namespace ErrorCode;
+using namespace std::ranges;
+
+static bvar::LatencyRecorder
g_remote_fetch_tablet_rowsets_single_request_latency(
+ "remote_fetch_rowsets_single_rpc");
+static bvar::LatencyRecorder
g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets");
+
+// Asks the version tracker for a consistent path of versions covering `version_range`.
+//
+// Failure handling:
+//  - `options.quiet`: the tracker's error is returned as-is without any logging. `quiet`
+//    only silences diagnostics; it must never turn a failed capture into a success.
+//  - no missed versions: the range was already compacted away, VERSION_ALREADY_MERGED is
+//    returned.
+//  - missed versions exist: the tracker's error is returned unless
+//    `options.skip_missing_versions` is set, in which case whatever path the tracker
+//    produced is returned.
+//
+// NOTE(review): the `_unlocked` suffix suggests the caller must hold the tablet header
+// lock -- confirm against the callers.
+[[nodiscard]] Result<std::vector<Version>> BaseTablet::capture_consistent_versions_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    std::vector<Version> version_path;
+    auto st =
+            _timestamped_version_tracker.capture_consistent_versions(version_range, &version_path);
+    if (!st && options.quiet) {
+        // Quiet callers (e.g. integrity checks) still need to learn about the failure;
+        // falling through here would report an incomplete path as success.
+        return ResultError(std::move(st));
+    }
+    if (!st) {
+        auto missed_versions = get_missed_versions_unlocked(version_range.second);
+        if (missed_versions.empty()) {
+            LOG(WARNING) << fmt::format(
+                    "version already has been merged. version_range={}, max_version={}, "
+                    "tablet_id={}",
+                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id());
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
+                    "missed versions is empty, version_range={}, max_version={}, tablet_id={}",
+                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id()));
+        }
+        LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}",
+                                    version_range.to_string(), tablet_id(), st);
+        _print_missed_versions(missed_versions);
+        if (!options.skip_missing_versions) {
+            return ResultError(std::move(st));
+        }
+        LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id();
+    }
+    // Debug point used by tests to simulate a "version already merged" failure.
+    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
+        auto tablet_id = dp->param<int64_t>("tablet_id", -1);
+        auto skip_by_option = dp->param<bool>("skip_by_option", false);
+        if (skip_by_option && !options.enable_fetch_rowsets_from_peers) {
+            return version_path;
+        }
+        if ((tablet_id != -1 && (tablet_id == _tablet_meta->tablet_id())) || tablet_id == -2) {
+            return ResultError(Status::Error<VERSION_ALREADY_MERGED>("version already merged"));
+        }
+    });
+    return version_path;
+}
+
+// Resolves `version_range` to the rowsets that cover it.
+//
+// The version path is computed locally first. If that fails and the process runs in
+// cloud mode with `options.enable_fetch_rowsets_from_peers` set, the rowsets are requested
+// from peer replicas instead; a failure of that fallback is reported as
+// VERSION_ALREADY_MERGED. For unique-key merge-on-write tablets the locally captured
+// result also carries the tablet's delete bitmap.
+[[nodiscard]] Result<CaptureRowsetResult> BaseTablet::capture_consistent_rowsets_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    auto versions_or_err = capture_consistent_versions_unlocked(version_range, options);
+
+    // Failure path: either give up or try the peers.
+    if (!versions_or_err) {
+        const bool may_ask_peers =
+                config::is_cloud_mode() && options.enable_fetch_rowsets_from_peers;
+        if (!may_ask_peers) {
+            return ResultError(std::move(versions_or_err.error()));
+        }
+        auto remote = _remote_capture_rowsets(version_range);
+        if (remote) {
+            return remote;
+        }
+        return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
+                "version already merged, meet error during remote capturing rowsets, "
+                "error={}, version_range={}",
+                remote.error().to_string(), version_range.to_string()));
+    }
+
+    // Success path: map every version on the path to its rowset.
+    const auto& path = versions_or_err.value();
+    CaptureRowsetResult captured;
+    captured.rowsets.reserve(path.size());
+
+    for (const auto& version : path) {
+        RowsetSharedPtr rowset;
+        bool found = false;
+
+        if (auto live_it = _rs_version_map.find(version); live_it != _rs_version_map.end()) {
+            rowset = live_it->second;
+            found = true;
+        } else {
+            VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet="
+                        << tablet_id() << ", version='" << version.first << "-"
+                        << version.second;
+            // Stale rowsets are consulted only when the caller opted in.
+            if (options.include_stale_rowsets) {
+                auto stale_it = _stale_rs_version_map.find(version);
+                if (stale_it != _stale_rs_version_map.end()) {
+                    rowset = stale_it->second;
+                    found = true;
+                } else {
+                    LOG(WARNING) << fmt::format(
+                            "fail to find Rowset in stale_rs_version for version. tablet={}, "
+                            "version={}-{}",
+                            tablet_id(), version.first, version.second);
+                }
+            }
+        }
+
+        if (!found) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_ERROR>(
+                    "failed to find rowset for version={}", version.to_string()));
+        }
+        captured.rowsets.push_back(std::move(rowset));
+    }
+
+    const bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
+    if (is_mow) {
+        captured.delete_bitmap = _tablet_meta->delete_bitmap_ptr();
+    }
+    return captured;
+}
+
+// Captures the rowsets covering `version_range` and wraps a freshly created reader for
+// each of them in a RowSetSplits entry. Returns CAPTURE_ROWSET_READER_ERROR as soon as one
+// reader cannot be created; capture errors are propagated unchanged.
+[[nodiscard]] Result<std::vector<RowSetSplits>> BaseTablet::capture_rs_readers_unlocked(
+        const Version& version_range, const CaptureRowsetOps& options) const {
+    auto captured = capture_consistent_rowsets_unlocked(version_range, options);
+    if (!captured) {
+        return ResultError(std::move(captured.error()));
+    }
+
+    const auto& rowsets = captured.value().rowsets;
+    std::vector<RowSetSplits> splits;
+    splits.reserve(rowsets.size());
+
+    for (const auto& rowset : rowsets) {
+        RowsetReaderSharedPtr reader;
+        if (auto create_st = rowset->create_reader(&reader); !create_st) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset={}, reason={}",
+                    rowset->rowset_id().to_string(), create_st.to_string()));
+        }
+        splits.emplace_back(std::move(reader));
+    }
+    return splits;
+}
+
+// Builds a complete TabletReadSource for `version_range`: takes the header lock in shared
+// mode, captures the rowsets (and their delete bitmap), then creates one reader per rowset.
+// Returns CAPTURE_ROWSET_READER_ERROR if any reader cannot be created.
+[[nodiscard]] Result<TabletReadSource> BaseTablet::capture_read_source(
+        const Version& version_range, const CaptureRowsetOps& options) {
+    std::shared_lock rdlock(get_header_lock());
+
+    auto captured_or_err = capture_consistent_rowsets_unlocked(version_range, options);
+    if (!captured_or_err) {
+        return ResultError(std::move(captured_or_err.error()));
+    }
+    auto captured = std::move(captured_or_err.value());
+
+    TabletReadSource source;
+    source.delete_bitmap = std::move(captured.delete_bitmap);
+    source.rs_splits.reserve(captured.rowsets.size());
+
+    for (const auto& rowset : captured.rowsets) {
+        RowsetReaderSharedPtr reader;
+        if (auto create_st = rowset->create_reader(&reader); !create_st) {
+            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset={}, reason={}",
+                    rowset->rowset_id().to_string(), create_st.to_string()));
+        }
+        source.rs_splits.emplace_back(std::move(reader));
+    }
+    return source;
+}
+
+// Starts `fn(args...)` on a background bthread.
+//
+// `fn` and `args` are copied into a heap-allocated closure because the bthread entry point
+// only accepts a `void*`. Ownership of that closure passes to the bthread, which deletes it
+// after running it. If the bthread cannot be started the closure is released here, so no
+// memory is leaked on failure.
+//
+// Returns true when the bthread was started; `th` then holds its id.
+template <typename Fn, typename... Args>
+bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) {
+    auto p_wrap_fn = new auto([=] { fn(args...); });
+    auto call_back = [](void* ar) -> void* {
+        // `ar` is the closure allocated above; the bthread owns it from here on.
+        auto f = static_cast<decltype(p_wrap_fn)>(ar);
+        (*f)();
+        delete f;
+        return nullptr;
+    };
+    if (bthread_start_background(&th, attr, call_back, p_wrap_fn) != 0) {
+        // The callback will never run, so nobody else would free the closure.
+        delete p_wrap_fn;
+        return false;
+    }
+    return true;
+}
+
+// Fans a "get tablet rowsets" request out to several peer backends and hands the first
+// successful response to the waiter.
+//
+// Usage: fill in `tablet_id`, `req_addrs`, `version_range` (and optionally
+// `delete_bitmap_keys`), call start_req_bg(), then wait_for_ret().
+//
+// One bthread is started per address. Each bthread keeps the controller alive through a
+// shared_ptr, so the object must be owned by a std::shared_ptr. The first request that
+// succeeds publishes its result and wakes the waiter; a failure is published only once
+// every request has failed. All bookkeeping (`task_cnt`, `done`, `result`) is guarded by
+// `butex`.
+struct GetRowsetsCntl : std::enable_shared_from_this<GetRowsetsCntl> {
+    struct RemoteGetRowsetResult {
+        std::vector<RowsetMetaSharedPtr> rowsets;
+        // Null when the peer response carried no delete bitmap.
+        std::unique_ptr<DeleteBitmap> delete_bitmap;
+    };
+
+    // Starts one background request per entry of `req_addrs`.
+    // Returns an error if a bthread could not be created; requests that were already
+    // started keep running but their outcome is then irrelevant to the caller.
+    Status start_req_bg() {
+        task_cnt = req_addrs.size();
+        for (const auto& [ip, port] : req_addrs) {
+            bthread_t tid;
+            bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+
+            // `ip` refers into `req_addrs`, which `self` keeps alive for the bthread.
+            bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() {
+                LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip;
+                Defer defer_log {[&ip, port]() {
+                    LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip
+                              << ", port=" << port;
+                }};
+
+                PGetTabletRowsetsRequest req;
+                req.set_tablet_id(self->tablet_id);
+                req.set_version_start(self->version_range.first);
+                req.set_version_end(self->version_range.second);
+                if (self->delete_bitmap_keys.has_value()) {
+                    req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value());
+                }
+                brpc::Controller cntl;
+                cntl.set_timeout_ms(60000);
+                cntl.set_max_retry(3);
+                PGetTabletRowsetsResponse response;
+                // A missing stub is handled like any other failed request below, so that
+                // the task counter is decremented and the waiter is eventually woken up.
+                Status stub_st = Status::OK();
+                auto start_tm_us = MonotonicMicros();
+#ifndef BE_TEST
+                std::shared_ptr<PBackendService_Stub> stub =
+                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port);
+                if (stub == nullptr) {
+                    stub_st = Status::InternalError(
+                            "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port);
+                } else {
+                    stub->get_tablet_rowsets(&cntl, &req, &response, nullptr);
+                }
+#else
+                TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response);
+#endif
+                g_remote_fetch_tablet_rowsets_single_request_latency
+                        << MonotonicMicros() - start_tm_us;
+
+                std::unique_lock l(self->butex);
+                if (self->done) {
+                    // Another request already delivered the final result.
+                    return;
+                }
+                --self->task_cnt;
+                auto resp_st = stub_st.ok() ? Status::create(response.status()) : stub_st;
+                DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure",
+                                { resp_st = Status::InternalError("inject error"); });
+                if (cntl.Failed() || !resp_st) {
+                    if (self->task_cnt != 0) {
+                        // Other requests are still in flight; let them decide the outcome.
+                        return;
+                    }
+                    std::stringstream reason;
+                    reason << "failed to get rowsets from all replicas, tablet_id="
+                           << self->tablet_id;
+                    if (cntl.Failed()) {
+                        reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText();
+                    } else {
+                        reason << ", reason=" << resp_st.to_string();
+                    }
+                    self->result = ResultError(Status::InternalError(reason.str()));
+                    self->done = true;
+                    self->event.signal();
+                    return;
+                }
+
+                // From here on this request decides the outcome, whatever happens.
+                Defer done_cb {[&]() {
+                    self->done = true;
+                    self->event.signal();
+                }};
+                std::vector<RowsetMetaSharedPtr> rs_metas;
+                for (auto&& rs_pb : response.rowsets()) {
+                    auto rs_meta = std::make_shared<RowsetMeta>();
+                    if (!rs_meta->init_from_pb(rs_pb)) {
+                        self->result =
+                                ResultError(Status::InternalError("failed to init rowset from pb"));
+                        return;
+                    }
+                    rs_metas.push_back(std::move(rs_meta));
+                }
+                self->result->rowsets = std::move(rs_metas);
+
+                if (response.has_delete_bitmap()) {
+                    self->result->delete_bitmap = std::make_unique<DeleteBitmap>(
+                            DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id));
+                }
+            });
+
+            if (!succ) {
+                return Status::InternalError(
+                        "failed to create bthread when request rowsets for tablet={}", tablet_id);
+            }
+        }
+        return Status::OK();
+    }
+
+    // Blocks until one request succeeded or all of them failed, then hands out the result.
+    // The result is moved out, so this must be called at most once.
+    Result<RemoteGetRowsetResult> wait_for_ret() {
+        event.wait();
+        return std::move(result);
+    }
+
+    int64_t tablet_id;
+    // (host, brpc port) of every peer to ask.
+    std::vector<std::pair<std::string, int32_t>> req_addrs;
+    Version version_range;
+    // Optional delete bitmap keys forwarded to the peers with the request.
+    std::optional<DeleteBitmapPB> delete_bitmap_keys = std::nullopt;
+
+private:
+    // Number of requests that have not reported back yet.
+    size_t task_cnt = 0;
+
+    bthread::Mutex butex;
+    bthread::CountdownEvent event {1};
+    // Set once the final result has been published.
+    bool done = false;
+
+    Result<RemoteGetRowsetResult> result;
+};
+
+// Asks the master FE for the replicas of `tablet_id` and returns the (host, brpc port) of
+// every replica that is not hosted by this backend.
+//
+// Errors: InternalError when the FE RPC fails or the reply has no entry for the tablet.
+// The debug point "get_peer_replicas_address.enable_local_host" keeps the local replica in
+// the result.
+Result<std::vector<std::pair<std::string, int32_t>>> get_peer_replicas_addresses(
+        const int64_t tablet_id) {
+    auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
+    DCHECK_NE(cluster_info, nullptr);
+    auto master_addr = cluster_info->master_fe_addr;
+
+    TGetTabletReplicaInfosRequest request;
+    request.tablet_ids.push_back(tablet_id);
+    TGetTabletReplicaInfosResult response;
+    auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port, [&](FrontendServiceConnection& client) {
+                client->getTabletReplicaInfos(response, request);
+            });
+    if (!rpc_st) {
+        return ResultError(Status::InternalError(
+                "failed to get tablet replica infos, rpc error={}, tablet_id={}",
+                rpc_st.to_string(), tablet_id));
+    }
+
+    auto found = response.tablet_replica_infos.find(tablet_id);
+    if (found == response.tablet_replica_infos.end()) {
+        return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id));
+    }
+    // Work on a copy so that host names can be moved out below.
+    auto replicas = found->second;
+
+    auto local_host = BackendOptions::get_localhost();
+    bool include_local_host = false;
+    DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; });
+
+    std::vector<std::pair<std::string, int32_t>> peers;
+    for (auto& replica : replicas) {
+        // NOTE(review): this is a substring test, not an equality test, so a peer whose
+        // host is a prefix of the local host string is treated as local -- confirm intent.
+        const bool is_local = local_host.find(replica.host) != std::string::npos;
+        if (is_local && !include_local_host) {
+            continue;
+        }
+        peers.emplace_back(std::move(replica.host), replica.brpc_port);
+    }
+    return peers;
+}
+
+// Fetches the rowsets covering `version_range` from peer replicas.
+//
+// Steps:
+//  1. look up the peer replica addresses (an empty list is an error);
+//  2. for merge-on-write tablets, snapshot the local delete bitmap and send its keys along
+//     with the request;
+//  3. request the rowset metas from all peers and wait for the first usable answer;
+//  4. build rowsets from the returned metas and, for merge-on-write tablets, merge the
+//     peer's delete bitmap into the local snapshot.
+//
+// A merge-on-write tablet whose peer answered without a delete bitmap is reported as an
+// error instead of dereferencing a null bitmap.
+// The total time spent is recorded in `g_remote_fetch_tablet_rowsets_latency`.
+Result<CaptureRowsetResult> BaseTablet::_remote_capture_rowsets(
+        const Version& version_range) const {
+    auto start_tm_us = MonotonicMicros();
+    Defer defer {
+            [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }};
+#ifndef BE_TEST
+    auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id());
+#else
+    Result<std::vector<std::pair<std::string, int32_t>>> maybe_be_addresses;
+    TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses);
+#endif
+    DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail",
+                    { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); });
+    if (!maybe_be_addresses) {
+        return ResultError(std::move(maybe_be_addresses.error()));
+    }
+    auto be_addresses = std::move(maybe_be_addresses.value());
+    if (be_addresses.empty()) {
+        LOG(WARNING) << "no peers replica for tablet=" << tablet_id();
+        return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id()));
+    }
+
+    auto cntl = std::make_shared<GetRowsetsCntl>();
+    cntl->tablet_id = tablet_id();
+    cntl->req_addrs = std::move(be_addresses);
+    cntl->version_range = version_range;
+    bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
+    CaptureRowsetResult result;
+    if (is_mow) {
+        // Start from a snapshot of the local bitmap and tell the peers which keys it has.
+        result.delete_bitmap =
+                std::make_unique<DeleteBitmap>(_tablet_meta->delete_bitmap().snapshot());
+        DeleteBitmapPB delete_bitmap_keys;
+        auto keyset = result.delete_bitmap->delete_bitmap |
+                      std::views::transform([](const auto& kv) { return kv.first; });
+        for (const auto& key : keyset) {
+            const auto& [rs_id, seg_id, version] = key;
+            delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string());
+            delete_bitmap_keys.mutable_segment_ids()->Add(seg_id);
+            delete_bitmap_keys.mutable_versions()->Add(version);
+        }
+        cntl->delete_bitmap_keys = std::move(delete_bitmap_keys);
+    }
+
+    RETURN_IF_ERROR_RESULT(cntl->start_req_bg());
+    auto maybe_meta = cntl->wait_for_ret();
+    if (!maybe_meta) {
+        auto err = Status::InternalError(
+                "tried to get rowsets from peer replicas and failed, "
+                "reason={}",
+                maybe_meta.error());
+        return ResultError(std::move(err));
+    }
+
+    auto& remote_meta = maybe_meta.value();
+    const auto& rs_metas = remote_meta.rowsets;
+    for (const auto& rs_meta : rs_metas) {
+        RowsetSharedPtr rs;
+        auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs);
+        if (!st) {
+            return ResultError(std::move(st));
+        }
+        result.rowsets.push_back(std::move(rs));
+    }
+    if (is_mow) {
+        DCHECK_NE(result.delete_bitmap, nullptr);
+        // The peer's bitmap is only set when its response contained one.
+        if (remote_meta.delete_bitmap == nullptr) {
+            return ResultError(Status::InternalError(
+                    "peer replica returned no delete bitmap for merge-on-write tablet={}, "
+                    "version_range={}",
+                    tablet_id(), version_range.to_string()));
+        }
+        result.delete_bitmap->merge(*remote_meta.delete_bitmap);
+    }
+    return result;
+}
+
+} // namespace doris
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 427badee862..cc1879fce88 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1020,7 +1020,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const
TAlterTabletReqV2& reques
reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = _base_tablet->keys_type() ==
UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
- reader_context.delete_bitmap =
&_base_tablet->tablet_meta()->delete_bitmap();
+ reader_context.delete_bitmap =
_base_tablet->tablet_meta()->delete_bitmap_ptr();
reader_context.version = Version(0, end_version);
if (!_base_tablet_schema->cluster_key_uids().empty()) {
for (const auto& uid :
_base_tablet_schema->cluster_key_uids()) {
@@ -1144,9 +1144,8 @@ Status
SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versio
}
*max_rowset = rowset;
- RETURN_IF_ERROR(_base_tablet->capture_consistent_versions_unlocked(
- Version(0, rowset->version().second), versions_to_be_changed,
false, false));
-
+ *versions_to_be_changed =
DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked(
+ Version(0, rowset->version().second), {}));
return Status::OK();
}
@@ -1559,8 +1558,9 @@ Status
SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
<< "double write rowsets for version: " << alter_version + 1
<< "-" << max_version
<< " new_tablet=" << _new_tablet->tablet_id();
std::shared_lock rlock(_new_tablet->get_header_lock());
- RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
- {alter_version + 1, max_version}, &rowsets));
+ auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
+ {alter_version + 1, max_version}, CaptureRowsetOps {}));
+ rowsets = std::move(ret.rowsets);
}
for (auto rowset_ptr : rowsets) {
std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
@@ -1578,8 +1578,9 @@ Status
SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version)
LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
<< "incremental rowsets for version: " << max_version + 1 <<
"-"
<< new_max_version << " new_tablet=" <<
_new_tablet->tablet_id();
- RETURN_IF_ERROR(_new_tablet->capture_consistent_rowsets_unlocked(
- {max_version + 1, new_max_version}, &rowsets));
+ auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
+ {max_version + 1, new_max_version}, CaptureRowsetOps {}));
+ rowsets = std::move(ret.rowsets);
}
for (auto&& rowset_ptr : rowsets) {
RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet,
rowset_ptr));
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index dad9e22c057..899a4bef265 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -568,13 +568,22 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
res = check_version_continuity(consistent_rowsets);
if (res.ok() && max_cooldowned_version < version) {
// Pick consistent rowsets of remaining required
version
- res = ref_tablet->capture_consistent_rowsets_unlocked(
- {max_cooldowned_version + 1, version},
&consistent_rowsets);
+ auto ret =
ref_tablet->capture_consistent_rowsets_unlocked(
+ {max_cooldowned_version + 1, version},
CaptureRowsetOps {});
+ if (ret) {
+ consistent_rowsets = std::move(ret->rowsets);
+ } else {
+ res = std::move(ret.error());
+ }
}
} else {
- // get shortest version path
- res =
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
-
&consistent_rowsets);
+ auto ret =
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
+
CaptureRowsetOps {});
+ if (ret) {
+ consistent_rowsets = std::move(ret->rowsets);
+ } else {
+ res = std::move(ret.error());
+ }
}
if (!res.ok()) {
LOG(WARNING) << "fail to select versions to span. res=" <<
res;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 69791626ab8..de64b3a70cd 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -395,12 +395,14 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetSharedPtr>& to_add,
}
if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) {
- calc_bm_status =
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
-
&calc_delete_bitmap_rowsets);
- if (!calc_bm_status.ok()) {
- LOG(WARNING) << "fail to capture_consistent_rowsets, res: " <<
calc_bm_status;
+ auto ret =
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
+ CaptureRowsetOps
{});
+ if (!ret) {
+ LOG(WARNING) << "fail to capture_consistent_rowsets, res: " <<
ret.error();
+ calc_bm_status = std::move(ret.error());
break;
}
+ calc_delete_bitmap_rowsets = std::move(ret->rowsets);
// FIXME(plat1ko): Use `const TabletSharedPtr&` as parameter
auto self = _engine.tablet_manager()->get_tablet(tablet_id());
CHECK(self);
@@ -455,17 +457,16 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetSharedPtr>& to_add,
// that we can capture by version
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
Version full_version = Version(0, max_version_unlocked());
- std::vector<RowsetSharedPtr> expected_rowsets;
- auto st = capture_consistent_rowsets_unlocked(full_version,
&expected_rowsets);
- DCHECK(st.ok()) << st;
- DCHECK_EQ(base_rowsets_for_full_clone.size(),
expected_rowsets.size());
- if (st.ok() && base_rowsets_for_full_clone.size() !=
expected_rowsets.size())
- [[unlikely]] {
+ auto ret = capture_consistent_rowsets_unlocked(full_version,
CaptureRowsetOps {});
+ DCHECK(ret) << ret.error();
+ DCHECK_EQ(base_rowsets_for_full_clone.size(), ret->rowsets.size());
+
+ if (ret && base_rowsets_for_full_clone.size() !=
ret->rowsets.size()) [[unlikely]] {
LOG(WARNING) << "full clone succeeded, but the count("
<< base_rowsets_for_full_clone.size()
<< ") of base rowsets used for delete bitmap
calculation is not match "
"expect count("
- << expected_rowsets.size() << ") we capture from
tablet meta";
+ << ret->rowsets.size() << ") we capture from
tablet meta";
}
}
}
@@ -552,7 +553,8 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>&
to_add,
tablet_id());
} else if (rs->rowset_id() != it->second->rowset_id()) {
return Status::Error<DELETE_VERSION_ERROR>(
- "try to delete version {} from {}, but rowset id
changed, delete rowset id "
+ "try to delete version {} from {}, but rowset id
changed, delete "
+ "rowset id "
"is {}, exists rowsetid is {}",
rs->version().to_string(), tablet_id(),
rs->rowset_id().to_string(),
it->second->rowset_id().to_string());
@@ -773,10 +775,9 @@ void Tablet::delete_expired_stale_rowset() {
Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;
- Status status =
- capture_consistent_versions_unlocked(test_version,
nullptr, false, false);
+ auto ret = capture_consistent_versions_unlocked(test_version, {});
// 1. When there is no consistent versions, we must reconstruct
the tracker.
- if (!status.ok()) {
+ if (!ret) {
// 2. fetch missing version after delete
Versions after_missed_versions =
get_missed_versions_unlocked(lastest_delta->end_version());
@@ -923,51 +924,11 @@ void Tablet::delete_expired_stale_rowset() {
{ _engine.start_delete_unused_rowset(); });
}
-Status Tablet::capture_consistent_versions_unlocked(const Version&
spec_version,
- Versions* version_path,
- bool skip_missing_version,
bool quiet) const {
- Status status =
-
_timestamped_version_tracker.capture_consistent_versions(spec_version,
version_path);
- if (!status.ok() && !quiet) {
- Versions missed_versions =
get_missed_versions_unlocked(spec_version.second);
- if (missed_versions.empty()) {
- // if version_path is null, it may be a compaction check logic.
- // so to avoid print too many logs.
- if (version_path != nullptr) {
- LOG(WARNING) << "tablet:" << tablet_id()
- << ", version already has been merged.
spec_version: " << spec_version
- << ", max_version: " << max_version_unlocked();
- }
- status = Status::Error<VERSION_ALREADY_MERGED, false>(
- "versions are already compacted, spec_version "
- "{}, max_version {}, tablet_id {}",
- spec_version.second, max_version_unlocked(), tablet_id());
- } else {
- if (version_path != nullptr) {
- LOG(WARNING) << "status:" << status << ", tablet:" <<
tablet_id()
- << ", missed version for version:" <<
spec_version;
- _print_missed_versions(missed_versions);
- if (skip_missing_version) {
- LOG(WARNING) << "force skipping missing version for
tablet:" << tablet_id();
- return Status::OK();
- }
- }
- }
- }
-
- DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", {
- auto tablet_id = dp->param<int64_t>("tablet_id", -1);
- if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) {
- status = Status::Error<VERSION_ALREADY_MERGED>("version already
merged");
- }
- });
-
- return status;
-}
-
Status Tablet::check_version_integrity(const Version& version, bool quiet) {
std::shared_lock rdlock(_meta_lock);
- return capture_consistent_versions_unlocked(version, nullptr, false,
quiet);
+ [[maybe_unused]] auto _versions = DORIS_TRY(
+ capture_consistent_versions_unlocked(version, CaptureRowsetOps
{.quiet = quiet}));
+ return Status::OK();
}
bool Tablet::exceed_version_limit(int32_t limit) {
@@ -997,22 +958,12 @@ void Tablet::acquire_version_and_rowsets(
}
}
-Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version,
-
std::vector<RowsetSharedPtr>* rowsets) const {
- std::vector<Version> version_path;
- RETURN_IF_ERROR(
- capture_consistent_versions_unlocked(spec_version, &version_path,
false, false));
- RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path,
rowsets));
- return Status::OK();
-}
-
Status Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- const CaptureRsReaderOptions& opts) {
+ const CaptureRowsetOps& opts) {
std::shared_lock rlock(_meta_lock);
std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version,
&version_path,
-
opts.skip_missing_version, false));
- RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits));
+ *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
+ spec_version, CaptureRowsetOps {.skip_missing_versions =
opts.skip_missing_versions}));
return Status::OK();
}
@@ -2065,7 +2016,8 @@ Status Tablet::cooldown(RowsetSharedPtr rowset) {
std::unique_lock schema_change_lock(_schema_change_lock, std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>(
- "try schema_change_lock failed, schema change running or
inverted index built on "
+ "try schema_change_lock failed, schema change running or
inverted index built "
+ "on "
"this tablet={}",
tablet_id());
}
@@ -2353,7 +2305,8 @@ Status Tablet::_follow_cooldowned_data() {
}
} else if (!rs->is_local()) {
return Status::InternalError<false>(
- "cooldowned version larger than that to follow with
cooldown version {}",
+ "cooldowned version larger than that to follow with
cooldown version "
+ "{}",
cooldowned_version);
}
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1b7e809eec3..372c141cc5c 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -182,24 +182,15 @@ public:
/// need to delete flag.
void delete_expired_stale_rowset();
- // Given spec_version, find a continuous version path and store it in
version_path.
- // If quiet is true, then only "does this path exist" is returned.
- // If skip_missing_version is true, return ok even there are missing
versions.
- Status capture_consistent_versions_unlocked(const Version& spec_version,
Versions* version_path,
- bool skip_missing_version,
bool quiet) const;
-
// if quiet is true, no error log will be printed if there are missing
versions
Status check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
void acquire_version_and_rowsets(
std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets)
const;
- Status capture_consistent_rowsets_unlocked(
- const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const override;
-
- // If opts.skip_missing_version is true, skip versions if they are missing.
+ // If opts.skip_missing_versions is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- const CaptureRsReaderOptions& opts) override;
+ const CaptureRowsetOps& opts) override;
// Find the missed versions until the spec_version.
//
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 652b8bf6c25..dc589b9a992 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -1254,6 +1254,35 @@ DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o)
noexcept {
return *this;
}
+// Rebuilds a DeleteBitmap for `tablet_id` from its protobuf form.
+//
+// `pb` carries one entry per index across four parallel repeated fields:
+// rowset_ids, segment_ids, versions and segment_delete_bitmaps.
+DeleteBitmap DeleteBitmap::from_pb(const DeleteBitmapPB& pb, int64_t tablet_id) {
+    // Protobuf repeated fields are sized and indexed with `int`; using the same
+    // type avoids the signed/unsigned comparison in the loop condition.
+    const int len = pb.rowset_ids().size();
+    DCHECK_EQ(len, pb.segment_ids().size());
+    DCHECK_EQ(len, pb.versions().size());
+    // The bitmap payloads are indexed by `i` too, so they must line up as well.
+    DCHECK_EQ(len, pb.segment_delete_bitmaps().size());
+    DeleteBitmap delete_bitmap(tablet_id);
+    for (int i = 0; i < len; ++i) {
+        RowsetId rs_id;
+        rs_id.init(pb.rowset_ids(i));
+        BitmapKey key = {rs_id, pb.segment_ids(i), pb.versions(i)};
+        const std::string& payload = pb.segment_delete_bitmaps(i);
+        // readSafe() is bounded by the payload length, so a truncated or
+        // corrupted buffer cannot make the parser read past its end.
+        delete_bitmap.delete_bitmap[key] =
+                roaring::Roaring::readSafe(payload.data(), payload.size());
+    }
+    return delete_bitmap;
+}
+
+// Serializes every (key, bitmap) entry of this delete bitmap into the four
+// parallel repeated fields of a DeleteBitmapPB. The read lock is held for the
+// whole walk.
+DeleteBitmapPB DeleteBitmap::to_pb() {
+    std::shared_lock rlock(lock);
+    DeleteBitmapPB pb;
+    for (const auto& [key, bitmap] : delete_bitmap) {
+        const auto& [rowset_id, segment_id, version] = key;
+        pb.add_rowset_ids(rowset_id.to_string());
+        pb.add_segment_ids(segment_id);
+        pb.add_versions(version);
+        // Size the buffer up front so the bitmap can be written in place.
+        std::string serialized(bitmap.getSizeInBytes(), '\0');
+        bitmap.write(serialized.data());
+        pb.add_segment_delete_bitmaps(std::move(serialized));
+    }
+    return pb;
+}
+
DeleteBitmap DeleteBitmap::snapshot() const {
std::shared_lock l(lock);
return DeleteBitmap(*this);
@@ -1711,6 +1740,22 @@ std::shared_ptr<roaring::Roaring>
DeleteBitmap::get_agg_without_cache(
return bitmap;
}
+// Returns a new DeleteBitmap (same tablet id) holding copies of all entries
+// whose key is present in this delete bitmap but absent from `key_set`.
+DeleteBitmap DeleteBitmap::diffset(const std::set<BitmapKey>& key_set) const {
+    std::shared_lock l(lock);
+    DeleteBitmap dbm(_tablet_id);
+    // Walk the entries directly instead of projecting the keys and looking each
+    // one up again through get(): this avoids copying every key and a second
+    // map lookup per entry, and calls no other member function while the read
+    // lock is held.
+    for (const auto& [key, bitmap] : delete_bitmap) {
+        if (!key_set.contains(key)) {
+            dbm.delete_bitmap.emplace(key, bitmap);
+        }
+    }
+    return dbm;
+}
+
std::string tablet_state_name(TabletState state) {
switch (state) {
case TABLET_NOTREADY:
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index c86d9ad553a..dcefc309a44 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -444,6 +444,10 @@ public:
DeleteBitmap(DeleteBitmap&& r) noexcept;
DeleteBitmap& operator=(DeleteBitmap&& r) noexcept;
+ static DeleteBitmap from_pb(const DeleteBitmapPB& pb, int64_t tablet_id);
+
+ DeleteBitmapPB to_pb();
+
/**
* Makes a snapshot of delete bitmap, read lock will be acquired in this
* process
@@ -608,6 +612,14 @@ public:
void set_tablet_id(int64_t tablet_id);
+ /**
+ * Calculates the difference set against the given `key_set`: every entry
+ * whose key is contained in this delete bitmap but not in `key_set` is
+ * added to the output delete bitmap.
+ *
+ * @return DeleteBitmap containing all entries in the difference set
+ */
+ DeleteBitmap diffset(const std::set<BitmapKey>& key_set) const;
+
private:
DeleteBitmap::Version _get_rowset_cache_version(const BitmapKey& bmk)
const;
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index b4c9d1aabc0..3660e709e22 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -102,16 +102,6 @@ std::string TabletReader::KeysParam::to_string() const {
return ss.str();
}
-void TabletReader::ReadSource::fill_delete_predicates() {
- DCHECK_EQ(delete_predicates.size(), 0);
- for (auto&& split : rs_splits) {
- auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
- if (rs_meta->has_delete_predicate()) {
- delete_predicates.push_back(rs_meta);
- }
- }
-}
-
TabletReader::~TabletReader() {
for (auto* pred : _col_predicates) {
delete pred;
@@ -682,7 +672,7 @@ Status TabletReader::init_reader_params_and_create_block(
reader_params->version =
Version(input_rowsets.front()->start_version(),
input_rowsets.back()->end_version());
- ReadSource read_source;
+ TabletReadSource read_source;
for (const auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
@@ -704,9 +694,6 @@ Status TabletReader::init_reader_params_and_create_block(
merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
}
reader_params->tablet_schema = merge_tablet_schema;
- if (tablet->enable_unique_key_merge_on_write()) {
- reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
- }
reader_params->return_columns.resize(read_tablet_schema->num_columns());
std::iota(reader_params->return_columns.begin(),
reader_params->return_columns.end(), 0);
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 154a138cba9..8630c932052 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -33,6 +33,7 @@
#include "common/status.h"
#include "exprs/function_filter.h"
#include "io/io_common.h"
+#include "olap/base_tablet.h"
#include "olap/delete_handler.h"
#include "olap/filter_olap_param.h"
#include "olap/iterators.h"
@@ -91,12 +92,6 @@ class TabletReader {
};
public:
- struct ReadSource {
- std::vector<RowSetSplits> rs_splits;
- std::vector<RowsetMetaSharedPtr> delete_predicates;
- // Fill delete predicates with `rs_splits`
- void fill_delete_predicates();
- };
// Params for Reader,
// mainly include tablet, data version and fetch range.
struct ReaderParams {
@@ -117,9 +112,14 @@ public:
return BeExecVersionManager::get_newest_version();
}
- void set_read_source(ReadSource read_source) {
+ void set_read_source(TabletReadSource read_source, bool
skip_delete_bitmap = false) {
rs_splits = std::move(read_source.rs_splits);
delete_predicates = std::move(read_source.delete_predicates);
+#ifndef BE_TEST
+ if (tablet->enable_unique_key_merge_on_write() &&
!skip_delete_bitmap) {
+ delete_bitmap = std::move(read_source.delete_bitmap);
+ }
+#endif
}
BaseTabletSPtr tablet;
@@ -148,7 +148,7 @@ public:
std::vector<RowSetSplits> rs_splits;
// For unique key table with merge-on-write
- DeleteBitmap* delete_bitmap = nullptr;
+ DeleteBitmapPtr delete_bitmap = nullptr;
// return_columns is init from query schema
std::vector<ColumnId> return_columns;
diff --git a/be/src/olap/task/engine_checksum_task.cpp
b/be/src/olap/task/engine_checksum_task.cpp
index 05ecfc0401b..9dbaeabad30 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -81,13 +81,15 @@ Status EngineChecksumTask::_compute_checksum() {
vectorized::Block block;
{
std::shared_lock rdlock(tablet->get_header_lock());
- Status acquire_reader_st =
- tablet->capture_consistent_rowsets_unlocked(version,
&input_rowsets);
- if (!acquire_reader_st.ok()) {
+ auto ret = tablet->capture_consistent_rowsets_unlocked(version,
CaptureRowsetOps {});
+ if (ret) {
+ input_rowsets = std::move(ret->rowsets);
+ } else {
LOG(WARNING) << "fail to captute consistent rowsets. tablet=" <<
tablet->tablet_id()
- << "res=" << acquire_reader_st;
- return acquire_reader_st;
+ << "res=" << ret.error();
+ return std::move(ret.error());
}
+
RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block(
tablet, ReaderType::READER_CHECKSUM, input_rowsets,
&reader_params, &block));
}
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index e7935eae55f..497837a9f6b 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -33,6 +33,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "io/fs/local_file_system.h"
+#include "olap/base_tablet.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -87,8 +88,10 @@ Status EngineStorageMigrationTask::_get_versions(int64_t
start_version, int64_t*
<< ", start_version=" << start_version << ", end_version="
<< *end_version;
return Status::OK();
}
- return _tablet->capture_consistent_rowsets_unlocked(Version(start_version,
*end_version),
- consistent_rowsets);
+ auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
+ Version(start_version, *end_version), CaptureRowsetOps {}));
+ *consistent_rowsets = std::move(ret.rowsets);
+ return Status::OK();
}
bool EngineStorageMigrationTask::_is_timeout() {
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index a24232e50a7..d5c1be28cdc 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -719,16 +719,15 @@ Status OlapScanLocalState::prepare(RuntimeState* state) {
}
}
- CaptureRsReaderOptions opts {
- .skip_missing_version = _state->skip_missing_version(),
- .enable_prefer_cached_rowset =
- config::is_cloud_mode() ?
_state->enable_prefer_cached_rowset() : false,
- .query_freshness_tolerance_ms =
- config::is_cloud_mode() ?
_state->query_freshness_tolerance_ms() : -1,
- };
for (size_t i = 0; i < _scan_ranges.size(); i++) {
- RETURN_IF_ERROR(_tablets[i].tablet->capture_rs_readers({0,
_tablets[i].version},
-
&_read_sources[i].rs_splits, opts));
+ _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source(
+ {0, _tablets[i].version},
+ {.skip_missing_versions = _state->skip_missing_version(),
+ .enable_fetch_rowsets_from_peers =
config::enable_fetch_rowsets_from_peer_replicas,
+ .enable_prefer_cached_rowset =
+ config::is_cloud_mode() ?
_state->enable_prefer_cached_rowset() : false,
+ .query_freshness_tolerance_ms =
+ config::is_cloud_mode() ?
_state->query_freshness_tolerance_ms() : -1}));
if (!PipelineXLocalState<>::_state->skip_delete_predicate()) {
_read_sources[i].fill_delete_predicates();
}
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 695d84a1866..f9d7597f4f0 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -290,7 +290,7 @@ private:
RuntimeProfile::Counter* _variant_subtree_sparse_iter_count = nullptr;
std::vector<TabletWithVersion> _tablets;
- std::vector<TabletReader::ReadSource> _read_sources;
+ std::vector<TabletReadSource> _read_sources;
std::map<SlotId, vectorized::VExprContextSPtr>
_slot_id_to_virtual_column_expr;
std::map<SlotId, size_t> _slot_id_to_index_in_block;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 2ff23ff97d0..daf45179f79 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -54,6 +54,9 @@
#include <utility>
#include <vector>
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
#include "common/config.h"
#include "common/exception.h"
#include "common/logging.h"
@@ -153,6 +156,8 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit:
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads,
MetricUnit::NOUNIT);
+static bvar::LatencyRecorder
g_process_remote_fetch_rowsets_latency("process_remote_fetch_rowsets");
+
bthread_key_t btls_key;
static void thread_context_deleter(void* d) {
@@ -2318,5 +2323,68 @@ void
PInternalService::abort_refresh_dictionary(google::protobuf::RpcController*
request->version_id());
st.to_protobuf(response->mutable_status());
}
+
+// Handles a request for the rowset metas covering
+// [version_start, version_end] of a tablet held by this BE's tablet manager.
+//
+// When the request carries `delete_bitmap_keys`, the response additionally
+// contains the delete bitmap entries captured together with the rowsets whose
+// keys are NOT in that key set. Every exit path writes a status into
+// `response` and `done` is run by the closure guard.
+void PInternalService::get_tablet_rowsets(google::protobuf::RpcController* controller,
+                                          const PGetTabletRowsetsRequest* request,
+                                          PGetTabletRowsetsResponse* response,
+                                          google::protobuf::Closure* done) {
+    DCHECK(config::is_cloud_mode());
+    auto start_time = GetMonoTimeMicros();
+    Defer defer {
+            [&]() { g_process_remote_fetch_rowsets_latency << GetMonoTimeMicros() - start_time; }};
+    brpc::ClosureGuard closure_guard(done);
+    LOG(INFO) << "process get tablet rowsets, request=" << request->ShortDebugString();
+    if (!request->has_tablet_id() || !request->has_version_start() ||
+        !request->has_version_end()) {
+        Status::InvalidArgument("missing params tablet/version_start/version_end")
+                .to_protobuf(response->mutable_status());
+        return;
+    }
+    CloudStorageEngine& storage = ExecEnv::GetInstance()->storage_engine().to_cloud();
+
+    auto maybe_tablet =
+            storage.tablet_mgr().get_tablet(request->tablet_id(), /*warmup data*/ false,
+                                            /*syn_delete_bitmap*/ false, /*delete_bitmap*/ nullptr,
+                                            /*local_only*/ true);
+    if (!maybe_tablet) {
+        maybe_tablet.error().to_protobuf(response->mutable_status());
+        return;
+    }
+    auto tablet = maybe_tablet.value();
+    Result<CaptureRowsetResult> ret;
+    {
+        // The header lock is only needed while capturing; the serialization
+        // below works on the captured result.
+        std::shared_lock l(tablet->get_header_lock());
+        // Peer fetching is switched off for this capture (presumably so that a
+        // peer request is never forwarded to further peers -- confirm).
+        ret = tablet->capture_consistent_rowsets_unlocked(
+                {request->version_start(), request->version_end()},
+                CaptureRowsetOps {.enable_fetch_rowsets_from_peers = false});
+    }
+    if (!ret) {
+        ret.error().to_protobuf(response->mutable_status());
+        return;
+    }
+    auto& captured = ret.value();
+
+    // Validate everything that depends on the peer's input before filling the
+    // response. DCHECK is compiled out of release builds, so these conditions
+    // are checked at runtime and reported as errors instead.
+    const bool want_delete_bitmap = request->has_delete_bitmap_keys();
+    std::set<DeleteBitmap::BitmapKey> keys;
+    if (want_delete_bitmap) {
+        if (!tablet->enable_unique_key_merge_on_write() || captured.delete_bitmap == nullptr) {
+            Status::InvalidArgument(
+                    "delete bitmap keys are given, but no delete bitmap is available for "
+                    "tablet {}",
+                    request->tablet_id())
+                    .to_protobuf(response->mutable_status());
+            return;
+        }
+        // Bind by reference: copying the message would duplicate every key.
+        const auto& keys_pb = request->delete_bitmap_keys();
+        const int len = keys_pb.rowset_ids().size();
+        if (keys_pb.segment_ids().size() != len || keys_pb.versions().size() != len) {
+            Status::InvalidArgument(
+                    "malformed delete bitmap keys: rowset_ids={}, segment_ids={}, versions={}",
+                    len, keys_pb.segment_ids().size(), keys_pb.versions().size())
+                    .to_protobuf(response->mutable_status());
+            return;
+        }
+        for (int i = 0; i < len; ++i) {
+            RowsetId rs_id;
+            rs_id.init(keys_pb.rowset_ids(i));
+            keys.emplace(rs_id, keys_pb.segment_ids(i), keys_pb.versions(i));
+        }
+    }
+
+    for (const auto& rs : captured.rowsets) {
+        RowsetMetaPB meta;
+        rs->rowset_meta()->to_rowset_pb(&meta);
+        response->mutable_rowsets()->Add(std::move(meta));
+    }
+    if (want_delete_bitmap) {
+        // Only return the entries the requester did not list in its key set.
+        *response->mutable_delete_bitmap() = captured.delete_bitmap->diffset(keys).to_pb();
+    }
+    Status::OK().to_protobuf(response->mutable_status());
+}
+
#include "common/compile_check_avoid_end.h"
} // namespace doris
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 5ebef074b96..d73501bfc80 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -227,6 +227,11 @@ public:
PAbortRefreshDictionaryResponse* response,
google::protobuf::Closure* done) override;
+ void get_tablet_rowsets(google::protobuf::RpcController* controller,
+ const PGetTabletRowsetsRequest* request,
+ PGetTabletRowsetsResponse* response,
+ google::protobuf::Closure* done) override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp
b/be/src/vec/exec/scan/olap_scanner.cpp
index 48fbea41778..2ff4820d28e 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -65,7 +65,7 @@
namespace doris::vectorized {
#include "common/compile_check_avoid_begin.h"
-using ReadSource = TabletReader::ReadSource;
+using ReadSource = TabletReadSource;
OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* parent,
OlapScanner::Params&& params)
: Scanner(params.state, parent, params.limit, params.profile),
@@ -98,7 +98,8 @@ OlapScanner::OlapScanner(pipeline::ScanLocalStateBase*
parent, OlapScanner::Para
.collection_statistics {},
.ann_topn_runtime {},
.condition_cache_digest =
parent->get_condition_cache_digest()}) {
- _tablet_reader_params.set_read_source(std::move(params.read_source));
+ _tablet_reader_params.set_read_source(std::move(params.read_source),
+ _state->skip_delete_bitmap());
_has_prepared = false;
_vector_search_params = params.state->get_vector_search_params();
}
@@ -219,19 +220,26 @@ Status OlapScanner::prepare() {
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
}
- CaptureRsReaderOptions opts {
- .skip_missing_version = _state->skip_missing_version(),
- .enable_prefer_cached_rowset =
- config::is_cloud_mode() ?
_state->enable_prefer_cached_rowset() : false,
- .query_freshness_tolerance_ms =
- config::is_cloud_mode() ?
_state->query_freshness_tolerance_ms() : -1,
- };
- auto st = tablet->capture_rs_readers(_tablet_reader_params.version,
- &read_source.rs_splits, opts);
- if (!st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" << st;
- return st;
+ auto maybe_read_source = tablet->capture_read_source(
+ _tablet_reader_params.version,
+ {
+ .skip_missing_versions =
_state->skip_missing_version(),
+ .enable_fetch_rowsets_from_peers =
+
config::enable_fetch_rowsets_from_peer_replicas,
+ .enable_prefer_cached_rowset =
+ config::is_cloud_mode() ?
_state->enable_prefer_cached_rowset()
+ : false,
+ .query_freshness_tolerance_ms =
+ config::is_cloud_mode() ?
_state->query_freshness_tolerance_ms()
+ : -1,
+ });
+ if (!maybe_read_source) {
+ LOG(WARNING) << "fail to init reader. res=" <<
maybe_read_source.error();
+ return maybe_read_source.error();
}
+
+ read_source = std::move(maybe_read_source.value());
+
if (config::enable_mow_verbose_log &&
tablet->enable_unique_key_merge_on_write()) {
LOG_INFO("finish capture_rs_readers for tablet={},
query_id={}",
tablet->tablet_id(), print_id(_state->query_id()));
@@ -358,7 +366,6 @@ Status OlapScanner::_init_tablet_reader_params(
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
- auto& tablet = _tablet_reader_params.tablet;
auto& tablet_schema = _tablet_reader_params.tablet_schema;
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
for (auto& del_pred : _tablet_reader_params.delete_predicates) {
@@ -418,10 +425,6 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.use_page_cache = _state->enable_page_cache();
- if (tablet->enable_unique_key_merge_on_write() &&
!_state->skip_delete_bitmap()) {
- _tablet_reader_params.delete_bitmap =
&tablet->tablet_meta()->delete_bitmap();
- }
-
DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block",
DBUG_BLOCK);
if (!_state->skip_storage_engine_merge()) {
diff --git a/be/src/vec/exec/scan/olap_scanner.h
b/be/src/vec/exec/scan/olap_scanner.h
index 8f14889222b..27e09f29817 100644
--- a/be/src/vec/exec/scan/olap_scanner.h
+++ b/be/src/vec/exec/scan/olap_scanner.h
@@ -66,7 +66,7 @@ public:
std::vector<OlapScanRange*> key_ranges;
BaseTabletSPtr tablet;
int64_t version;
- TabletReader::ReadSource read_source;
+ TabletReadSource read_source;
int64_t limit;
bool aggregation;
};
diff --git a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
index 8d4d5a37bf7..464afb9fc6c 100644
--- a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
+++ b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
@@ -130,12 +130,12 @@ public:
void check_capture_result(CloudTabletSPtr tablet, Version spec_version,
const std::vector<Version>& expected_versions) {
- std::vector<RowSetSplits> rs_splits;
- CaptureRsReaderOptions opts {.skip_missing_version = false,
- .enable_prefer_cached_rowset = true,
- .query_freshness_tolerance_ms = -1};
- auto st = tablet->capture_rs_readers(spec_version, &rs_splits, opts);
- ASSERT_TRUE(st.ok());
+ CaptureRowsetOps opts {.skip_missing_versions = false,
+ .enable_prefer_cached_rowset = true,
+ .query_freshness_tolerance_ms = -1};
+ auto res = tablet->capture_read_source(spec_version, opts);
+ ASSERT_TRUE(res.has_value());
+ std::vector<RowSetSplits> rs_splits = std::move(res.value().rs_splits);
auto dump_versions = [](const std::vector<Version>& expected_versions,
const std::vector<RowSetSplits>& splits) {
std::vector<std::string> expected_str;
@@ -801,4 +801,4 @@ TEST_F(TestQueryPreferCache, testCapture_4_1) {
check_capture_result(tablet, Version {0, 18}, expected_versions);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
index 5fe5c2e51bc..1a24ea275be 100644
--- a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
+++ b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
@@ -128,12 +128,12 @@ public:
void check_capture_result(CloudTabletSPtr tablet, Version spec_version,
int64_t query_freshness_tolerance_ms,
const std::vector<Version>& expected_versions) {
- std::vector<RowSetSplits> rs_splits;
- CaptureRsReaderOptions opts {.skip_missing_version = false,
- .enable_prefer_cached_rowset = false,
- .query_freshness_tolerance_ms =
query_freshness_tolerance_ms};
- auto st = tablet->capture_rs_readers(spec_version, &rs_splits, opts);
- ASSERT_TRUE(st.ok());
+ CaptureRowsetOps opts {.skip_missing_versions = false,
+ .enable_prefer_cached_rowset = false,
+ .query_freshness_tolerance_ms =
query_freshness_tolerance_ms};
+ auto res = tablet->capture_read_source(spec_version, opts);
+ ASSERT_TRUE(res.has_value());
+ std::vector<RowSetSplits> rs_splits = std::move(res.value().rs_splits);
auto dump_versions = [](const std::vector<Version>& expected_versions,
const std::vector<RowSetSplits>& splits) {
std::vector<std::string> expected_str;
diff --git a/be/test/olap/segcompaction_mow_test.cpp
b/be/test/olap/segcompaction_mow_test.cpp
index a7e3c9fd706..1dc240d2ada 100644
--- a/be/test/olap/segcompaction_mow_test.cpp
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -204,6 +204,7 @@ protected:
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 10;
rowset_writer_context->version.second = 10;
+ rowset_writer_context->enable_segcompaction = true;
TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>();
tablet_meta->_tablet_id = TABLET_ID;
@@ -238,7 +239,7 @@ protected:
std::vector<uint32_t> return_columns = {0, 1, 2};
reader_context.return_columns = &return_columns;
reader_context.stats = &_stats;
- reader_context.delete_bitmap = delete_bitmap.get();
+ reader_context.delete_bitmap = delete_bitmap;
Status s;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 7bd487e42c4..3fc7c652f58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -613,4 +613,20 @@ public class CloudReplica extends Replica {
return;
}
}
+
+    /**
+     * Collects the backend referenced by the first id recorded for every cluster
+     * in {@code primaryClusterToBackends}.
+     *
+     * <p>Clusters with no recorded ids, entries whose first id is null or -1, and
+     * ids that do not resolve to a {@link Backend} are skipped, so the returned
+     * list never contains null.
+     *
+     * @return the resolved backends, possibly empty
+     */
+    public List<Backend> getAllPrimaryBes() {
+        List<Backend> result = new ArrayList<>();
+        // Iterate the values directly: walking keySet() and calling get() does a
+        // second lookup per cluster for the same list.
+        for (List<Long> backendIds : primaryClusterToBackends.values()) {
+            if (backendIds == null || backendIds.isEmpty()) {
+                continue;
+            }
+            Long beId = backendIds.get(0);
+            // Check for null before the comparison unboxes the id.
+            if (beId == null || beId == -1) {
+                continue;
+            }
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend != null) {
+                result.add(backend);
+            }
+        }
+        return result;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index fe706fb4a36..e89465d9161 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2775,29 +2775,36 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.warn("replica {} not normal", replica.getId());
continue;
}
- Backend backend;
- if (Config.isCloudMode() && request.isSetWarmUpJobId()) {
+ List<Backend> backends;
+ if (Config.isCloudMode()) {
CloudReplica cloudReplica = (CloudReplica) replica;
- // On the cloud, the PrimaryBackend of a tablet indicates
the BE where the tablet is stably located,
- // while the SecondBackend refers to a BE selected by a
new hash when the PrimaryBackend
- // is temporarily unavailable. Once the PrimaryBackend
recovers,
- // the system will switch back to using it. During the
preheating phase,
- // data needs to be synchronized downstream, which
requires a stable BE,
- // so the PrimaryBackend is used in this case.
- backend = cloudReplica.getPrimaryBackend(clusterId, true);
+ if (!request.isSetWarmUpJobId()) {
+ backends = cloudReplica.getAllPrimaryBes();
+ } else {
+ // On the cloud, the PrimaryBackend of a tablet
+ // indicates the BE where the tablet is stably located,
+ // while the SecondBackend refers to a BE selected by
a new hash when the PrimaryBackend
+ // is temporarily unavailable. Once the PrimaryBackend
recovers,
+ // the system will switch back to using it. During the
preheating phase,
+ // data needs to be synchronized downstream, which
requires a stable BE,
+ // so the PrimaryBackend is used in this case.
+ Backend backend =
cloudReplica.getPrimaryBackend(clusterId, true);
+ backends = Lists.newArrayList(backend);
+ }
} else {
- backend =
Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
+ backends = Lists.newArrayList(backend);
}
- if (backend != null) {
- TReplicaInfo replicaInfo = new TReplicaInfo();
- replicaInfo.setHost(backend.getHost());
- replicaInfo.setBePort(backend.getBePort());
- replicaInfo.setHttpPort(backend.getHttpPort());
- replicaInfo.setBrpcPort(backend.getBrpcPort());
- replicaInfo.setIsAlive(backend.isAlive());
- replicaInfo.setBackendId(backend.getId());
- replicaInfo.setReplicaId(replica.getId());
- replicaInfos.add(replicaInfo);
+ for (Backend backend : backends) {
+ if (backend != null) {
+ TReplicaInfo replicaInfo = new TReplicaInfo();
+ replicaInfo.setHost(backend.getHost());
+ replicaInfo.setBePort(backend.getBePort());
+ replicaInfo.setHttpPort(backend.getHttpPort());
+ replicaInfo.setBrpcPort(backend.getBrpcPort());
+ replicaInfo.setReplicaId(replica.getId());
+ replicaInfos.add(replicaInfo);
+ }
}
}
tabletReplicaInfos.put(tabletId, replicaInfos);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 8bbc80adfbb..1ddfbcf2502 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -1066,6 +1066,21 @@ message PAbortRefreshDictionaryResponse {
optional PStatus status = 1;
}
+// Request of PBackendService.get_tablet_rowsets: asks a BE for the rowsets of
+// one tablet covering the version range [version_start, version_end].
+message PGetTabletRowsetsRequest {
+ // The BE-side handler rejects the request with InvalidArgument unless
+ // tablet_id, version_start and version_end are all set.
+ optional int64 tablet_id = 1;
+ optional int64 version_start = 2;
+ optional int64 version_end = 3;
+
+ // Delete bitmap keys to exclude from the response. The handler reads only
+ // the parallel rowset_ids / segment_ids / versions fields of this message
+ // and returns the delete bitmap entries whose keys are not listed here.
+ optional DeleteBitmapPB delete_bitmap_keys = 4;
+}
+
+// Response of PBackendService.get_tablet_rowsets.
+message PGetTabletRowsetsResponse {
+    // Result of the request. Declared optional, like the status of the
+    // neighbouring response message above: a `required` field can never be
+    // relaxed later without breaking peers that still enforce it.
+    optional PStatus status = 1;
+    // Metas of the rowsets captured for the requested version range.
+    repeated RowsetMetaPB rowsets = 2;
+
+    // Only filled when the request carried delete_bitmap_keys: the delete
+    // bitmap entries whose keys were not listed in the request.
+    optional DeleteBitmapPB delete_bitmap = 3;
+}
+
service PBackendService {
// If #fragments of a query is < 3, use exec_plan_fragment directly.
// If #fragments of a query is >=3, use exec_plan_fragment_prepare +
exec_plan_fragment_start
@@ -1121,5 +1136,6 @@ service PBackendService {
rpc delete_dictionary(PDeleteDictionaryRequest) returns
(PDeleteDictionaryResponse);
rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns
(PCommitRefreshDictionaryResponse);
rpc abort_refresh_dictionary(PAbortRefreshDictionaryRequest) returns
(PAbortRefreshDictionaryResponse);
+ rpc get_tablet_rowsets(PGetTabletRowsetsRequest) returns
(PGetTabletRowsetsResponse);
};
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
new file mode 100644
index 00000000000..78964812ebf
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_version_already_merged.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1 1 1 1
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+
+-- !sql --
+1 1 1 1
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index aa533b0f89f..4c22d3bf869 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -90,3 +90,4 @@ enable_parquet_page_index=true
enable_graceful_exit_check=true
enable_prefill_all_dbm_agg_cache_after_compaction=true
+enable_fetch_rowsets_from_peer_replicas = true
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
new file mode 100644
index 00000000000..0dd1f92dc18
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_version_already_merged.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.util.NodeType
+
+
+suite("test_cloud_version_already_merged", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ def tblName = "test_cloud_version_already_merged"
+ sql """ DROP TABLE IF EXISTS ${tblName} FORCE; """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tblName} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1");
+ """
+
+
+ sql "insert into ${tblName} values(1,-1,-1,-1);"
+ sql "insert into ${tblName} values(2,-2,-2,-2);"
+ sql "insert into ${tblName} values(3,-3,-3,-3);"
+ sql "insert into ${tblName} values(4,-4,-4,-4)"
+ sql "insert into ${tblName} values(5,-5,-5,-5)"
+ sql "insert into ${tblName} values(1,1,1,1);"
+ sql "insert into ${tblName} values(2,2,2,2);"
+ sql "insert into ${tblName} values(3,3,3,3);"
+ sql "insert into ${tblName} values(4,4,4,4)"
+ sql "insert into ${tblName} values(5,5,5,5)"
+
+
+
+
+ sql "sync;"
+ qt_sql "select * from ${tblName} order by k1;"
+
+
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from ${tblName};")
+ assert tabletStats.size() == 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ sleep(10000)
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure",
[tablet_id: tabletId])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
+ test {
+ sql """ SELECT * from ${tblName} ORDER BY k1 """
+ exception "version already merged, meet error during remote capturing rowsets"
+ }
+
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ try {
+ GetDebugPoint().enableDebugPoint(tabletBackend.Host,
tabletBackend.HttpPort as int, NodeType.BE,
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId,
skip_by_option: true])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
GetDebugPoint().enableDebugPointForAllBEs("GetRowsetCntl::start_req_bg.inject_failure");
+
+
+ test {
+ sql """ SELECT * from ${tblName} ORDER BY k1 """
+ exception "version already merged, meet error during remote capturing rowsets"
+ }
+
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+
+ try {
+ GetDebugPoint().enableDebugPoint(tabletBackend.Host,
tabletBackend.HttpPort as int, NodeType.BE,
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId,
skip_by_option: true])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail");
+
+
+ test {
+ sql """ SELECT * from ${tblName} ORDER BY k1 """
+ exception "version already merged, meet error during remote capturing rowsets"
+ }
+
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ try {
+ GetDebugPoint().enableDebugPoint(tabletBackend.Host,
tabletBackend.HttpPort as int, NodeType.BE,
"Tablet::capture_consistent_versions.inject_failure", [tablet_id: tabletId,
skip_by_option: true])
+
GetDebugPoint().enableDebugPointForAllBEs("get_peer_replicas_address.enable_local_host")
+
+ qt_sql """ SELECT * from ${tblName} ORDER BY k1 """
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]