This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bc78d93afb2 [cloud](topn-opt) support topn 2 phase read in cloud mode
(#31548)
bc78d93afb2 is described below
commit bc78d93afb20ce3a50803a491b9e8eabf31d2e4d
Author: lihangyu <[email protected]>
AuthorDate: Thu Feb 29 18:58:57 2024 +0800
[cloud](topn-opt) support topn 2 phase read in cloud mode (#31548)
---
be/src/cloud/cloud_storage_engine.cpp | 6 +
be/src/cloud/cloud_storage_engine.h | 1 -
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/exec/rowid_fetcher.cpp | 197 +++++++++++++++++++
be/src/exec/rowid_fetcher.h | 6 +
be/src/olap/olap_server.cpp | 6 +
be/src/olap/storage_engine.cpp | 40 ++--
be/src/olap/storage_engine.h | 24 +--
be/src/runtime/descriptors.h | 2 +
be/src/service/internal_service.cpp | 214 +--------------------
be/src/service/internal_service.h | 25 ++-
be/src/vec/exec/scan/new_olap_scanner.cpp | 23 +--
.../pipeline/cloud_p0/conf/session_variables.sql | 1 -
14 files changed, 300 insertions(+), 247 deletions(-)
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 8ca0d503e74..439c767211f 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -115,6 +115,12 @@ Status CloudStorageEngine::start_bg_threads() {
[this]() { this->_sync_tablets_thread_callback(); },
&_bg_threads.emplace_back()));
LOG(INFO) << "sync tablets thread started";
+ RETURN_IF_ERROR(Thread::create(
+ "CloudStorageEngine", "evict_querying_rowset_thread",
+ [this]() { this->_evict_quring_rowset_thread_callback(); },
+ &_evict_quering_rowset_thread));
+ LOG(INFO) << "evict quering thread started";
+
// TODO(plat1ko): lease_compaction_thread
// TODO(plat1ko): check_bucket_enable_versioning_thread
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 7ccfeaa784e..16b1cc5a1e0 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -67,7 +67,6 @@ private:
void _sync_tablets_thread_callback();
std::atomic_bool _stopped {false};
- CountDownLatch _stop_background_threads_latch {1};
std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
std::unique_ptr<CloudTabletMgr> _tablet_mgr;
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f7f4cc99a4b..dc3b7ae0449 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -278,6 +278,7 @@ DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec,
"30");
DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300");
DEFINE_mInt32(disk_stat_monitor_interval, "5");
DEFINE_mInt32(unused_rowset_monitor_interval, "30");
+DEFINE_mInt32(quering_rowsets_evict_interval, "30");
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
DEFINE_mString(broken_storage_path, "");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6fcbecd8f71..b95969b761b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -321,6 +321,7 @@ DECLARE_mInt32(tablet_lookup_cache_stale_sweep_time_sec);
DECLARE_mInt32(point_query_row_cache_stale_sweep_time_sec);
DECLARE_mInt32(disk_stat_monitor_interval);
DECLARE_mInt32(unused_rowset_monitor_interval);
+DECLARE_mInt32(quering_rowsets_evict_interval);
DECLARE_String(storage_root_path);
DECLARE_mString(broken_storage_path);
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 6a7e21d81cc..c921be9509f 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -30,6 +30,7 @@
#include <algorithm>
#include <cstdint>
+#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
@@ -37,11 +38,19 @@
#include <vector>
#include "bthread/countdown_event.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
#include "common/exception.h"
#include "exec/tablet_info.h" // DorisNodesInfo
#include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_fwd.h"
+#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "runtime/descriptors.h"
@@ -275,4 +284,192 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr&
column_row_ids,
return Status::OK();
}
+template <typename Func>
+auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
+ MonotonicStopWatch watch;
+ watch.start();
+ auto res = fn();
+ *cost += watch.elapsed_time() / 1000 / 1000;
+ return res;
+}
+
+struct IteratorKey {
+ int64_t tablet_id;
+ RowsetId rowset_id;
+ uint64_t segment_id;
+ int slot_id;
+
+    // equality predicate so IteratorKey can serve as an unordered_map key (std::equal_to)
+ bool operator==(const IteratorKey& rhs) const {
+ return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
+ segment_id == rhs.segment_id && slot_id == rhs.slot_id;
+ }
+};
+
+struct HashOfIteratorKey {
+ size_t operator()(const IteratorKey& key) const {
+ size_t seed = 0;
+ seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
+ seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi),
seed);
+ seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi),
seed);
+ seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo),
seed);
+ seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
+ seed = HashUtil::hash64(&key.slot_id, sizeof(key.slot_id), seed);
+ return seed;
+ }
+};
+
+struct IteratorItem {
+ std::unique_ptr<ColumnIterator> iterator;
+ // for holding the reference of segment to avoid use after release
+ SegmentSharedPtr segment;
+};
+
+Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
+ PMultiGetResponse* response) {
+ // read from storage engine row id by row id
+ OlapReaderStatistics stats;
+ vectorized::Block result_block;
+ int64_t acquire_tablet_ms = 0;
+ int64_t acquire_rowsets_ms = 0;
+ int64_t acquire_segments_ms = 0;
+ int64_t lookup_row_data_ms = 0;
+
+ // init desc
+ TupleDescriptor desc(request.desc());
+ std::vector<SlotDescriptor> slots;
+ slots.reserve(request.slots().size());
+ for (const auto& pslot : request.slots()) {
+ slots.push_back(SlotDescriptor(pslot));
+ desc.add_slot(&slots.back());
+ }
+
+ // init read schema
+ TabletSchema full_read_schema;
+ for (const ColumnPB& column_pb : request.column_desc()) {
+ full_read_schema.append_column(TabletColumn(column_pb));
+ }
+
+ std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>
iterator_map;
+ // read row by row
+ for (size_t i = 0; i < request.row_locs_size(); ++i) {
+ const auto& row_loc = request.row_locs(i);
+ MonotonicStopWatch watch;
+ watch.start();
+ BaseTabletSPtr tablet = scope_timer_run(
+ [&]() {
+ auto res = ExecEnv::get_tablet(row_loc.tablet_id());
+ return !res.has_value() ? nullptr
+ :
std::dynamic_pointer_cast<BaseTablet>(res.value());
+ },
+ &acquire_tablet_ms);
+ RowsetId rowset_id;
+ rowset_id.init(row_loc.rowset_id());
+ if (!tablet) {
+ continue;
+ }
+        // We ensured its rowset is not released when init Tablet reader
param, rowset->update_delayed_expired_timestamp();
+ BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(scope_timer_run(
+ [&]() {
+ return
ExecEnv::GetInstance()->storage_engine().get_quering_rowset(rowset_id);
+ },
+ &acquire_rowsets_ms));
+ if (!rowset) {
+ LOG(INFO) << "no such rowset " << rowset_id;
+ continue;
+ }
+ size_t row_size = 0;
+ Defer _defer([&]() {
+ LOG_EVERY_N(INFO, 100)
+ << "multiget_data single_row, cost(us):" <<
watch.elapsed_time() / 1000
+ << ", row_size:" << row_size;
+ *response->add_row_locs() = row_loc;
+ });
+ SegmentCacheHandle segment_cache;
+ RETURN_IF_ERROR(scope_timer_run(
+ [&]() {
+ return SegmentLoader::instance()->load_segments(rowset,
&segment_cache, true);
+ },
+ &acquire_segments_ms));
+ // find segment
+ auto it = std::find_if(segment_cache.get_segments().cbegin(),
+ segment_cache.get_segments().cend(),
+ [&row_loc](const segment_v2::SegmentSharedPtr&
seg) {
+ return seg->id() == row_loc.segment_id();
+ });
+ if (it == segment_cache.get_segments().end()) {
+ continue;
+ }
+ segment_v2::SegmentSharedPtr segment = *it;
+ GlobalRowLoacation row_location(row_loc.tablet_id(),
rowset->rowset_id(),
+ row_loc.segment_id(),
row_loc.ordinal_id());
+        // fetch by row store, more efficient way
+ if (request.fetch_row_store()) {
+ CHECK(tablet->tablet_schema()->store_row_column());
+ RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
+ string* value = response->add_binary_row_data();
+ RETURN_IF_ERROR(scope_timer_run(
+ [&]() {
+ return tablet->lookup_row_data({}, loc, rowset, &desc,
stats, *value);
+ },
+ &lookup_row_data_ms));
+ row_size = value->size();
+ continue;
+ }
+
+ // fetch by column store
+ if (result_block.is_empty_column()) {
+ result_block = vectorized::Block(desc.slots(),
request.row_locs().size());
+ }
+ VLOG_DEBUG << "Read row location "
+ << fmt::format("{}, {}, {}, {}", row_location.tablet_id,
+
row_location.row_location.rowset_id.to_string(),
+ row_location.row_location.segment_id,
+ row_location.row_location.row_id);
+ for (int x = 0; x < desc.slots().size(); ++x) {
+ auto row_id =
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
+ vectorized::MutableColumnPtr column =
+ result_block.get_by_position(x).column->assume_mutable();
+ IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
+ .rowset_id = rowset_id,
+ .segment_id = row_loc.segment_id(),
+ .slot_id = desc.slots()[x]->id()};
+ IteratorItem& iterator_item = iterator_map[iterator_key];
+ if (iterator_item.segment == nullptr) {
+ // hold the reference
+ iterator_map[iterator_key].segment = segment;
+ }
+ segment = iterator_item.segment;
+ RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema,
desc.slots()[x],
+ row_id, column,
stats,
+
iterator_item.iterator));
+ }
+ }
+ // serialize block if not empty
+ if (!result_block.is_empty_column()) {
+ VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
+ << ", be_exec_version:" << request.be_exec_version();
+ [[maybe_unused]] size_t compressed_size = 0;
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ int be_exec_version = request.has_be_exec_version() ?
request.be_exec_version() : 0;
+ RETURN_IF_ERROR(result_block.serialize(be_exec_version,
response->mutable_block(),
+ &uncompressed_size,
&compressed_size,
+
segment_v2::CompressionTypePB::LZ4));
+ }
+
+ LOG(INFO) << "Query stats: "
+ << fmt::format(
+ "hit_cached_pages:{}, total_pages_read:{},
compressed_bytes_read:{}, "
+ "io_latency:{}ns, "
+ "uncompressed_bytes_read:{},"
+ "bytes_read:{},"
+ "acquire_tablet_ms:{}, acquire_rowsets_ms:{},
acquire_segments_ms:{}, "
+ "lookup_row_data_ms:{}",
+ stats.cached_pages_num, stats.total_pages_num,
stats.compressed_bytes_read,
+ stats.io_ns, stats.uncompressed_bytes_read,
stats.bytes_read,
+ acquire_tablet_ms, acquire_rowsets_ms,
acquire_segments_ms,
+ lookup_row_data_ms);
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index ae57c295a5f..78184ae8feb 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -26,6 +26,7 @@
#include "common/status.h"
#include "exec/tablet_info.h" // DorisNodesInfo
+#include "olap/storage_engine.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
@@ -67,4 +68,9 @@ private:
FetchOption _fetch_option;
};
+class RowIdStorageReader {
+public:
+ static Status read_by_rowids(const PMultiGetRequest& request,
PMultiGetResponse* response);
+};
+
} // namespace doris
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c4239898892..242e71e3342 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -129,6 +129,12 @@ Status StorageEngine::start_bg_threads() {
&_unused_rowset_monitor_thread));
LOG(INFO) << "unused rowset monitor thread started";
+ RETURN_IF_ERROR(Thread::create(
+ "StorageEngine", "evict_querying_rowset_thread",
+ [this]() { this->_evict_quring_rowset_thread_callback(); },
+ &_evict_quering_rowset_thread));
+ LOG(INFO) << "evict quering thread started";
+
// start thread for monitoring the snapshot and trash folder
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "garbage_sweeper_thread",
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 5e735dfb77f..0a94c264110 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -108,6 +108,7 @@ bvar::Adder<uint64_t>
unused_rowsets_counter("ununsed_rowsets_counter");
BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid)
: _type(type),
_rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)),
+ _stop_background_threads_latch(1),
_segment_meta_mem_tracker(std::make_shared<MemTracker>(
"SegmentMeta",
ExecEnv::GetInstance()->experimental_mem_tracker())) {}
@@ -149,7 +150,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_is_all_cluster_id_exist(true),
_stopped(false),
_segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
- _stop_background_threads_latch(1),
_tablet_manager(new TabletManager(*this,
config::tablet_map_shard_size)),
_txn_manager(new TxnManager(*this, config::txn_map_shard_size,
config::txn_shard_size)),
_default_rowset_type(BETA_ROWSET),
@@ -1081,12 +1081,7 @@ void StorageEngine::start_delete_unused_rowset() {
{
std::lock_guard<std::mutex> lock(_gc_mutex);
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
- uint64_t now = UnixSeconds();
auto&& rs = it->second;
- if (now > rs->delayed_expired_timestamp()) {
- // We delay the GC time of this rowset since it's maybe still
needed, see #20732
- evict_querying_rowset(it->second->rowset_id());
- }
if (rs.use_count() == 1 && rs->need_delete_file()) {
// remote rowset data will be reclaimed by
`remove_unused_remote_files`
if (rs->is_local()) {
@@ -1281,6 +1276,19 @@ bool BaseStorageEngine::notify_listener(std::string_view
name) {
return found;
}
+void BaseStorageEngine::_evict_quring_rowset_thread_callback() {
+ int32_t interval = config::quering_rowsets_evict_interval;
+ do {
+ _evict_querying_rowset();
+ interval = config::quering_rowsets_evict_interval;
+ if (interval <= 0) {
+ LOG(WARNING) << "quering_rowsets_evict_interval config is illegal:
" << interval
+ << ", force set to 1";
+ interval = 1;
+ }
+ } while
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
+}
+
// check whether any unused rowsets's id equal to rowset_id
bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId&
rowset_id) {
std::lock_guard<std::mutex> lock(_gc_mutex);
@@ -1396,12 +1404,12 @@ Status
StorageEngine::get_compaction_status_json(std::string* result) {
return Status::OK();
}
-void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
+void BaseStorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
_querying_rowsets.emplace(rs->rowset_id(), rs);
}
-RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) {
+RowsetSharedPtr BaseStorageEngine::get_quering_rowset(RowsetId rs_id) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
auto it = _querying_rowsets.find(rs_id);
if (it != _querying_rowsets.end()) {
@@ -1410,9 +1418,19 @@ RowsetSharedPtr
StorageEngine::get_quering_rowset(RowsetId rs_id) {
return nullptr;
}
-void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
- std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
- _querying_rowsets.erase(rs_id);
+void BaseStorageEngine::_evict_querying_rowset() {
+ {
+ std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
+ for (auto it = _querying_rowsets.begin(); it !=
_querying_rowsets.end();) {
+ uint64_t now = UnixSeconds();
+ // We delay the GC time of this rowset since it's maybe still
needed, see #20732
+ if (now > it->second->delayed_expired_timestamp()) {
+ it = _querying_rowsets.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
}
bool StorageEngine::add_broken_path(std::string path) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6e04269b5f2..6ebfb9479f4 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -129,7 +129,14 @@ public:
return _segment_meta_mem_tracker;
}
+ void add_quering_rowset(RowsetSharedPtr rs);
+
+ RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
+
protected:
+ void _evict_querying_rowset();
+ void _evict_quring_rowset_thread_callback();
+
int32_t _effective_cluster_id = -1;
HeartbeatFlags* _heartbeat_flags = nullptr;
@@ -140,6 +147,7 @@ protected:
std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
+ CountDownLatch _stop_background_threads_latch;
// This mem tracker is only for tracking memory use by segment meta data
such as footer or index page.
// The memory consumed by querying is tracked in segment iterator.
@@ -147,6 +155,11 @@ protected:
// is similar to `-2912341218700198079`. So, temporarily put it in
experimental type tracker.
// maybe have to use ColumnReader count as segment meta size.
std::shared_ptr<MemTracker> _segment_meta_mem_tracker;
+
+ // Hold reference of quering rowsets
+ std::mutex _quering_rowsets_mutex;
+ std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId>
_querying_rowsets;
+ scoped_refptr<Thread> _evict_quering_rowset_thread;
};
class StorageEngine final : public BaseStorageEngine {
@@ -266,12 +279,6 @@ public:
int64_t transaction_id, bool is_recover);
int64_t get_pending_publish_min_version(int64_t tablet_id);
- void add_quering_rowset(RowsetSharedPtr rs);
-
- RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
-
- void evict_querying_rowset(RowsetId rs_id);
-
bool add_broken_path(std::string path);
bool remove_broken_path(std::string path);
@@ -406,14 +413,9 @@ private:
PendingRowsetSet _pending_local_rowsets;
PendingRowsetSet _pending_remote_rowsets;
- // Hold reference of quering rowsets
- std::mutex _quering_rowsets_mutex;
- std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId>
_querying_rowsets;
-
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
- CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _unused_rowset_monitor_thread;
// thread to monitor snapshot expiry
scoped_refptr<Thread> _garbage_sweeper_thread;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index fff1ed339d5..3bc9099477c 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -121,6 +121,7 @@ private:
friend class SchemaScanner;
friend class OlapTableSchemaParam;
friend class PInternalServiceImpl;
+ friend class RowIdStorageReader;
friend class Tablet;
friend class TabletSchema;
@@ -410,6 +411,7 @@ private:
friend class SchemaScanner;
friend class OlapTableSchemaParam;
friend class PInternalServiceImpl;
+ friend class RowIdStorageReader;
friend class TabletSchema;
const TupleId _id;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 2abd1867908..185bb34b391 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -57,6 +57,7 @@
#include "common/logging.h"
#include "common/signal_handler.h"
#include "common/status.h"
+#include "exec/rowid_fetcher.h"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/internal_service.pb.h"
@@ -181,25 +182,6 @@ private:
google::protobuf::Closure* _done = nullptr;
};
-template <typename T>
-concept CanCancel = requires(T* response) { response->mutable_status(); };
-
-template <CanCancel T>
-void offer_failed(T* response, google::protobuf::Closure* done, const
FifoThreadPool& pool) {
- brpc::ClosureGuard closure_guard(done);
- response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
- response->mutable_status()->add_error_msgs("fail to offer request to the
work pool, pool=" +
- pool.get_info());
- LOG(WARNING) << "cancelled due to fail to offer request to the work pool,
pool="
- << pool.get_info();
-}
-
-template <typename T>
-void offer_failed(T* response, google::protobuf::Closure* done, const
FifoThreadPool& pool) {
- brpc::ClosureGuard closure_guard(done);
- LOG(WARNING) << "fail to offer request to the work pool, pool=" <<
pool.get_info();
-}
-
PInternalService::PInternalService(ExecEnv* exec_env)
: _exec_env(exec_env),
_heavy_work_pool(config::brpc_heavy_work_pool_threads != -1
@@ -1738,201 +1720,17 @@ void
PInternalServiceImpl::response_slave_tablet_pull_rowset(
}
}
-template <typename Func>
-auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
- MonotonicStopWatch watch;
- watch.start();
- auto res = fn();
- *cost += watch.elapsed_time() / 1000 / 1000;
- return res;
-}
-
-struct IteratorKey {
- int64_t tablet_id;
- RowsetId rowset_id;
- uint64_t segment_id;
- int slot_id;
-
- // unordered map std::equal_to
- bool operator==(const IteratorKey& rhs) const {
- return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
- segment_id == rhs.segment_id && slot_id == rhs.slot_id;
- }
-};
-
-struct HashOfIteratorKey {
- size_t operator()(const IteratorKey& key) const {
- size_t seed = 0;
- seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
- seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi),
seed);
- seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi),
seed);
- seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo),
seed);
- seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
- seed = HashUtil::hash64(&key.slot_id, sizeof(key.slot_id), seed);
- return seed;
- }
-};
-
-struct IteratorItem {
- std::unique_ptr<ColumnIterator> iterator;
- // for holding the reference of segment to avoid use after release
- SegmentSharedPtr segment;
-};
-
-Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
- PMultiGetResponse* response) {
- OlapReaderStatistics stats;
- vectorized::Block result_block;
- int64_t acquire_tablet_ms = 0;
- int64_t acquire_rowsets_ms = 0;
- int64_t acquire_segments_ms = 0;
- int64_t lookup_row_data_ms = 0;
-
- // init desc
- TupleDescriptor desc(request.desc());
- std::vector<SlotDescriptor> slots;
- slots.reserve(request.slots().size());
- for (const auto& pslot : request.slots()) {
- slots.push_back(SlotDescriptor(pslot));
- desc.add_slot(&slots.back());
- }
-
- // init read schema
- TabletSchema full_read_schema;
- for (const ColumnPB& column_pb : request.column_desc()) {
- full_read_schema.append_column(TabletColumn(column_pb));
- }
-
- std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>
iterator_map;
- // read row by row
- for (size_t i = 0; i < request.row_locs_size(); ++i) {
- const auto& row_loc = request.row_locs(i);
- MonotonicStopWatch watch;
- watch.start();
- TabletSharedPtr tablet = scope_timer_run(
- [&]() {
- return
_engine.tablet_manager()->get_tablet(row_loc.tablet_id(),
- true /*include
deleted*/);
- },
- &acquire_tablet_ms);
- RowsetId rowset_id;
- rowset_id.init(row_loc.rowset_id());
- if (!tablet) {
- continue;
- }
- // We ensured it's rowset is not released when init Tablet reader
param, rowset->update_delayed_expired_timestamp();
- BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(scope_timer_run(
- [&]() { return _engine.get_quering_rowset(rowset_id); },
&acquire_rowsets_ms));
- if (!rowset) {
- LOG(INFO) << "no such rowset " << rowset_id;
- continue;
- }
- size_t row_size = 0;
- Defer _defer([&]() {
- LOG_EVERY_N(INFO, 100)
- << "multiget_data single_row, cost(us):" <<
watch.elapsed_time() / 1000
- << ", row_size:" << row_size;
- *response->add_row_locs() = row_loc;
- });
- SegmentCacheHandle segment_cache;
- RETURN_IF_ERROR(scope_timer_run(
- [&]() {
- return SegmentLoader::instance()->load_segments(rowset,
&segment_cache, true);
- },
- &acquire_segments_ms));
- // find segment
- auto it = std::find_if(segment_cache.get_segments().cbegin(),
- segment_cache.get_segments().cend(),
- [&row_loc](const segment_v2::SegmentSharedPtr&
seg) {
- return seg->id() == row_loc.segment_id();
- });
- if (it == segment_cache.get_segments().end()) {
- continue;
- }
- segment_v2::SegmentSharedPtr segment = *it;
- GlobalRowLoacation row_location(row_loc.tablet_id(),
rowset->rowset_id(),
- row_loc.segment_id(),
row_loc.ordinal_id());
- // fetch by row store, more effcient way
- if (request.fetch_row_store()) {
- CHECK(tablet->tablet_schema()->store_row_column());
- RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
- string* value = response->add_binary_row_data();
- RETURN_IF_ERROR(scope_timer_run(
- [&]() {
- return tablet->lookup_row_data({}, loc, rowset, &desc,
stats, *value);
- },
- &lookup_row_data_ms));
- row_size = value->size();
- continue;
- }
-
- // fetch by column store
- if (result_block.is_empty_column()) {
- result_block = vectorized::Block(desc.slots(),
request.row_locs().size());
- }
- VLOG_DEBUG << "Read row location "
- << fmt::format("{}, {}, {}, {}", row_location.tablet_id,
-
row_location.row_location.rowset_id.to_string(),
- row_location.row_location.segment_id,
- row_location.row_location.row_id);
- for (int x = 0; x < desc.slots().size(); ++x) {
- auto row_id =
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
- vectorized::MutableColumnPtr column =
- result_block.get_by_position(x).column->assume_mutable();
- IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
- .rowset_id = rowset_id,
- .segment_id = row_loc.segment_id(),
- .slot_id = desc.slots()[x]->id()};
- IteratorItem& iterator_item = iterator_map[iterator_key];
- if (iterator_item.segment == nullptr) {
- // hold the reference
- iterator_map[iterator_key].segment = segment;
- }
- segment = iterator_item.segment;
- RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema,
desc.slots()[x],
- row_id, column,
stats,
-
iterator_item.iterator));
- }
- }
- // serialize block if not empty
- if (!result_block.is_empty_column()) {
- VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
- << ", be_exec_version:" << request.be_exec_version();
- [[maybe_unused]] size_t compressed_size = 0;
- [[maybe_unused]] size_t uncompressed_size = 0;
- int be_exec_version = request.has_be_exec_version() ?
request.be_exec_version() : 0;
- RETURN_IF_ERROR(result_block.serialize(be_exec_version,
response->mutable_block(),
- &uncompressed_size,
&compressed_size,
-
segment_v2::CompressionTypePB::LZ4));
- }
-
- LOG(INFO) << "Query stats: "
- << fmt::format(
- "hit_cached_pages:{}, total_pages_read:{},
compressed_bytes_read:{}, "
- "io_latency:{}ns, "
- "uncompressed_bytes_read:{},"
- "bytes_read:{},"
- "acquire_tablet_ms:{}, acquire_rowsets_ms:{},
acquire_segments_ms:{}, "
- "lookup_row_data_ms:{}",
- stats.cached_pages_num, stats.total_pages_num,
stats.compressed_bytes_read,
- stats.io_ns, stats.uncompressed_bytes_read,
stats.bytes_read,
- acquire_tablet_ms, acquire_rowsets_ms,
acquire_segments_ms,
- lookup_row_data_ms);
- return Status::OK();
-}
-
-void PInternalServiceImpl::multiget_data(google::protobuf::RpcController*
controller,
- const PMultiGetRequest* request,
- PMultiGetResponse* response,
- google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([request, response, done, this]() {
+void PInternalService::multiget_data(google::protobuf::RpcController*
controller,
+ const PMultiGetRequest* request,
PMultiGetResponse* response,
+ google::protobuf::Closure* done) {
+ bool ret = _light_work_pool.try_offer([request, response, done]() {
signal::set_signal_task_id(request->query_id());
// multi get data by rowid
MonotonicStopWatch watch;
watch.start();
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(0);
- Status st = _multi_get(*request, response);
+ Status st = RowIdStorageReader::read_by_rowids(*request, response);
st.to_protobuf(response->mutable_status());
LOG(INFO) << "multiget_data finished, cost(us):" <<
watch.elapsed_time() / 1000;
});
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index b9d3432982c..1cf2b5ae42f 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -17,6 +17,7 @@
#pragma once
+#include <brpc/closure_guard.h>
#include <gen_cpp/internal_service.pb.h>
#include <string>
@@ -38,6 +39,25 @@ class PHandShakeResponse;
class LoadStreamMgr;
class RuntimeState;
+template <typename T>
+concept CanCancel = requires(T* response) { response->mutable_status(); };
+
+template <typename T>
+void offer_failed(T* response, google::protobuf::Closure* done, const
FifoThreadPool& pool) {
+ brpc::ClosureGuard closure_guard(done);
+ LOG(WARNING) << "fail to offer request to the work pool, pool=" <<
pool.get_info();
+}
+
+template <CanCancel T>
+void offer_failed(T* response, google::protobuf::Closure* done, const
FifoThreadPool& pool) {
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to the
work pool, pool=" +
+ pool.get_info());
+ LOG(WARNING) << "cancelled due to fail to offer request to the work pool,
pool="
+ << pool.get_info();
+}
+
class PInternalService : public PBackendService {
public:
PInternalService(ExecEnv* exec_env);
@@ -183,6 +203,9 @@ public:
PGetWalQueueSizeResponse* response,
google::protobuf::Closure* done) override;
+ void multiget_data(google::protobuf::RpcController* controller, const
PMultiGetRequest* request,
+ PMultiGetResponse* response, google::protobuf::Closure*
done) override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
@@ -233,8 +256,6 @@ public:
const PTabletWriteSlaveDoneRequest*
request,
PTabletWriteSlaveDoneResult*
response,
google::protobuf::Closure* done)
override;
- void multiget_data(google::protobuf::RpcController* controller, const
PMultiGetRequest* request,
- PMultiGetResponse* response, google::protobuf::Closure*
done) override;
void tablet_fetch_data(google::protobuf::RpcController* controller,
const PTabletKeyLookupRequest* request,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 920e25f0904..e8353f9544d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -395,19 +395,16 @@ Status NewOlapScanner::_init_tablet_reader_params(
// If this is a Two-Phase read query, and we need to delay the release of
Rowset
// by rowset->update_delayed_expired_timestamp().This could expand the
lifespan of Rowset
- // TODO(plat1ko): CloudStorageEngine
- if (!config::is_cloud_mode()) {
- if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
- constexpr static int delayed_s = 60;
- for (auto rs_reader : _tablet_reader_params.rs_splits) {
- uint64_t delayed_expired_timestamp =
- UnixSeconds() +
_tablet_reader_params.runtime_state->execution_timeout() +
- delayed_s;
-
rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
- delayed_expired_timestamp);
-
ExecEnv::GetInstance()->storage_engine().to_local().add_quering_rowset(
- rs_reader.rs_reader->rowset());
- }
+ if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
+ constexpr static int delayed_s = 60;
+ for (auto rs_reader : _tablet_reader_params.rs_splits) {
+ uint64_t delayed_expired_timestamp =
+ UnixSeconds() +
_tablet_reader_params.runtime_state->execution_timeout() +
+ delayed_s;
+ rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
+ delayed_expired_timestamp);
+ ExecEnv::GetInstance()->storage_engine().add_quering_rowset(
+ rs_reader.rs_reader->rowset());
}
}
diff --git a/regression-test/pipeline/cloud_p0/conf/session_variables.sql
b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
index 8885476ee06..96a849bc1ad 100644
--- a/regression-test/pipeline/cloud_p0/conf/session_variables.sql
+++ b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
@@ -3,4 +3,3 @@ set global insert_visible_timeout_ms=60000;
set global enable_auto_analyze=false;
set global enable_audit_plugin=true;
set global enable_memtable_on_sink_node=false;
-set global enable_two_phase_read_opt = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]