This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d8f80986f1e [feature](merge-cloud) Sync cloud tablet rowsets before scan (#30639)
d8f80986f1e is described below
commit d8f80986f1e56f16a2e0b226af5c77cb6784a6da
Author: walter <[email protected]>
AuthorDate: Wed Jan 31 23:01:48 2024 +0800
[feature](merge-cloud) Sync cloud tablet rowsets before scan (#30639)
---
be/src/cloud/cloud_tablet.cpp | 18 ++++++++++
be/src/cloud/cloud_tablet.h | 3 ++
be/src/olap/base_tablet.cpp | 31 +++++++++++++++++
be/src/olap/base_tablet.h | 11 ++++++
be/src/olap/full_compaction.cpp | 2 +-
be/src/olap/parallel_scanner_builder.cpp | 2 +-
be/src/olap/parallel_scanner_builder.h | 6 ----
be/src/olap/schema_change.cpp | 8 ++---
be/src/olap/snapshot_manager.cpp | 6 ++--
be/src/olap/tablet.cpp | 39 +++-------------------
be/src/olap/tablet.h | 7 ++--
be/src/olap/task/engine_checksum_task.cpp | 3 +-
be/src/olap/task/engine_storage_migration_task.cpp | 6 ++--
be/src/vec/exec/scan/new_olap_scan_node.cpp | 26 ++++++++++-----
14 files changed, 102 insertions(+), 66 deletions(-)
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 24c924fb6bc..6c0b50807c6 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -31,6 +31,7 @@
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
@@ -48,6 +49,23 @@ bool CloudTablet::exceed_version_limit(int32_t limit) {
return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
}
+Status CloudTablet::capture_consistent_rowsets_unlocked(
+ const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets)
const {
+ Versions version_path;
+ auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
+ if (!st.ok()) {
+ // Check no missed versions or req version is merged
+ auto missed_versions = get_missed_versions(spec_version.second);
+ if (missed_versions.empty()) {
+ st.set_code(VERSION_ALREADY_MERGED); // Reset error code
+ }
+ st.append(" tablet_id=" + std::to_string(tablet_id()));
+ return st;
+ }
+ VLOG_DEBUG << "capture consitent versions: " << version_path;
+ return _capture_consistent_rowsets_unlocked(version_path, rowsets);
+}
+
Status CloudTablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index e057d23fd51..10e13904961 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -37,6 +37,9 @@ public:
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
+ Status capture_consistent_rowsets_unlocked(
+ const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const override;
+
size_t tablet_footprint() override {
return _approximate_data_size.load(std::memory_order_relaxed);
}
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 2e82740887d..83c6bb36fbf 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1028,4 +1028,35 @@ void BaseTablet::_rowset_ids_difference(const
RowsetIdUnorderedSet& cur,
}
}
+Status BaseTablet::_capture_consistent_rowsets_unlocked(
+ const std::vector<Version>& version_path,
std::vector<RowsetSharedPtr>* rowsets) const {
+ DCHECK(rowsets != nullptr);
+ rowsets->reserve(version_path.size());
+ for (const auto& version : version_path) {
+ bool is_find = false;
+ do {
+ auto it = _rs_version_map.find(version);
+ if (it != _rs_version_map.end()) {
+ is_find = true;
+ rowsets->push_back(it->second);
+ break;
+ }
+
+ auto it_expired = _stale_rs_version_map.find(version);
+ if (it_expired != _stale_rs_version_map.end()) {
+ is_find = true;
+ rowsets->push_back(it_expired->second);
+ break;
+ }
+ } while (false);
+
+ if (!is_find) {
+ return Status::Error<CAPTURE_ROWSET_ERROR>(
+ "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
+ version.to_string());
+ }
+ }
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 0e3501b2730..5152f0a541d 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -35,6 +35,11 @@ class RowsetWriter;
class CalcDeleteBitmapToken;
class SegmentCacheHandle;
+struct TabletWithVersion {
+ BaseTabletSPtr tablet;
+ int64_t version;
+};
+
// Base class for all tablet classes
class BaseTablet {
public:
@@ -82,6 +87,9 @@ public:
virtual Result<std::unique_ptr<RowsetWriter>>
create_rowset_writer(RowsetWriterContext& context,
bool
vertical) = 0;
+ virtual Status capture_consistent_rowsets_unlocked(
+ const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const = 0;
+
virtual Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) = 0;
@@ -208,6 +216,9 @@ protected:
const RowsetIdUnorderedSet& pre,
RowsetIdUnorderedSet* to_add,
RowsetIdUnorderedSet* to_del);
+ Status _capture_consistent_rowsets_unlocked(const std::vector<Version>&
version_path,
+ std::vector<RowsetSharedPtr>*
rowsets) const;
+
void sort_block(vectorized::Block& in_block, vectorized::Block&
output_block);
mutable std::shared_mutex _meta_lock;
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 9e8525f6168..f98ac2a24e7 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -150,7 +150,7 @@ Status
FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP
int64_t max_version = _tablet->max_version().second;
DCHECK(max_version >= rowset->version().second);
if (max_version > rowset->version().second) {
- RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
+ RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked(
{rowset->version().second + 1, max_version}, &tmp_rowsets));
}
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index d25bcdc7a2f..e7f51881288 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -170,7 +170,7 @@ Status ParallelScannerBuilder<ParentType>::_load() {
auto& rowsets = _all_rowsets[tablet_id];
{
std::shared_lock read_lock(tablet->get_header_lock());
- RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version},
&rowsets));
+ RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0,
version}, &rowsets));
}
for (auto& rowset : rowsets) {
diff --git a/be/src/olap/parallel_scanner_builder.h
b/be/src/olap/parallel_scanner_builder.h
index b9d659abc27..8b23ce27735 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -32,14 +32,8 @@ namespace vectorized {
class VScanner;
}
-using TabletSPtr = std::shared_ptr<Tablet>;
using VScannerSPtr = std::shared_ptr<vectorized::VScanner>;
-struct TabletWithVersion {
- TabletSPtr tablet;
- int64_t version;
-};
-
template <typename ParentType>
class ParallelScannerBuilder {
public:
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 320f6a3e645..1543a6dfd5d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1375,8 +1375,8 @@ Status
SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr ne
<< "double write rowsets for version: " << alter_version + 1
<< "-" << max_version
<< " new_tablet=" << new_tablet->tablet_id();
std::shared_lock<std::shared_mutex>
rlock(new_tablet->get_header_lock());
- RETURN_IF_ERROR(
- new_tablet->capture_consistent_rowsets({alter_version + 1,
max_version}, &rowsets));
+ RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets_unlocked(
+ {alter_version + 1, max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
@@ -1394,8 +1394,8 @@ Status
SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr ne
LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
<< "incremental rowsets for version: " << max_version + 1 <<
"-"
<< new_max_version << " new_tablet=" <<
new_tablet->tablet_id();
- RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version +
1, new_max_version},
- &rowsets));
+ RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets_unlocked(
+ {max_version + 1, new_max_version}, &rowsets));
}
for (auto&& rowset_ptr : rowsets) {
RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(new_tablet,
rowset_ptr));
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 53920b7692a..f8235a83af5 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -516,13 +516,13 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
res = check_version_continuity(consistent_rowsets);
if (res.ok() && max_cooldowned_version < version) {
// Pick consistent rowsets of remaining required
version
- res = ref_tablet->capture_consistent_rowsets(
+ res = ref_tablet->capture_consistent_rowsets_unlocked(
{max_cooldowned_version + 1, version},
&consistent_rowsets);
}
} else {
// get shortest version path
- res = ref_tablet->capture_consistent_rowsets(Version(0,
version),
-
&consistent_rowsets);
+ res =
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
+
&consistent_rowsets);
}
if (!res.ok()) {
LOG(WARNING) << "fail to select versions to span. res=" <<
res;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4eac632f0f0..9730c149f74 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -391,8 +391,8 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetSharedPtr>& to_add,
}
if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) {
- Status res =
- capture_consistent_rowsets(calc_delete_bitmap_ver,
&calc_delete_bitmap_rowsets);
+ Status res =
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
+
&calc_delete_bitmap_rowsets);
// Because the data in memory has been changed, can't return an
error.
CHECK(res.ok()) << "fail to capture_consistent_rowsets, res: " <<
res;
@@ -858,8 +858,8 @@ void Tablet::acquire_version_and_rowsets(
}
}
-Status Tablet::capture_consistent_rowsets(const Version& spec_version,
- std::vector<RowsetSharedPtr>*
rowsets) const {
+Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version,
+
std::vector<RowsetSharedPtr>* rowsets) const {
std::vector<Version> version_path;
RETURN_IF_ERROR(
capture_consistent_versions_unlocked(spec_version, &version_path,
false, false));
@@ -867,37 +867,6 @@ Status Tablet::capture_consistent_rowsets(const Version&
spec_version,
return Status::OK();
}
-Status Tablet::_capture_consistent_rowsets_unlocked(const
std::vector<Version>& version_path,
-
std::vector<RowsetSharedPtr>* rowsets) const {
- DCHECK(rowsets != nullptr);
- rowsets->reserve(version_path.size());
- for (auto& version : version_path) {
- bool is_find = false;
- do {
- auto it = _rs_version_map.find(version);
- if (it != _rs_version_map.end()) {
- is_find = true;
- rowsets->push_back(it->second);
- break;
- }
-
- auto it_expired = _stale_rs_version_map.find(version);
- if (it_expired != _stale_rs_version_map.end()) {
- is_find = true;
- rowsets->push_back(it_expired->second);
- break;
- }
- } while (false);
-
- if (!is_find) {
- return Status::Error<CAPTURE_ROWSET_ERROR>(
- "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
- version.to_string());
- }
- }
- return Status::OK();
-}
-
Status Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) {
std::shared_lock rlock(_meta_lock);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index ddc1c276fea..7b993d1af32 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -168,8 +168,9 @@ public:
void acquire_version_and_rowsets(
std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets)
const;
- Status capture_consistent_rowsets(const Version& spec_version,
- std::vector<RowsetSharedPtr>* rowsets)
const;
+ Status capture_consistent_rowsets_unlocked(
+ const Version& spec_version, std::vector<RowsetSharedPtr>*
rowsets) const override;
+
// If skip_missing_version is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) override;
@@ -470,8 +471,6 @@ private:
/// Delete stale rowset by version. This method not only delete the
version in expired rowset map,
/// but also delete the version in rowset meta vector.
void _delete_stale_rowset_by_version(const Version& version);
- Status _capture_consistent_rowsets_unlocked(const std::vector<Version>&
version_path,
- std::vector<RowsetSharedPtr>*
rowsets) const;
uint32_t _calc_cumulative_compaction_score(
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy);
diff --git a/be/src/olap/task/engine_checksum_task.cpp
b/be/src/olap/task/engine_checksum_task.cpp
index 4099c378377..67b90dfc27b 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -81,7 +81,8 @@ Status EngineChecksumTask::_compute_checksum() {
vectorized::Block block;
{
std::shared_lock rdlock(tablet->get_header_lock());
- Status acquire_reader_st = tablet->capture_consistent_rowsets(version,
&input_rowsets);
+ Status acquire_reader_st =
+ tablet->capture_consistent_rowsets_unlocked(version,
&input_rowsets);
if (!acquire_reader_st.ok()) {
LOG(WARNING) << "fail to captute consistent rowsets. tablet=" <<
tablet->tablet_id()
<< "res=" << acquire_reader_st;
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index e778a0b8f8d..acee9d2a834 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -84,8 +84,8 @@ Status EngineStorageMigrationTask::_get_versions(int32_t
start_version, int32_t*
<< ", start_version=" << start_version << ", end_version="
<< *end_version;
return Status::OK();
}
- return _tablet->capture_consistent_rowsets(Version(start_version,
*end_version),
- consistent_rowsets);
+ return _tablet->capture_consistent_rowsets_unlocked(Version(start_version,
*end_version),
+ consistent_rowsets);
}
bool EngineStorageMigrationTask::_is_timeout() {
@@ -93,7 +93,7 @@ bool EngineStorageMigrationTask::_is_timeout() {
int64_t timeout = std::max<int64_t>(config::migration_task_timeout_secs,
_tablet->tablet_local_size() >> 20);
if (time_elapsed > timeout) {
- LOG(WARNING) << "migration failed due to timeout, time_eplapsed=" <<
time_elapsed
+ LOG(WARNING) << "migration failed due to timeout, time_elapsed=" <<
time_elapsed
<< ", tablet=" << _tablet->tablet_id();
return true;
}
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index dc82ebd9e81..82617210230 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -34,7 +34,9 @@
#include <variant>
#include <vector>
-#include "common/config.h"
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
@@ -494,24 +496,32 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
bool is_dup_mow_key = false;
size_t segment_count = 0;
std::vector<TabletReader::ReadSource> tablets_read_source;
- tablets_read_source.reserve(_scan_ranges.size());
std::vector<std::vector<size_t>> tablet_rs_seg_count;
+ std::vector<TabletWithVersion> tablets_to_scan;
+
+ tablets_read_source.reserve(_scan_ranges.size());
tablet_rs_seg_count.reserve(_scan_ranges.size());
- std::vector<std::pair<BaseTabletSPtr, int64_t /* version */>>
tablets_to_scan;
tablets_to_scan.reserve(_scan_ranges.size());
- std::vector<TabletWithVersion> tablets;
-
for (auto&& scan_range : _scan_ranges) {
auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
int64_t version = 0;
std::from_chars(scan_range->version.data(),
scan_range->version.data() +
scan_range->version.size(), version);
- tablets.emplace_back(
- TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet),
version});
tablets_to_scan.emplace_back(std::move(tablet), version);
}
+ if (config::is_cloud_mode()) {
+ std::vector<std::function<Status()>> tasks;
+ tasks.reserve(_scan_ranges.size());
+ for (auto&& [tablet, version] : tablets_to_scan) {
+ tasks.emplace_back([tablet, version]() {
+ return
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+ });
+ }
+ RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+ }
+
bool enable_parallel_scan = _state->enable_parallel_scan();
// Split tablet segment by scanner, only use in pipeline in duplicate key
@@ -564,7 +574,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
}
ParallelScannerBuilder<NewOlapScanNode> scanner_builder(
- this, tablets, _scanner_profile, key_ranges, _state,
_limit_per_scanner,
+ this, tablets_to_scan, _scanner_profile, key_ranges, _state,
_limit_per_scanner,
is_dup_mow_key, _olap_scan_node.is_preaggregation);
int max_scanners_count = _state->parallel_scan_max_scanners_count();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]