This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 61108154d7b branch-4.0: [opt](variant) add column cache for variant sparse column #56159 (#56543)
61108154d7b is described below
commit 61108154d7b046d6013ef71065e29cfc13553584
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Sep 28 20:02:43 2025 +0800
    branch-4.0: [opt](variant) add column cache for variant sparse column #56159 (#56543)
Cherry-picked from #56159
Co-authored-by: Sun Chenyang <[email protected]>
---
be/src/olap/field.h | 4 +
be/src/olap/iterators.h | 4 -
be/src/olap/rowset/segment_v2/column_reader.h | 1 +
be/src/olap/rowset/segment_v2/segment.cpp | 21 +++-
be/src/olap/rowset/segment_v2/segment.h | 10 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 17 ++-
be/src/olap/rowset/segment_v2/segment_iterator.h | 3 +
.../variant/sparse_column_extract_iterator.h | 84 +++++--------
.../variant/sparse_column_merge_iterator.cpp | 9 +-
.../variant/sparse_column_merge_iterator.h | 18 ++-
.../segment_v2/variant/variant_column_reader.cpp | 67 +++++++----
.../segment_v2/variant/variant_column_reader.h | 132 +++++++++++++++++++--
.../variant_column_writer_reader_test.cpp | 24 ++--
13 files changed, 256 insertions(+), 138 deletions(-)
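The heart of the change is a two-level lookup owned by each SegmentIterator: column unique id -> (path -> shared cache entry), where the entry is created on first use and handed out as a shared_ptr afterwards. Below is a minimal standalone sketch (C++20) of that lookup; CacheEntry, get_shared_entry, and the "sparse" path string are simplified stand-ins for illustration, not the Doris classes:

// Standalone sketch of the two-level sparse-column cache in this patch.
// CacheEntry and get_shared_entry are hypothetical stand-ins, not Doris APIs.
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>

struct CacheEntry {
    std::string column_data; // stands in for the cached sparse column
};
using PathToCache = std::unordered_map<std::string, std::shared_ptr<CacheEntry>>;

// Mirrors the shape of VariantColumnReader::_get_shared_column_cache:
// create the entry on first lookup, return the shared entry afterwards.
std::shared_ptr<CacheEntry> get_shared_entry(PathToCache* cache, const std::string& path) {
    if (!cache || !cache->contains(path)) {
        auto entry = std::make_shared<CacheEntry>();
        if (cache) {
            cache->emplace(path, entry); // a nullptr cache means caching is disabled
        }
        return entry;
    }
    return cache->at(path);
}

int main() {
    // One map per SegmentIterator: column unique id -> per-path cache.
    std::unordered_map<int32_t, std::unique_ptr<PathToCache>> variant_sparse_column_cache;
    variant_sparse_column_cache.emplace(42, std::make_unique<PathToCache>());

    auto* per_column = variant_sparse_column_cache.at(42).get();
    auto a = get_shared_entry(per_column, "sparse"); // "sparse" is a placeholder path
    auto b = get_shared_entry(per_column, "sparse");
    assert(a == b); // both readers share one entry, so the data is read once
    return 0;
}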
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 69b3ac02dec..75adf75ed82 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -212,6 +212,10 @@ public:
int32_t get_scale() const { return _scale; }
const TabletColumn& get_desc() const { return _desc; }
+ int32_t get_unique_id() const {
+ return is_extracted_column() ? parent_unique_id() : unique_id();
+ }
+
protected:
TypeInfoPtr _type_info;
TabletColumn _desc;
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index f2aa423f72b..5d2b319dd23 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -129,10 +129,6 @@ public:
std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;
-
- // Cache for sparse column data to avoid redundant reads
- // col_unique_id -> cached column_ptr
- std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
};
struct CompactionSampleInfo {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index ba0c8804ebd..91aadffbfce 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -120,6 +120,7 @@ class FileColumnIterator;
using ColumnIteratorUPtr = std::unique_ptr<ColumnIterator>;
using OffsetFileColumnIteratorUPtr = std::unique_ptr<OffsetFileColumnIterator>;
using FileColumnIteratorUPtr = std::unique_ptr<FileColumnIterator>;
+using ColumnIteratorSPtr = std::shared_ptr<ColumnIterator>;
// There will be concurrent users to read the same column. So we should do our best to reduce resource usage through share
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 9334d75fb16..0675c357ccb 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -725,7 +725,9 @@ Status Segment::new_default_iterator(const TabletColumn& tablet_column,
// but they are not the same column
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
- const StorageReadOptions* opt) {
+ const StorageReadOptions* opt,
+                                    const std::unordered_map<int32_t, PathToSparseColumnCacheUPtr>*
+                                            variant_sparse_column_cache) {
if (opt->runtime_state != nullptr) {
_be_exec_version = opt->runtime_state->be_exec_version();
}
@@ -746,10 +748,21 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
        return Status::InternalError("column reader is nullptr, unique_id={}", unique_id);
}
if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
+        // if sparse_column_cache_ptr is nullptr, means the sparse column cache is not used
+ PathToSparseColumnCache* sparse_column_cache_ptr = nullptr;
+ if (variant_sparse_column_cache) {
+ auto it = variant_sparse_column_cache->find(unique_id);
+ if (it != variant_sparse_column_cache->end()) {
+ sparse_column_cache_ptr = it->second.get();
+ } else {
+                DCHECK(false) << "sparse column cache is not found, unique_id=" << unique_id;
+ }
+ }
        // use _column_reader_cache to get variant subcolumn(path column) reader
- RETURN_IF_ERROR(
- assert_cast<VariantColumnReader*>(reader.get())
-                        ->new_iterator(iter, &tablet_column, opt, _column_reader_cache.get()));
+ RETURN_IF_ERROR(assert_cast<VariantColumnReader*>(reader.get())
+ ->new_iterator(iter, &tablet_column, opt,
+ _column_reader_cache.get(),
+ sparse_column_cache_ptr));
} else {
RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt));
}
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index 098c3c082f8..b2e3e29ec6b 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -38,6 +38,7 @@
#include "olap/page_cache.h"
#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
#include "olap/rowset/segment_v2/page_handle.h"
+#include "olap/rowset/segment_v2/variant/variant_column_reader.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/define_primitive_type.h"
@@ -110,12 +111,11 @@ public:
uint32_t num_rows() const { return _num_rows; }
+    // if variant_sparse_column_cache is nullptr, means the sparse column cache is not used
Status new_column_iterator(const TabletColumn& tablet_column,
- std::unique_ptr<ColumnIterator>* iter,
- const StorageReadOptions* opt);
-
-    Status new_column_iterator(int32_t unique_id, const StorageReadOptions* opt,
- std::unique_ptr<ColumnIterator>* iter);
+                               std::unique_ptr<ColumnIterator>* iter, const StorageReadOptions* opt,
+                               const std::unordered_map<int32_t, PathToSparseColumnCacheUPtr>*
+                                       variant_sparse_column_cache = nullptr);
Status new_bitmap_index_iterator(const TabletColumn& tablet_column,
const StorageReadOptions& read_options,
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 3ac3f2c772a..57b627edf2b 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -323,8 +323,6 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
_score_runtime = _opts.score_runtime;
_ann_topn_runtime = _opts.ann_topn_runtime;
- RETURN_IF_ERROR(init_iterators());
-
if (opts.output_columns != nullptr) {
_output_columns = *(opts.output_columns);
}
@@ -360,9 +358,16 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
}
}
            _storage_name_and_type[i] = std::make_pair(field_name, storage_type);
+            if (int32_t uid = col->get_unique_id(); !_variant_sparse_column_cache.contains(uid)) {
+ DCHECK(uid >= 0);
+                _variant_sparse_column_cache.emplace(uid,
+                                                     std::make_unique<PathToSparseColumnCache>());
+ }
}
}
+ RETURN_IF_ERROR(init_iterators());
+
RETURN_IF_ERROR(_construct_compound_expr_context());
_enable_common_expr_pushdown = !_common_expr_ctxs_push_down.empty();
VLOG_DEBUG << fmt::format(
@@ -575,7 +580,8 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
}
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
-                                                  &_column_iterators[cid], &_opts));
+                                                  &_column_iterators[cid], &_opts,
+                                                  &_variant_sparse_column_cache));
ColumnIteratorOptions iter_opts {
.use_page_cache = _opts.use_page_cache,
.file_reader = _file_reader.get(),
@@ -1302,7 +1308,8 @@ Status SegmentIterator::_init_return_column_iterators() {
if (_column_iterators[cid] == nullptr) {
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
-                                                          &_column_iterators[cid], &_opts));
+                                                          &_column_iterators[cid], &_opts,
+                                                          &_variant_sparse_column_cache));
ColumnIteratorOptions iter_opts {
.use_page_cache = _opts.use_page_cache,
            // If the col is predicate column, then should read the last page to check
@@ -2309,8 +2316,6 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
Status SegmentIterator::next_batch(vectorized::Block* block) {
    // Replace virtual columns with ColumnNothing at the beginning of each next_batch call.
_init_virtual_columns(block);
- // Clear the sparse column cache before processing a new batch
- _opts.sparse_column_cache.clear();
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({
auto res = _next_batch_internal(block);
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 7a82ed2f716..95b85649efa 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -503,6 +503,9 @@ private:
std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
IndexQueryContextPtr _index_query_context;
+
+ // key is column uid, value is the sparse column cache
+    std::unordered_map<int32_t, PathToSparseColumnCacheUPtr> _variant_sparse_column_cache;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h b/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h
index a58fd5ca312..f4afa778ead 100644
--- a/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h
+++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h
@@ -31,6 +31,7 @@
#include "olap/iterators.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/stream_reader.h"
+#include "olap/rowset/segment_v2/variant/variant_column_reader.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "vec/columns/column.h"
@@ -56,10 +57,8 @@ namespace doris::segment_v2 {
// Base class for sparse column processors with common functionality
class BaseSparseColumnProcessor : public ColumnIterator {
protected:
- vectorized::MutableColumnPtr _sparse_column;
- StorageReadOptions* _read_opts; // Shared cache pointer
- std::unique_ptr<ColumnIterator> _sparse_column_reader;
- int32_t _col_uid;
+ const StorageReadOptions* _read_opts;
+ SparseColumnCacheSPtr _sparse_column_cache;
    // Pure virtual method for data processing when encounter existing sparse columns(to be implemented by subclasses)
    virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
                                                           size_t num_rows) = 0;
@@ -69,33 +68,17 @@ protected:
size_t num_rows) = 0;
public:
-    BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts,
- const TabletColumn& col)
- : _read_opts(opts),
- _sparse_column_reader(std::move(reader)),
- _col_uid(col.parent_unique_id()) {
- _sparse_column = vectorized::ColumnVariant::create_sparse_column_fn();
- }
+ BaseSparseColumnProcessor(SparseColumnCacheSPtr sparse_column_cache,
+ const StorageReadOptions* opts)
+            : _read_opts(opts), _sparse_column_cache(std::move(sparse_column_cache)) {}
// Common initialization for all processors
Status init(const ColumnIteratorOptions& opts) override {
- return _sparse_column_reader->init(opts);
- }
-
-    // When performing compaction, multiple columns are extracted from the sparse columns,
- // and the sparse columns only need to be read once.
- // So we need to cache the sparse column and reuse it.
- // The cache is only used when the compaction reader is used.
- bool has_sparse_column_cache() const {
- return _read_opts && _read_opts->sparse_column_cache[_col_uid] &&
-               ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type);
+ return _sparse_column_cache->init(opts);
}
Status seek_to_ordinal(ordinal_t ord) override {
- if (has_sparse_column_cache()) {
- return Status::OK();
- }
- return _sparse_column_reader->seek_to_ordinal(ord);
+ return _sparse_column_cache->seek_to_ordinal(ord);
}
ordinal_t get_current_ordinal() const override {
@@ -106,28 +89,18 @@ public:
template <typename ReadMethod>
Status _process_batch(ReadMethod&& read_method, size_t nrows,
vectorized::MutableColumnPtr& dst) {
- // Cache check and population logic
- if (has_sparse_column_cache()) {
-            _sparse_column = _read_opts->sparse_column_cache[_col_uid]->assume_mutable();
- } else {
- _sparse_column->clear();
- {
-                SCOPED_RAW_TIMER(&_read_opts->stats->variant_scan_sparse_column_timer_ns);
-                int64_t before_size = _read_opts->stats->uncompressed_bytes_read;
- RETURN_IF_ERROR(read_method());
- _read_opts->stats->variant_scan_sparse_column_bytes +=
-                        _read_opts->stats->uncompressed_bytes_read - before_size;
- }
-
- // cache the sparse column
- if (_read_opts) {
-                _read_opts->sparse_column_cache[_col_uid] = _sparse_column->get_ptr();
- }
+            SCOPED_RAW_TIMER(&_read_opts->stats->variant_scan_sparse_column_timer_ns);
SCOPED_RAW_TIMER(&_read_opts->stats->variant_scan_sparse_column_timer_ns);
+ int64_t before_size = _read_opts->stats->uncompressed_bytes_read;
+ RETURN_IF_ERROR(read_method());
+ _read_opts->stats->variant_scan_sparse_column_bytes +=
+ _read_opts->stats->uncompressed_bytes_read - before_size;
}
-            SCOPED_RAW_TIMER(&_read_opts->stats->variant_fill_path_from_sparse_column_timer_ns);
+        SCOPED_RAW_TIMER(&_read_opts->stats->variant_fill_path_from_sparse_column_timer_ns);
const auto& offsets =
-                assert_cast<const vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+                assert_cast<const vectorized::ColumnMap&>(*_sparse_column_cache->sparse_column)
+                        .get_offsets();
if (offsets.back() == offsets[-1]) {
// no sparse column in this batch
_process_data_without_sparse_column(dst, nrows);
@@ -142,25 +115,21 @@ public:
// Implementation for path extraction processor
class SparseColumnExtractIterator : public BaseSparseColumnProcessor {
public:
-    SparseColumnExtractIterator(std::string_view path, std::unique_ptr<ColumnIterator> reader,
-                                StorageReadOptions* opts, const TabletColumn& col)
-            : BaseSparseColumnProcessor(std::move(reader), opts, col), _path(path) {}
+    SparseColumnExtractIterator(std::string_view path, SparseColumnCacheSPtr sparse_column_cache,
+                                const StorageReadOptions* opts)
+            : BaseSparseColumnProcessor(std::move(sparse_column_cache), opts), _path(path) {}
// Batch processing using template method
    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
-        return _process_batch(
-                [&]() { return _sparse_column_reader->next_batch(n, _sparse_column, has_null); },
-                *n, dst);
+        return _process_batch([&]() { return _sparse_column_cache->next_batch(n, has_null); }, *n,
+                              dst);
}
// RowID-based read using template method
Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override {
-        return _process_batch(
-                [&]() {
-                    return _sparse_column_reader->read_by_rowids(rowids, count, _sparse_column);
-                },
-                count, dst);
+        return _process_batch([&]() { return _sparse_column_cache->read_by_rowids(rowids, count); },
+                              count, dst);
}
private:
@@ -188,8 +157,9 @@ private:
                nullable_column ? &nullable_column->get_null_map_data() : nullptr;
vectorized::ColumnVariant::fill_path_column_from_sparse_data(
                *var.get_subcolumn({}) /*root*/, null_map, StringRef {_path.data(), _path.size()},
- _sparse_column->get_ptr(), 0, _sparse_column->size());
- var.incr_num_rows(_sparse_column->size());
+ _sparse_column_cache->sparse_column->get_ptr(), 0,
+ _sparse_column_cache->sparse_column->size());
+ var.incr_num_rows(_sparse_column_cache->sparse_column->size());
var.get_sparse_column()->assume_mutable()->resize(var.rows());
ENABLE_CHECK_CONSISTENCY(&var);
}
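For orientation, both subclasses above funnel their read paths through _process_batch: the caller supplies the concrete read as a lambda, and the base class dispatches on whether the batch actually contains sparse rows. A rough sketch of that template-method shape, with simplified stand-in names (Processor, sparse_rows) rather than the real classes:

// Sketch of the template-method pattern used by BaseSparseColumnProcessor.
// Processor and its members are simplified stand-ins, not the Doris classes.
#include <cstddef>
#include <iostream>

struct Processor {
    size_t sparse_rows = 0; // set by the read method, like the sparse column size

    template <typename ReadMethod>
    void process_batch(ReadMethod&& read_method, size_t nrows) {
        read_method(); // sequential or rowid-based read, chosen by the caller
        if (sparse_rows == 0) {
            process_without_sparse(nrows); // no sparse data: fill defaults only
        } else {
            process_with_sparse(nrows); // extract the requested path from sparse data
        }
    }
    void process_without_sparse(size_t n) { std::cout << "defaults for " << n << " rows\n"; }
    void process_with_sparse(size_t n) { std::cout << "extract from " << n << " rows\n"; }
};

int main() {
    Processor p;
    p.process_batch([&] { p.sparse_rows = 10; }, 10); // next_batch-style read
    p.process_batch([&] { p.sparse_rows = 0; }, 5);   // batch without sparse rows
    return 0;
}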
diff --git a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp
index 5543ca736ac..ee5f3fd7a56 100644
--- a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp
@@ -37,7 +37,7 @@ namespace doris::segment_v2 {
#include "common/compile_check_begin.h"
Status SparseColumnMergeIterator::seek_to_ordinal(ordinal_t ord) {
- RETURN_IF_ERROR(_sparse_column_reader->seek_to_ordinal(ord));
+ RETURN_IF_ERROR(_sparse_column_cache->seek_to_ordinal(ord));
for (auto& entry : _src_subcolumns_for_sparse) {
RETURN_IF_ERROR(entry->data.iterator->seek_to_ordinal(ord));
}
@@ -45,7 +45,7 @@ Status SparseColumnMergeIterator::seek_to_ordinal(ordinal_t ord) {
}
Status SparseColumnMergeIterator::init(const ColumnIteratorOptions& opts) {
- RETURN_IF_ERROR(_sparse_column_reader->init(opts));
+ RETURN_IF_ERROR(_sparse_column_cache->init(opts));
for (auto& entry : _src_subcolumns_for_sparse) {
entry->data.serde = entry->data.type->get_serde();
RETURN_IF_ERROR(entry->data.iterator->init(opts));
@@ -113,7 +113,8 @@ void SparseColumnMergeIterator::_merge_to(vectorized::MutableColumnPtr& dst) {
assert_cast<vectorized::ColumnString&>(column_map.get_values());
auto& dst_sparse_column_offsets = column_map.get_offsets();
-    const auto& src_column_map = assert_cast<const vectorized::ColumnMap&>(*_sparse_column);
+    const auto& src_column_map =
+            assert_cast<const vectorized::ColumnMap&>(*_sparse_column_cache->sparse_column);
const auto& src_sparse_column_paths =
            assert_cast<const vectorized::ColumnString&>(*src_column_map.get_keys_ptr());
const auto& src_sparse_column_values =
@@ -122,7 +123,7 @@ void SparseColumnMergeIterator::_merge_to(vectorized::MutableColumnPtr& dst) {
DCHECK_EQ(src_sparse_column_paths.size(), src_sparse_column_values.size());
    // Src object column contains some paths in serialized sparse column in specified range.
    // Iterate over this range and insert all required paths into serialized sparse column or subcolumns.
- for (size_t row = 0; row != _sparse_column->size(); ++row) {
+    for (size_t row = 0; row != _sparse_column_cache->sparse_column->size(); ++row) {
        // Use separate index to iterate over sorted sorted_src_subcolumn_for_sparse_column.
size_t sorted_src_subcolumn_for_sparse_column_idx = 0;
        size_t sorted_src_subcolumn_for_sparse_column_size = _src_subcolumns_for_sparse.size();
diff --git a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h
index 4f0c132d2cb..bc3af0ef146 100644
--- a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h
+++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h
@@ -58,10 +58,10 @@ namespace doris::segment_v2 {
class SparseColumnMergeIterator : public BaseSparseColumnProcessor {
public:
SparseColumnMergeIterator(const TabletSchema::PathsSetInfo& path_set_info,
-                              std::unique_ptr<ColumnIterator>&& sparse_column_reader,
+ SparseColumnCacheSPtr sparse_column_cache,
SubstreamReaderTree&& src_subcolumns_for_sparse,
-                              StorageReadOptions* opts, const TabletColumn& col)
-            : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts, col),
+ const StorageReadOptions* opts)
+ : BaseSparseColumnProcessor(std::move(sparse_column_cache), opts),
_src_subcolumn_map(path_set_info.sub_path_set),
_src_subcolumns_for_sparse(src_subcolumns_for_sparse) {}
Status init(const ColumnIteratorOptions& opts) override;
@@ -74,9 +74,8 @@ public:
            return entry->data.iterator->next_batch(n, entry->data.column, &has_null);
}));
// then read sparse column
-        return _process_batch(
-                [&]() { return _sparse_column_reader->next_batch(n, _sparse_column, has_null); },
-                *n, dst);
+        return _process_batch([&]() { return _sparse_column_cache->next_batch(n, has_null); }, *n,
+                              dst);
}
// RowID-based read using template method
@@ -87,11 +86,8 @@ public:
            return entry->data.iterator->read_by_rowids(rowids, count, entry->data.column);
}));
// then read sparse column
-        return _process_batch(
-                [&]() {
-                    return _sparse_column_reader->read_by_rowids(rowids, count, _sparse_column);
-                },
-                count, dst);
+        return _process_batch([&]() { return _sparse_column_cache->read_by_rowids(rowids, count); },
+                              count, dst);
}
Status seek_to_ordinal(ordinal_t ord) override;
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index 737ab3453fe..8e4edb74995 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -151,7 +151,7 @@ Status VariantColumnReader::_create_hierarchical_reader(ColumnIteratorUPtr* read
Status VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iterator,
                                                        const StorageReadOptions* opts,
                                                        const TabletColumn& target_col,
-                                                        ColumnIteratorUPtr inner_iter,
+                                                        SparseColumnCacheSPtr sparse_column_cache,
                                                        ColumnReaderCache* column_reader_cache) {
// Get subcolumns path set from tablet schema
    const auto& path_set_info = opts->tablet_schema->path_set_info(target_col.parent_unique_id());
@@ -182,8 +182,8 @@ Status VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
VLOG_DEBUG << "subcolumns to merge " << src_subcolumns_for_sparse.size();
// Create sparse column merge reader
*iterator = std::make_unique<SparseColumnMergeIterator>(
-            path_set_info, std::move(inner_iter), std::move(src_subcolumns_for_sparse),
-            const_cast<StorageReadOptions*>(opts), target_col);
+            path_set_info, std::move(sparse_column_cache), std::move(src_subcolumns_for_sparse),
+            opts);
return Status::OK();
}
@@ -226,12 +226,29 @@ Status VariantColumnReader::_new_default_iter_with_same_nested(
return Status::OK();
}
-Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* iterator,
-                                                           const TabletColumn& target_col,
-                                                           const StorageReadOptions* opts,
-                                                           bool exceeded_sparse_column_limit,
-                                                           bool existed_in_sparse_column,
-                                                           ColumnReaderCache* column_reader_cache) {
+Result<SparseColumnCacheSPtr> VariantColumnReader::_get_shared_column_cache(
+        PathToSparseColumnCache* sparse_column_cache_ptr, const std::string& path) {
+    if (!sparse_column_cache_ptr || !sparse_column_cache_ptr->contains(path)) {
+        ColumnIteratorUPtr inner_iter;
+        RETURN_IF_ERROR_RESULT(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
+        vectorized::MutableColumnPtr sparse_column =
+                vectorized::ColumnVariant::create_sparse_column_fn();
+        auto sparse_column_cache = std::make_shared<SparseColumnCache>(std::move(inner_iter),
+                                                                       std::move(sparse_column));
+        // if sparse_column_cache_ptr is nullptr, means the sparse column cache is not used
+        if (sparse_column_cache_ptr) {
+            sparse_column_cache_ptr->emplace(path, sparse_column_cache);
+        }
+        return sparse_column_cache;
+    }
+    return sparse_column_cache_ptr->at(path);
+}
+
+Status VariantColumnReader::_new_iterator_with_flat_leaves(
+ ColumnIteratorUPtr* iterator, const TabletColumn& target_col,
+ const StorageReadOptions* opts, bool exceeded_sparse_column_limit,
+ bool existed_in_sparse_column, ColumnReaderCache* column_reader_cache,
+ PathToSparseColumnCache* sparse_column_cache_ptr) {
DCHECK(opts != nullptr);
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
    // compaction need to read flat leaves nodes data to prevent from amplification
@@ -240,11 +257,11 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* i
if (!node) {
        if (relative_path.get_path() == SPARSE_COLUMN_PATH && _sparse_column_reader != nullptr) {
            // read sparse column and filter extracted columns in subcolumn_path_map
- std::unique_ptr<ColumnIterator> inner_iter;
-            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
+            SparseColumnCacheSPtr sparse_column_cache = DORIS_TRY(
+                    _get_shared_column_cache(sparse_column_cache_ptr, SPARSE_COLUMN_PATH));
            // get subcolumns in sparse path set which will be merged into sparse column
-            RETURN_IF_ERROR(_create_sparse_merge_reader(
-                    iterator, opts, target_col, std::move(inner_iter), column_reader_cache));
+            RETURN_IF_ERROR(_create_sparse_merge_reader(iterator, opts, target_col,
+                                                        sparse_column_cache, column_reader_cache));
return Status::OK();
}
@@ -259,13 +276,11 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* i
// even if the sparse column size is reached limit
if (existed_in_sparse_column || exceeded_sparse_column_limit) {
        // Sparse column exists or reached sparse size limit, read sparse column
- ColumnIteratorUPtr inner_iter;
-        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
+        SparseColumnCacheSPtr sparse_column_cache = DORIS_TRY(
+                _get_shared_column_cache(sparse_column_cache_ptr, SPARSE_COLUMN_PATH));
DCHECK(opts);
*iterator = std::make_unique<SparseColumnExtractIterator>(
-                relative_path.get_path(), std::move(inner_iter),
-                // need to modify sparse_column_cache, so use const_cast here
-                const_cast<StorageReadOptions*>(opts), target_col);
+                relative_path.get_path(), std::move(sparse_column_cache), opts);
return Status::OK();
}
@@ -297,7 +312,8 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
const TabletColumn* target_col,
const StorageReadOptions* opt,
-                                         ColumnReaderCache* column_reader_cache) {
+                                         ColumnReaderCache* column_reader_cache,
+                                         PathToSparseColumnCache* sparse_column_cache_ptr) {
int32_t col_uid =
            target_col->unique_id() >= 0 ? target_col->unique_id() : target_col->parent_unique_id();
// root column use unique id, leaf column use parent_unique_id
@@ -347,9 +363,9 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
if (need_read_flat_leaves(opt)) {
// original path, compaction with wide schema
-        return _new_iterator_with_flat_leaves(iterator, *target_col, opt,
-                                              exceeded_sparse_column_limit,
-                                              existed_in_sparse_column, column_reader_cache);
+        return _new_iterator_with_flat_leaves(
+                iterator, *target_col, opt, exceeded_sparse_column_limit, existed_in_sparse_column,
+                column_reader_cache, sparse_column_cache_ptr);
}
    // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b.
@@ -374,13 +390,12 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
// {"b" : {"c":456}} b.c in subcolumn
// {"b" : 123} b in sparse column
// Then we should use hierarchical reader to read b
- ColumnIteratorUPtr inner_iter;
-        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
+        SparseColumnCacheSPtr sparse_column_cache =
+                DORIS_TRY(_get_shared_column_cache(sparse_column_cache_ptr, SPARSE_COLUMN_PATH));
DCHECK(opt);
        // Sparse column exists or reached sparse size limit, read sparse column
*iterator = std::make_unique<SparseColumnExtractIterator>(
- relative_path.get_path(), std::move(inner_iter),
- const_cast<StorageReadOptions*>(opt), *target_col);
+ relative_path.get_path(), std::move(sparse_column_cache), opt);
opt->stats->variant_subtree_sparse_iter_count++;
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
index 1ba16881419..8511e152dca 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
@@ -48,6 +48,120 @@ class InvertedIndexIterator;
class InvertedIndexFileReader;
class ColumnReaderCache;
+/**
+ * SparseColumnCache provides a caching layer for sparse column data access.
+ *
+ * The "shared" aspect refers to the ability to share cached column data between
+ * multiple iterators or readers that access the same column (SPARSE_COLUMN_PATH). This reduces
+ * redundant I/O operations and memory usage when multiple consumers need the
+ * same column data.
+ *
+ * Key features:
+ * - Caches column data after reading to avoid repeated I/O
+ * - Maintains state to track the current data validity
+ * - Supports both sequential (next_batch) and random (read_by_rowids) access patterns
+ * - Optimizes performance by reusing cached data when possible
+ *
+ * The cache operates in different states:
+ * - INVALID: Cache is uninitialized
+ * - INITED: Iterator is initialized but no data cached
+ * - SEEKED_NEXT_BATCHED: Data cached from sequential read
+ * - READ_BY_ROWIDS: Data cached from random access read
+ */
+struct SparseColumnCache {
+ const ColumnIteratorUPtr sparse_column_iterator = nullptr;
+ vectorized::MutableColumnPtr sparse_column = nullptr;
+
+ enum class State : uint8_t {
+ INVALID = 0,
+ INITED = 1,
+ SEEKED_NEXT_BATCHED = 2,
+ READ_BY_ROWIDS = 3,
+ };
+ State state = State::INVALID;
+
+    ordinal_t offset = 0;              // Current offset position for sequential reads
+    std::unique_ptr<rowid_t[]> rowids; // Cached row IDs for random access reads
+ size_t length = 0; // Length of cached data
+
+ SparseColumnCache() = default;
+    SparseColumnCache(ColumnIteratorUPtr _column_iterator, vectorized::MutableColumnPtr _column)
+ : sparse_column_iterator(std::move(_column_iterator)),
+ sparse_column(std::move(_column)) {}
+
+ Status init(const ColumnIteratorOptions& opts) {
+ if (state >= State::INITED) {
+ return Status::OK();
+ }
+ reset(State::INITED);
+ return sparse_column_iterator->init(opts);
+ }
+
+ Status seek_to_ordinal(ordinal_t ord) {
+        // in the different batch, we need to reset the state to SEEKED_NEXT_BATCHED
+ if (state == State::SEEKED_NEXT_BATCHED && offset == ord) {
+ return Status::OK();
+ }
+ reset(State::SEEKED_NEXT_BATCHED);
+ RETURN_IF_ERROR(sparse_column_iterator->seek_to_ordinal(ord));
+ offset = ord;
+ return Status::OK();
+ }
+
+ Status next_batch(size_t* _n, bool* _has_null) {
+ // length is 0, means data is not cached, need to read from iterator
+ if (length != 0) {
+ DCHECK(state == State::SEEKED_NEXT_BATCHED);
+ *_n = length;
+ return Status::OK();
+ }
+ sparse_column->clear();
+ DCHECK(state == State::SEEKED_NEXT_BATCHED);
+        RETURN_IF_ERROR(sparse_column_iterator->next_batch(_n, sparse_column, _has_null));
+ length = *_n; // update length
+ return Status::OK();
+ }
+
+ Status read_by_rowids(const rowid_t* _rowids, const size_t _count) {
+        // if rowids or count is different from cached data, need to read from iterator
+ // in the different batch, we need to reset the state to READ_BY_ROWIDS
+ if (is_read_by_rowids(_rowids, _count)) {
+ return Status::OK();
+ }
+ reset(State::READ_BY_ROWIDS);
+        RETURN_IF_ERROR(sparse_column_iterator->read_by_rowids(_rowids, _count, sparse_column));
+ length = _count; // update length
+ rowids = std::make_unique<rowid_t[]>(_count); // update rowids
+ std::copy(_rowids, _rowids + _count, rowids.get());
+ return Status::OK();
+ }
+
+ void reset(State _state) {
+ state = _state;
+ offset = 0;
+ length = 0;
+ sparse_column->clear();
+ rowids.reset();
+ }
+
+ bool is_read_by_rowids(const rowid_t* _rowids, const size_t _count) const {
+ if (state != State::READ_BY_ROWIDS) {
+ return false;
+ }
+ if (length != _count) {
+ return false;
+ }
+ return std::equal(_rowids, _rowids + _count, rowids.get());
+ }
+};
+
+using SparseColumnCacheSPtr = std::shared_ptr<SparseColumnCache>;
+
+// key is column path, value is the sparse column cache
+// now column path is only SPARSE_COLUMN_PATH, in the future, we can add more sparse column paths
+using PathToSparseColumnCache = std::unordered_map<std::string, SparseColumnCacheSPtr>;
+using PathToSparseColumnCacheUPtr = std::unique_ptr<PathToSparseColumnCache>;
+
class VariantColumnReader : public ColumnReader {
public:
VariantColumnReader() = default;
@@ -59,7 +173,8 @@ public:
const StorageReadOptions* opt) override;
Status new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* col,
-                        const StorageReadOptions* opt, ColumnReaderCache* column_reader_cache);
+                        const StorageReadOptions* opt, ColumnReaderCache* column_reader_cache,
+                        PathToSparseColumnCache* sparse_column_cache_ptr = nullptr);
virtual const SubcolumnColumnMetaInfo::Node* get_subcolumn_meta_by_path(
const vectorized::PathInData& relative_path) const;
@@ -96,11 +211,11 @@ private:
Status _new_default_iter_with_same_nested(ColumnIteratorUPtr* iterator,
const TabletColumn& col,
const StorageReadOptions* opt,
                                              ColumnReaderCache* column_reader_cache);
-    Status _new_iterator_with_flat_leaves(ColumnIteratorUPtr* iterator, const TabletColumn& col,
-                                          const StorageReadOptions* opts,
-                                          bool exceeded_sparse_column_limit,
-                                          bool existed_in_sparse_column,
-                                          ColumnReaderCache* column_reader_cache);
+    Status _new_iterator_with_flat_leaves(
+            ColumnIteratorUPtr* iterator, const TabletColumn& col, const StorageReadOptions* opts,
+            bool exceeded_sparse_column_limit, bool existed_in_sparse_column,
+            ColumnReaderCache* column_reader_cache,
+            PathToSparseColumnCache* sparse_column_cache_ptr = nullptr);
    Status _create_hierarchical_reader(ColumnIteratorUPtr* reader, int32_t col_uid,
vectorized::PathInData path,
@@ -110,8 +225,11 @@ private:
OlapReaderStatistics* stats);
    Status _create_sparse_merge_reader(ColumnIteratorUPtr* iterator, const StorageReadOptions* opts,
                                       const TabletColumn& target_col,
-                                       ColumnIteratorUPtr inner_iter,
+                                       SparseColumnCacheSPtr sparse_column_cache,
                                       ColumnReaderCache* column_reader_cache);
+
+ Result<SparseColumnCacheSPtr> _get_shared_column_cache(
+            PathToSparseColumnCache* sparse_column_cache_ptr, const std::string& path);
std::unique_ptr<SubcolumnColumnMetaInfo> _subcolumns_meta_info;
std::shared_ptr<ColumnReader> _sparse_column_reader;
std::shared_ptr<ColumnReader> _root_column_reader;
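To see the reuse contract of SparseColumnCache in isolation: seek_to_ordinal keeps the cached batch only when the ordinal matches, and next_batch then serves the cached length instead of touching the underlying iterator. A small self-contained sketch of that behavior, with a stub iterator that counts physical reads (simplified state, not the real ColumnIterator API):

// Sketch of the SparseColumnCache reuse behavior; StubIterator and Cache are
// simplified stand-ins that model only the offset/length bookkeeping above.
#include <cassert>
#include <cstddef>
#include <cstdint>

struct StubIterator {
    int reads = 0;
    void next_batch(size_t* n) {
        ++reads;
        *n = 100; // pretend a physical read returned 100 rows
    }
};

struct Cache {
    StubIterator iter;
    uint64_t offset = 0;
    size_t length = 0; // 0 means no batch cached yet

    void seek_to_ordinal(uint64_t ord) {
        if (offset == ord && length != 0) {
            return; // same batch already cached: keep it
        }
        offset = ord;
        length = 0; // a new position invalidates the cached batch
    }
    void next_batch(size_t* n) {
        if (length != 0) {
            *n = length; // serve the cached batch, no I/O
            return;
        }
        iter.next_batch(n);
        length = *n;
    }
};

int main() {
    Cache cache;
    size_t n = 0;
    cache.seek_to_ordinal(0);
    cache.next_batch(&n); // first consumer: one physical read
    cache.seek_to_ordinal(0);
    cache.next_batch(&n); // second consumer, same ordinal: served from cache
    assert(cache.iter.reads == 1);
    return 0;
}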
diff --git a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index 420202b20dd..063709aff31 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -398,6 +398,9 @@ TEST_F(VariantColumnWriterReaderTest, test_write_data_normal) {
};
// 10. check sparse extract reader
+ PathToSparseColumnCacheUPtr sparse_column_cache =
+            std::make_unique<std::unordered_map<std::string, SparseColumnCacheSPtr>>();
+ stats.bytes_read = 0;
for (int i = 3; i < 10; ++i) {
std::string key = ".key" + std::to_string(i);
TabletColumn subcolumn_in_sparse;
@@ -411,26 +414,16 @@ TEST_F(VariantColumnWriterReaderTest, test_write_data_normal) {
ColumnIteratorUPtr it;
        st = variant_column_reader->new_iterator(&it, &subcolumn_in_sparse, &storage_read_opts,
-                                                 &column_reader_cache);
+                                                 &column_reader_cache, sparse_column_cache.get());
EXPECT_TRUE(st.ok()) << st.msg();
        EXPECT_TRUE(assert_cast<SparseColumnExtractIterator*>(it.get()) != nullptr);
st = it->init(column_iter_opts);
EXPECT_TRUE(st.ok()) << st.msg();
+ int64_t before_bytes_read = stats.bytes_read;
read_to_column_object(it);
- {
- // read with opt
- auto* iter = assert_cast<SparseColumnExtractIterator*>(it.get());
- StorageReadOptions storage_read_opts1;
- storage_read_opts1.stats = &stats;
- storage_read_opts1.io_ctx.reader_type = ReaderType::READER_QUERY;
- iter->_read_opts = &storage_read_opts1;
- st = iter->next_batch(&nrows, new_column_object, nullptr);
- EXPECT_TRUE(st.ok()) << st.msg();
- EXPECT_TRUE(stats.bytes_read > 0);
-            iter->_read_opts->io_ctx.reader_type = ReaderType::READER_BASE_COMPACTION;
- st = iter->next_batch(&nrows, new_column_object, nullptr);
- EXPECT_TRUE(st.ok()) << st.msg();
+ if (before_bytes_read != 0) {
+ EXPECT_EQ(stats.bytes_read, before_bytes_read);
}
for (int row = 0; row < 1000; ++row) {
@@ -676,6 +669,7 @@ TEST_F(VariantColumnWriterReaderTest, test_write_data_normal) {
&column_reader_cache);
EXPECT_TRUE(st.ok()) << st.msg();
    EXPECT_TRUE(assert_cast<SparseColumnExtractIterator*>(it5.get()) != nullptr);
+ EXPECT_TRUE(it5->init(column_iter_opts).ok());
{
// test SparseColumnExtractIterator seek_to_first
@@ -701,6 +695,8 @@ TEST_F(VariantColumnWriterReaderTest, test_write_data_normal) {
MutableColumnPtr sparse_dst3 = ColumnVariant::create(3);
size_t rs = 1000;
bool has_null = false;
+ st = iter->seek_to_ordinal(0);
+ EXPECT_TRUE(st.ok()) << st.msg();
st = iter->next_batch(&rs, sparse_dst3, &has_null);
EXPECT_TRUE(st.ok()) << st.msg();
EXPECT_TRUE(sparse_dst3->size() == row_ids1.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]