This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9cad929e96 [Fix](rowset) When a rowset is cooled down, it is directly
deleted. This can result in data query misses in the second phase of a
two-phase query. (#21741)
9cad929e96 is described below
commit 9cad929e96ecdf5df7c60f6831e9ebe09eafb63f
Author: lihangyu <[email protected]>
AuthorDate: Thu Jul 13 11:46:12 2023 +0800
[Fix](rowset) When a rowset is cooled down, it is directly deleted. This
can result in data query misses in the second phase of a two-phase query.
(#21741)
* [Fix](rowset) When a rowset is cooled down, it is directly deleted. This
can result in data query misses in the second phase of a two-phase query.
Related PR: #20732
There are two reasons for moving the delayed-deletion logic from the
Tablet to the StorageEngine. The first is to consolidate the logic and
unify the delayed operations in one place. The second is that delaying
garbage collection in the Tablet during queries can leave rowsets in the
"stale rowsets" state, which prevents the timely deletion of rowset metadata
and can make that metadata grow too large. With this change, expired stale
rowsets are always handed to the StorageEngine as unused rowsets, and the
StorageEngine applies the delayed-expiration check before deleting them.
* Do not use unused rowsets
---
be/src/olap/storage_engine.cpp | 25 ++++++++++++++++++++++++-
be/src/olap/storage_engine.h | 10 ++++++++++
be/src/olap/tablet.cpp | 5 -----
be/src/service/internal_service.cpp | 5 +++--
be/src/vec/exec/scan/new_olap_scanner.cpp | 1 +
5 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index cc37ced5b5..f573b59f60 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -903,11 +903,15 @@ void StorageEngine::start_delete_unused_rowset() {
{
std::lock_guard<std::mutex> lock(_gc_mutex);
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
- if (it->second.use_count() == 1 && it->second->need_delete_file())
{
+ uint64_t now = UnixSeconds();
+ if (it->second.use_count() == 1 && it->second->need_delete_file()
&&
+ // We delay the GC time of this rowset since it's maybe still
needed, see #20732
+ now > it->second->delayed_expired_timestamp()) {
if (it->second->is_local()) {
unused_rowsets_copy[it->first] = it->second;
}
// remote rowset data will be reclaimed by
`remove_unused_remote_files`
+ evict_querying_rowset(it->second->rowset_id());
it = _unused_rowsets.erase(it);
} else {
++it;
@@ -1169,4 +1173,23 @@ Status
StorageEngine::get_compaction_status_json(std::string* result) {
return Status::OK();
}
+void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
+ std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+ _querying_rowsets.emplace(rs->rowset_id(), rs);
+}
+
+RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) {
+ std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+ auto it = _querying_rowsets.find(rs_id);
+ if (it != _querying_rowsets.end()) {
+ return it->second;
+ }
+ return nullptr;
+}
+
+void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
+ std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+ _querying_rowsets.erase(rs_id);
+}
+
} // namespace doris
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index d6215586ed..2bd01ba2cb 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -224,6 +224,12 @@ public:
int64_t transaction_id, bool is_recover);
int64_t get_pending_publish_min_version(int64_t tablet_id);
+ void add_quering_rowset(RowsetSharedPtr rs);
+
+ RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
+
+ void evict_querying_rowset(RowsetId rs_id);
+
private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -372,6 +378,10 @@ private:
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we
need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
+ // Hold reference of quering rowsets
+ std::mutex _quering_rowsets_mutex;
+ std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId>
_querying_rowsets;
+
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data
such as footer or index page.
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index eea7c66632..efaaf2fffe 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -781,11 +781,6 @@ void Tablet::delete_expired_stale_rowset() {
for (auto& timestampedVersion : to_delete_version) {
auto it =
_stale_rs_version_map.find(timestampedVersion->version());
if (it != _stale_rs_version_map.end()) {
- uint64_t now = UnixSeconds();
- if (now <= it->second->delayed_expired_timestamp()) {
- // Some rowsets gc time was delayed, ignore
- continue;
- }
// delete rowset
StorageEngine::instance()->add_unused_rowset(it->second);
_stale_rs_version_map.erase(it);
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 05d9f16cfe..745de1bb71 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1540,8 +1540,9 @@ Status PInternalServiceImpl::_multi_get(const
PMultiGetRequest& request,
if (!tablet) {
continue;
}
- BetaRowsetSharedPtr rowset =
-
std::static_pointer_cast<BetaRowset>(tablet->get_rowset(rowset_id));
+ // We ensured it's rowset is not released when init Tablet reader
param, rowset->update_delayed_expired_timestamp();
+ BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
+ StorageEngine::instance()->get_quering_rowset(rowset_id));
if (!rowset) {
LOG(INFO) << "no such rowset " << rowset_id;
continue;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 0b631b143e..b945d42f6c 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -413,6 +413,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
UnixSeconds() +
_tablet_reader_params.runtime_state->execution_timeout() +
delayed_s;
rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp);
+ StorageEngine::instance()->add_quering_rowset(rs_reader->rowset());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]