This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 61108154d7b branch-4.0: [opt](variant) add column cache for variant 
sparse column #56159 (#56543)
61108154d7b is described below

commit 61108154d7b046d6013ef71065e29cfc13553584
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Sep 28 20:02:43 2025 +0800

    branch-4.0: [opt](variant) add column cache for variant sparse column 
#56159 (#56543)
    
    Cherry-picked from #56159
    
    Co-authored-by: Sun Chenyang <[email protected]>
---
 be/src/olap/field.h                                |   4 +
 be/src/olap/iterators.h                            |   4 -
 be/src/olap/rowset/segment_v2/column_reader.h      |   1 +
 be/src/olap/rowset/segment_v2/segment.cpp          |  21 +++-
 be/src/olap/rowset/segment_v2/segment.h            |  10 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  17 ++-
 be/src/olap/rowset/segment_v2/segment_iterator.h   |   3 +
 .../variant/sparse_column_extract_iterator.h       |  84 +++++--------
 .../variant/sparse_column_merge_iterator.cpp       |   9 +-
 .../variant/sparse_column_merge_iterator.h         |  18 ++-
 .../segment_v2/variant/variant_column_reader.cpp   |  67 +++++++----
 .../segment_v2/variant/variant_column_reader.h     | 132 +++++++++++++++++++--
 .../variant_column_writer_reader_test.cpp          |  24 ++--
 13 files changed, 256 insertions(+), 138 deletions(-)

diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 69b3ac02dec..75adf75ed82 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -212,6 +212,10 @@ public:
     int32_t get_scale() const { return _scale; }
     const TabletColumn& get_desc() const { return _desc; }
 
+    int32_t get_unique_id() const {
+        return is_extracted_column() ? parent_unique_id() : unique_id();
+    }
+
 protected:
     TypeInfoPtr _type_info;
     TabletColumn _desc;
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index f2aa423f72b..5d2b319dd23 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -129,10 +129,6 @@ public:
 
     std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
     CollectionStatisticsPtr collection_statistics;
-
-    // Cache for sparse column data to avoid redundant reads
-    // col_unique_id -> cached column_ptr
-    std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
 };
 
 struct CompactionSampleInfo {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index ba0c8804ebd..91aadffbfce 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -120,6 +120,7 @@ class FileColumnIterator;
 using ColumnIteratorUPtr = std::unique_ptr<ColumnIterator>;
 using OffsetFileColumnIteratorUPtr = std::unique_ptr<OffsetFileColumnIterator>;
 using FileColumnIteratorUPtr = std::unique_ptr<FileColumnIterator>;
+using ColumnIteratorSPtr = std::shared_ptr<ColumnIterator>;
 
 // There will be concurrent users to read the same column. So
 // we should do our best to reduce resource usage through share
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 9334d75fb16..0675c357ccb 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -725,7 +725,9 @@ Status Segment::new_default_iterator(const TabletColumn& 
tablet_column,
 // but they are not the same column
 Status Segment::new_column_iterator(const TabletColumn& tablet_column,
                                     std::unique_ptr<ColumnIterator>* iter,
-                                    const StorageReadOptions* opt) {
+                                    const StorageReadOptions* opt,
+                                    const std::unordered_map<int32_t, 
PathToSparseColumnCacheUPtr>*
+                                            variant_sparse_column_cache) {
     if (opt->runtime_state != nullptr) {
         _be_exec_version = opt->runtime_state->be_exec_version();
     }
@@ -746,10 +748,21 @@ Status Segment::new_column_iterator(const TabletColumn& 
tablet_column,
         return Status::InternalError("column reader is nullptr, unique_id={}", 
unique_id);
     }
     if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
+        // if sparse_column_cache_ptr is nullptr, means the sparse column 
cache is not used
+        PathToSparseColumnCache* sparse_column_cache_ptr = nullptr;
+        if (variant_sparse_column_cache) {
+            auto it = variant_sparse_column_cache->find(unique_id);
+            if (it != variant_sparse_column_cache->end()) {
+                sparse_column_cache_ptr = it->second.get();
+            } else {
+                DCHECK(false) << "sparse column cache is not found, 
unique_id=" << unique_id;
+            }
+        }
         // use _column_reader_cache to get variant subcolumn(path column) 
reader
-        RETURN_IF_ERROR(
-                assert_cast<VariantColumnReader*>(reader.get())
-                        ->new_iterator(iter, &tablet_column, opt, 
_column_reader_cache.get()));
+        RETURN_IF_ERROR(assert_cast<VariantColumnReader*>(reader.get())
+                                ->new_iterator(iter, &tablet_column, opt,
+                                               _column_reader_cache.get(),
+                                               sparse_column_cache_ptr));
     } else {
         RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt));
     }
diff --git a/be/src/olap/rowset/segment_v2/segment.h 
b/be/src/olap/rowset/segment_v2/segment.h
index 098c3c082f8..b2e3e29ec6b 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -38,6 +38,7 @@
 #include "olap/page_cache.h"
 #include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
 #include "olap/rowset/segment_v2/page_handle.h"
+#include "olap/rowset/segment_v2/variant/variant_column_reader.h"
 #include "olap/schema.h"
 #include "olap/tablet_schema.h"
 #include "runtime/define_primitive_type.h"
@@ -110,12 +111,11 @@ public:
 
     uint32_t num_rows() const { return _num_rows; }
 
+    // if variant_sparse_column_cache is nullptr, means the sparse column 
cache is not used
     Status new_column_iterator(const TabletColumn& tablet_column,
-                               std::unique_ptr<ColumnIterator>* iter,
-                               const StorageReadOptions* opt);
-
-    Status new_column_iterator(int32_t unique_id, const StorageReadOptions* 
opt,
-                               std::unique_ptr<ColumnIterator>* iter);
+                               std::unique_ptr<ColumnIterator>* iter, const 
StorageReadOptions* opt,
+                               const std::unordered_map<int32_t, 
PathToSparseColumnCacheUPtr>*
+                                       variant_sparse_column_cache = nullptr);
 
     Status new_bitmap_index_iterator(const TabletColumn& tablet_column,
                                      const StorageReadOptions& read_options,
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 3ac3f2c772a..57b627edf2b 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -323,8 +323,6 @@ Status SegmentIterator::_init_impl(const 
StorageReadOptions& opts) {
     _score_runtime = _opts.score_runtime;
     _ann_topn_runtime = _opts.ann_topn_runtime;
 
-    RETURN_IF_ERROR(init_iterators());
-
     if (opts.output_columns != nullptr) {
         _output_columns = *(opts.output_columns);
     }
@@ -360,9 +358,16 @@ Status SegmentIterator::_init_impl(const 
StorageReadOptions& opts) {
                 }
             }
             _storage_name_and_type[i] = std::make_pair(field_name, 
storage_type);
+            if (int32_t uid = col->get_unique_id(); 
!_variant_sparse_column_cache.contains(uid)) {
+                DCHECK(uid >= 0);
+                _variant_sparse_column_cache.emplace(uid,
+                                                     
std::make_unique<PathToSparseColumnCache>());
+            }
         }
     }
 
+    RETURN_IF_ERROR(init_iterators());
+
     RETURN_IF_ERROR(_construct_compound_expr_context());
     _enable_common_expr_pushdown = !_common_expr_ctxs_push_down.empty();
     VLOG_DEBUG << fmt::format(
@@ -575,7 +580,8 @@ Status SegmentIterator::_prepare_seek(const 
StorageReadOptions::KeyRange& key_ra
             }
 
             
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
-                                                          
&_column_iterators[cid], &_opts));
+                                                          
&_column_iterators[cid], &_opts,
+                                                          
&_variant_sparse_column_cache));
             ColumnIteratorOptions iter_opts {
                     .use_page_cache = _opts.use_page_cache,
                     .file_reader = _file_reader.get(),
@@ -1302,7 +1308,8 @@ Status SegmentIterator::_init_return_column_iterators() {
 
         if (_column_iterators[cid] == nullptr) {
             
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
-                                                          
&_column_iterators[cid], &_opts));
+                                                          
&_column_iterators[cid], &_opts,
+                                                          
&_variant_sparse_column_cache));
             ColumnIteratorOptions iter_opts {
                     .use_page_cache = _opts.use_page_cache,
                     // If the col is predicate column, then should read the 
last page to check
@@ -2309,8 +2316,6 @@ Status 
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
 Status SegmentIterator::next_batch(vectorized::Block* block) {
     // Replace virtual columns with ColumnNothing at the begining of each 
next_batch call.
     _init_virtual_columns(block);
-    // Clear the sparse column cache before processing a new batch
-    _opts.sparse_column_cache.clear();
     auto status = [&]() {
         RETURN_IF_CATCH_EXCEPTION({
             auto res = _next_batch_internal(block);
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 7a82ed2f716..95b85649efa 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -503,6 +503,9 @@ private:
     std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
 
     IndexQueryContextPtr _index_query_context;
+
+    // key is column uid, value is the sparse column cache
+    std::unordered_map<int32_t, PathToSparseColumnCacheUPtr> 
_variant_sparse_column_cache;
 };
 
 } // namespace segment_v2
diff --git 
a/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h 
b/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h
index a58fd5ca312..f4afa778ead 100644
--- a/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h
+++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h
@@ -31,6 +31,7 @@
 #include "olap/iterators.h"
 #include "olap/rowset/segment_v2/column_reader.h"
 #include "olap/rowset/segment_v2/stream_reader.h"
+#include "olap/rowset/segment_v2/variant/variant_column_reader.h"
 #include "olap/schema.h"
 #include "olap/tablet_schema.h"
 #include "vec/columns/column.h"
@@ -56,10 +57,8 @@ namespace doris::segment_v2 {
 // Base class for sparse column processors with common functionality
 class BaseSparseColumnProcessor : public ColumnIterator {
 protected:
-    vectorized::MutableColumnPtr _sparse_column;
-    StorageReadOptions* _read_opts; // Shared cache pointer
-    std::unique_ptr<ColumnIterator> _sparse_column_reader;
-    int32_t _col_uid;
+    const StorageReadOptions* _read_opts;
+    SparseColumnCacheSPtr _sparse_column_cache;
     // Pure virtual method for data processing when encounter existing sparse 
columns(to be implemented by subclasses)
     virtual void 
_process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
                                                            size_t num_rows) = 
0;
@@ -69,33 +68,17 @@ protected:
                                                      size_t num_rows) = 0;
 
 public:
-    BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, 
StorageReadOptions* opts,
-                              const TabletColumn& col)
-            : _read_opts(opts),
-              _sparse_column_reader(std::move(reader)),
-              _col_uid(col.parent_unique_id()) {
-        _sparse_column = vectorized::ColumnVariant::create_sparse_column_fn();
-    }
+    BaseSparseColumnProcessor(SparseColumnCacheSPtr sparse_column_cache,
+                              const StorageReadOptions* opts)
+            : _read_opts(opts), 
_sparse_column_cache(std::move(sparse_column_cache)) {}
 
     // Common initialization for all processors
     Status init(const ColumnIteratorOptions& opts) override {
-        return _sparse_column_reader->init(opts);
-    }
-
-    // When performing compaction, multiple columns are extracted from the 
sparse columns,
-    // and the sparse columns only need to be read once.
-    // So we need to cache the sparse column and reuse it.
-    // The cache is only used when the compaction reader is used.
-    bool has_sparse_column_cache() const {
-        return _read_opts && _read_opts->sparse_column_cache[_col_uid] &&
-               
ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type);
+        return _sparse_column_cache->init(opts);
     }
 
     Status seek_to_ordinal(ordinal_t ord) override {
-        if (has_sparse_column_cache()) {
-            return Status::OK();
-        }
-        return _sparse_column_reader->seek_to_ordinal(ord);
+        return _sparse_column_cache->seek_to_ordinal(ord);
     }
 
     ordinal_t get_current_ordinal() const override {
@@ -106,28 +89,18 @@ public:
     template <typename ReadMethod>
     Status _process_batch(ReadMethod&& read_method, size_t nrows,
                           vectorized::MutableColumnPtr& dst) {
-        // Cache check and population logic
-        if (has_sparse_column_cache()) {
-            _sparse_column = 
_read_opts->sparse_column_cache[_col_uid]->assume_mutable();
-        } else {
-            _sparse_column->clear();
-            {
-                
SCOPED_RAW_TIMER(&_read_opts->stats->variant_scan_sparse_column_timer_ns);
-                int64_t before_size = 
_read_opts->stats->uncompressed_bytes_read;
-                RETURN_IF_ERROR(read_method());
-                _read_opts->stats->variant_scan_sparse_column_bytes +=
-                        _read_opts->stats->uncompressed_bytes_read - 
before_size;
-            }
-
-            // cache the sparse column
-            if (_read_opts) {
-                _read_opts->sparse_column_cache[_col_uid] = 
_sparse_column->get_ptr();
-            }
+        {
+            
SCOPED_RAW_TIMER(&_read_opts->stats->variant_scan_sparse_column_timer_ns);
+            int64_t before_size = _read_opts->stats->uncompressed_bytes_read;
+            RETURN_IF_ERROR(read_method());
+            _read_opts->stats->variant_scan_sparse_column_bytes +=
+                    _read_opts->stats->uncompressed_bytes_read - before_size;
         }
-        
SCOPED_RAW_TIMER(&_read_opts->stats->variant_fill_path_from_sparse_column_timer_ns);
 
+        
SCOPED_RAW_TIMER(&_read_opts->stats->variant_fill_path_from_sparse_column_timer_ns);
         const auto& offsets =
-                assert_cast<const 
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+                assert_cast<const 
vectorized::ColumnMap&>(*_sparse_column_cache->sparse_column)
+                        .get_offsets();
         if (offsets.back() == offsets[-1]) {
             // no sparse column in this batch
             _process_data_without_sparse_column(dst, nrows);
@@ -142,25 +115,21 @@ public:
 // Implementation for path extraction processor
 class SparseColumnExtractIterator : public BaseSparseColumnProcessor {
 public:
-    SparseColumnExtractIterator(std::string_view path, 
std::unique_ptr<ColumnIterator> reader,
-                                StorageReadOptions* opts, const TabletColumn& 
col)
-            : BaseSparseColumnProcessor(std::move(reader), opts, col), 
_path(path) {}
+    SparseColumnExtractIterator(std::string_view path, SparseColumnCacheSPtr 
sparse_column_cache,
+                                const StorageReadOptions* opts)
+            : BaseSparseColumnProcessor(std::move(sparse_column_cache), opts), 
_path(path) {}
 
     // Batch processing using template method
     Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* 
has_null) override {
-        return _process_batch(
-                [&]() { return _sparse_column_reader->next_batch(n, 
_sparse_column, has_null); },
-                *n, dst);
+        return _process_batch([&]() { return 
_sparse_column_cache->next_batch(n, has_null); }, *n,
+                              dst);
     }
 
     // RowID-based read using template method
     Status read_by_rowids(const rowid_t* rowids, const size_t count,
                           vectorized::MutableColumnPtr& dst) override {
-        return _process_batch(
-                [&]() {
-                    return _sparse_column_reader->read_by_rowids(rowids, 
count, _sparse_column);
-                },
-                count, dst);
+        return _process_batch([&]() { return 
_sparse_column_cache->read_by_rowids(rowids, count); },
+                              count, dst);
     }
 
 private:
@@ -188,8 +157,9 @@ private:
                 nullable_column ? &nullable_column->get_null_map_data() : 
nullptr;
         vectorized::ColumnVariant::fill_path_column_from_sparse_data(
                 *var.get_subcolumn({}) /*root*/, null_map, StringRef 
{_path.data(), _path.size()},
-                _sparse_column->get_ptr(), 0, _sparse_column->size());
-        var.incr_num_rows(_sparse_column->size());
+                _sparse_column_cache->sparse_column->get_ptr(), 0,
+                _sparse_column_cache->sparse_column->size());
+        var.incr_num_rows(_sparse_column_cache->sparse_column->size());
         var.get_sparse_column()->assume_mutable()->resize(var.rows());
         ENABLE_CHECK_CONSISTENCY(&var);
     }
diff --git 
a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp 
b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp
index 5543ca736ac..ee5f3fd7a56 100644
--- a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp
@@ -37,7 +37,7 @@ namespace doris::segment_v2 {
 #include "common/compile_check_begin.h"
 
 Status SparseColumnMergeIterator::seek_to_ordinal(ordinal_t ord) {
-    RETURN_IF_ERROR(_sparse_column_reader->seek_to_ordinal(ord));
+    RETURN_IF_ERROR(_sparse_column_cache->seek_to_ordinal(ord));
     for (auto& entry : _src_subcolumns_for_sparse) {
         RETURN_IF_ERROR(entry->data.iterator->seek_to_ordinal(ord));
     }
@@ -45,7 +45,7 @@ Status SparseColumnMergeIterator::seek_to_ordinal(ordinal_t 
ord) {
 }
 
 Status SparseColumnMergeIterator::init(const ColumnIteratorOptions& opts) {
-    RETURN_IF_ERROR(_sparse_column_reader->init(opts));
+    RETURN_IF_ERROR(_sparse_column_cache->init(opts));
     for (auto& entry : _src_subcolumns_for_sparse) {
         entry->data.serde = entry->data.type->get_serde();
         RETURN_IF_ERROR(entry->data.iterator->init(opts));
@@ -113,7 +113,8 @@ void 
SparseColumnMergeIterator::_merge_to(vectorized::MutableColumnPtr& dst) {
             assert_cast<vectorized::ColumnString&>(column_map.get_values());
     auto& dst_sparse_column_offsets = column_map.get_offsets();
 
-    const auto& src_column_map = assert_cast<const 
vectorized::ColumnMap&>(*_sparse_column);
+    const auto& src_column_map =
+            assert_cast<const 
vectorized::ColumnMap&>(*_sparse_column_cache->sparse_column);
     const auto& src_sparse_column_paths =
             assert_cast<const 
vectorized::ColumnString&>(*src_column_map.get_keys_ptr());
     const auto& src_sparse_column_values =
@@ -122,7 +123,7 @@ void 
SparseColumnMergeIterator::_merge_to(vectorized::MutableColumnPtr& dst) {
     DCHECK_EQ(src_sparse_column_paths.size(), src_sparse_column_values.size());
     // Src object column contains some paths in serialized sparse column in 
specified range.
     // Iterate over this range and insert all required paths into serialized 
sparse column or subcolumns.
-    for (size_t row = 0; row != _sparse_column->size(); ++row) {
+    for (size_t row = 0; row != _sparse_column_cache->sparse_column->size(); 
++row) {
         // Use separate index to iterate over sorted 
sorted_src_subcolumn_for_sparse_column.
         size_t sorted_src_subcolumn_for_sparse_column_idx = 0;
         size_t sorted_src_subcolumn_for_sparse_column_size = 
_src_subcolumns_for_sparse.size();
diff --git 
a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h 
b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h
index 4f0c132d2cb..bc3af0ef146 100644
--- a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h
+++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h
@@ -58,10 +58,10 @@ namespace doris::segment_v2 {
 class SparseColumnMergeIterator : public BaseSparseColumnProcessor {
 public:
     SparseColumnMergeIterator(const TabletSchema::PathsSetInfo& path_set_info,
-                              std::unique_ptr<ColumnIterator>&& 
sparse_column_reader,
+                              SparseColumnCacheSPtr sparse_column_cache,
                               SubstreamReaderTree&& src_subcolumns_for_sparse,
-                              StorageReadOptions* opts, const TabletColumn& 
col)
-            : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts, 
col),
+                              const StorageReadOptions* opts)
+            : BaseSparseColumnProcessor(std::move(sparse_column_cache), opts),
               _src_subcolumn_map(path_set_info.sub_path_set),
               _src_subcolumns_for_sparse(src_subcolumns_for_sparse) {}
     Status init(const ColumnIteratorOptions& opts) override;
@@ -74,9 +74,8 @@ public:
             return entry->data.iterator->next_batch(n, entry->data.column, 
&has_null);
         }));
         // then read sparse column
-        return _process_batch(
-                [&]() { return _sparse_column_reader->next_batch(n, 
_sparse_column, has_null); },
-                *n, dst);
+        return _process_batch([&]() { return 
_sparse_column_cache->next_batch(n, has_null); }, *n,
+                              dst);
     }
 
     // RowID-based read using template method
@@ -87,11 +86,8 @@ public:
             return entry->data.iterator->read_by_rowids(rowids, count, 
entry->data.column);
         }));
         // then read sparse column
-        return _process_batch(
-                [&]() {
-                    return _sparse_column_reader->read_by_rowids(rowids, 
count, _sparse_column);
-                },
-                count, dst);
+        return _process_batch([&]() { return 
_sparse_column_cache->read_by_rowids(rowids, count); },
+                              count, dst);
     }
 
     Status seek_to_ordinal(ordinal_t ord) override;
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp 
b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index 737ab3453fe..8e4edb74995 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -151,7 +151,7 @@ Status 
VariantColumnReader::_create_hierarchical_reader(ColumnIteratorUPtr* read
 Status VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* 
iterator,
                                                         const 
StorageReadOptions* opts,
                                                         const TabletColumn& 
target_col,
-                                                        ColumnIteratorUPtr 
inner_iter,
+                                                        SparseColumnCacheSPtr 
sparse_column_cache,
                                                         ColumnReaderCache* 
column_reader_cache) {
     // Get subcolumns path set from tablet schema
     const auto& path_set_info = 
opts->tablet_schema->path_set_info(target_col.parent_unique_id());
@@ -182,8 +182,8 @@ Status 
VariantColumnReader::_create_sparse_merge_reader(ColumnIteratorUPtr* iter
     VLOG_DEBUG << "subcolumns to merge " << src_subcolumns_for_sparse.size();
     // Create sparse column merge reader
     *iterator = std::make_unique<SparseColumnMergeIterator>(
-            path_set_info, std::move(inner_iter), 
std::move(src_subcolumns_for_sparse),
-            const_cast<StorageReadOptions*>(opts), target_col);
+            path_set_info, std::move(sparse_column_cache), 
std::move(src_subcolumns_for_sparse),
+            opts);
     return Status::OK();
 }
 
@@ -226,12 +226,29 @@ Status 
VariantColumnReader::_new_default_iter_with_same_nested(
     return Status::OK();
 }
 
-Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* 
iterator,
-                                                           const TabletColumn& 
target_col,
-                                                           const 
StorageReadOptions* opts,
-                                                           bool 
exceeded_sparse_column_limit,
-                                                           bool 
existed_in_sparse_column,
-                                                           ColumnReaderCache* 
column_reader_cache) {
+Result<SparseColumnCacheSPtr> VariantColumnReader::_get_shared_column_cache(
+        PathToSparseColumnCache* sparse_column_cache_ptr, const std::string& 
path) {
+    if (!sparse_column_cache_ptr || !sparse_column_cache_ptr->contains(path)) {
+        ColumnIteratorUPtr inner_iter;
+        
RETURN_IF_ERROR_RESULT(_sparse_column_reader->new_iterator(&inner_iter, 
nullptr));
+        vectorized::MutableColumnPtr sparse_column =
+                vectorized::ColumnVariant::create_sparse_column_fn();
+        auto sparse_column_cache = 
std::make_shared<SparseColumnCache>(std::move(inner_iter),
+                                                                       
std::move(sparse_column));
+        // if sparse_column_cache_ptr is nullptr, means the sparse column 
cache is not used
+        if (sparse_column_cache_ptr) {
+            sparse_column_cache_ptr->emplace(path, sparse_column_cache);
+        }
+        return sparse_column_cache;
+    }
+    return sparse_column_cache_ptr->at(path);
+}
+
+Status VariantColumnReader::_new_iterator_with_flat_leaves(
+        ColumnIteratorUPtr* iterator, const TabletColumn& target_col,
+        const StorageReadOptions* opts, bool exceeded_sparse_column_limit,
+        bool existed_in_sparse_column, ColumnReaderCache* column_reader_cache,
+        PathToSparseColumnCache* sparse_column_cache_ptr) {
     DCHECK(opts != nullptr);
     auto relative_path = target_col.path_info_ptr()->copy_pop_front();
     // compaction need to read flat leaves nodes data to prevent from 
amplification
@@ -240,11 +257,11 @@ Status 
VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* i
     if (!node) {
         if (relative_path.get_path() == SPARSE_COLUMN_PATH && 
_sparse_column_reader != nullptr) {
             // read sparse column and filter extracted columns in 
subcolumn_path_map
-            std::unique_ptr<ColumnIterator> inner_iter;
-            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, 
nullptr));
+            SparseColumnCacheSPtr sparse_column_cache = DORIS_TRY(
+                    _get_shared_column_cache(sparse_column_cache_ptr, 
SPARSE_COLUMN_PATH));
             // get subcolumns in sparse path set which will be merged into 
sparse column
-            RETURN_IF_ERROR(_create_sparse_merge_reader(
-                    iterator, opts, target_col, std::move(inner_iter), 
column_reader_cache));
+            RETURN_IF_ERROR(_create_sparse_merge_reader(iterator, opts, 
target_col,
+                                                        sparse_column_cache, 
column_reader_cache));
             return Status::OK();
         }
 
@@ -259,13 +276,11 @@ Status 
VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* i
         // even if the sparse column size is reached limit
         if (existed_in_sparse_column || exceeded_sparse_column_limit) {
             // Sparse column exists or reached sparse size limit, read sparse 
column
-            ColumnIteratorUPtr inner_iter;
-            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, 
nullptr));
+            SparseColumnCacheSPtr sparse_column_cache = DORIS_TRY(
+                    _get_shared_column_cache(sparse_column_cache_ptr, 
SPARSE_COLUMN_PATH));
             DCHECK(opts);
             *iterator = std::make_unique<SparseColumnExtractIterator>(
-                    relative_path.get_path(), std::move(inner_iter),
-                    // need to modify sparse_column_cache, so use const_cast 
here
-                    const_cast<StorageReadOptions*>(opts), target_col);
+                    relative_path.get_path(), std::move(sparse_column_cache), 
opts);
             return Status::OK();
         }
 
@@ -297,7 +312,8 @@ Status 
VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
 Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
                                          const TabletColumn* target_col,
                                          const StorageReadOptions* opt,
-                                         ColumnReaderCache* 
column_reader_cache) {
+                                         ColumnReaderCache* 
column_reader_cache,
+                                         PathToSparseColumnCache* 
sparse_column_cache_ptr) {
     int32_t col_uid =
             target_col->unique_id() >= 0 ? target_col->unique_id() : 
target_col->parent_unique_id();
     // root column use unique id, leaf column use parent_unique_id
@@ -347,9 +363,9 @@ Status 
VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
 
     if (need_read_flat_leaves(opt)) {
         // original path, compaction with wide schema
-        return _new_iterator_with_flat_leaves(iterator, *target_col, opt,
-                                              exceeded_sparse_column_limit,
-                                              existed_in_sparse_column, 
column_reader_cache);
+        return _new_iterator_with_flat_leaves(
+                iterator, *target_col, opt, exceeded_sparse_column_limit, 
existed_in_sparse_column,
+                column_reader_cache, sparse_column_cache_ptr);
     }
 
     // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, 
access prefix: a.b.
@@ -374,13 +390,12 @@ Status 
VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
         // {"b" : {"c":456}}   b.c in subcolumn
         // {"b" : 123}         b in sparse column
         // Then we should use hierarchical reader to read b
-        ColumnIteratorUPtr inner_iter;
-        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, 
nullptr));
+        SparseColumnCacheSPtr sparse_column_cache =
+                DORIS_TRY(_get_shared_column_cache(sparse_column_cache_ptr, 
SPARSE_COLUMN_PATH));
         DCHECK(opt);
         // Sparse column exists or reached sparse size limit, read sparse 
column
         *iterator = std::make_unique<SparseColumnExtractIterator>(
-                relative_path.get_path(), std::move(inner_iter),
-                const_cast<StorageReadOptions*>(opt), *target_col);
+                relative_path.get_path(), std::move(sparse_column_cache), opt);
         opt->stats->variant_subtree_sparse_iter_count++;
         return Status::OK();
     }
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h 
b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
index 1ba16881419..8511e152dca 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
@@ -48,6 +48,120 @@ class InvertedIndexIterator;
 class InvertedIndexFileReader;
 class ColumnReaderCache;
 
+/**
+ * SparseColumnCache provides a caching layer for sparse column data access.
+ * 
+ * The "shared" aspect refers to the ability to share cached column data 
between
+ * multiple iterators or readers that access the same column 
(SPARSE_COLUMN_PATH). This reduces
+ * redundant I/O operations and memory usage when multiple consumers need the
+ * same column data.
+ * 
+ * Key features:
+ * - Caches column data after reading to avoid repeated I/O
+ * - Maintains state to track the current data validity
+ * - Supports both sequential (next_batch) and random (read_by_rowids) access 
patterns
+ * - Optimizes performance by reusing cached data when possible
+ * 
+ * The cache operates in different states:
+ * - INVALID: Cache is uninitialized
+ * - INITED: Iterator is initialized but no data cached
+ * - SEEKED_NEXT_BATCHED: Data cached from sequential read
+ * - READ_BY_ROWIDS: Data cached from random access read
+ */
+struct SparseColumnCache {
+    const ColumnIteratorUPtr sparse_column_iterator = nullptr;
+    vectorized::MutableColumnPtr sparse_column = nullptr;
+
+    enum class State : uint8_t {
+        INVALID = 0,
+        INITED = 1,
+        SEEKED_NEXT_BATCHED = 2,
+        READ_BY_ROWIDS = 3,
+    };
+    State state = State::INVALID;
+
+    ordinal_t offset = 0;              // Current offset position for 
sequential reads
+    std::unique_ptr<rowid_t[]> rowids; // Cached row IDs for random access 
reads
+    size_t length = 0;                 // Length of cached data
+
+    SparseColumnCache() = default;
+    SparseColumnCache(ColumnIteratorUPtr _column_iterator, 
vectorized::MutableColumnPtr _column)
+            : sparse_column_iterator(std::move(_column_iterator)),
+              sparse_column(std::move(_column)) {}
+
+    Status init(const ColumnIteratorOptions& opts) {
+        if (state >= State::INITED) {
+            return Status::OK();
+        }
+        reset(State::INITED);
+        return sparse_column_iterator->init(opts);
+    }
+
+    Status seek_to_ordinal(ordinal_t ord) {
+        // When seeking within a different batch, reset the state to 
SEEKED_NEXT_BATCHED
+        if (state == State::SEEKED_NEXT_BATCHED && offset == ord) {
+            return Status::OK();
+        }
+        reset(State::SEEKED_NEXT_BATCHED);
+        RETURN_IF_ERROR(sparse_column_iterator->seek_to_ordinal(ord));
+        offset = ord;
+        return Status::OK();
+    }
+
+    Status next_batch(size_t* _n, bool* _has_null) {
+        // If length is 0, the data is not cached yet and must be read from the iterator
+        if (length != 0) {
+            DCHECK(state == State::SEEKED_NEXT_BATCHED);
+            *_n = length;
+            return Status::OK();
+        }
+        sparse_column->clear();
+        DCHECK(state == State::SEEKED_NEXT_BATCHED);
+        RETURN_IF_ERROR(sparse_column_iterator->next_batch(_n, sparse_column, 
_has_null));
+        length = *_n; // update length
+        return Status::OK();
+    }
+
+    Status read_by_rowids(const rowid_t* _rowids, const size_t _count) {
+        // If the rowids or the count differ from the cached data, we need to read 
from the iterator
+        // When reading within a different batch, reset the state to READ_BY_ROWIDS
+        if (is_read_by_rowids(_rowids, _count)) {
+            return Status::OK();
+        }
+        reset(State::READ_BY_ROWIDS);
+        RETURN_IF_ERROR(sparse_column_iterator->read_by_rowids(_rowids, 
_count, sparse_column));
+        length = _count;                              // update length
+        rowids = std::make_unique<rowid_t[]>(_count); // update rowids
+        std::copy(_rowids, _rowids + _count, rowids.get());
+        return Status::OK();
+    }
+
+    void reset(State _state) {
+        state = _state;
+        offset = 0;
+        length = 0;
+        sparse_column->clear();
+        rowids.reset();
+    }
+
+    bool is_read_by_rowids(const rowid_t* _rowids, const size_t _count) const {
+        if (state != State::READ_BY_ROWIDS) {
+            return false;
+        }
+        if (length != _count) {
+            return false;
+        }
+        return std::equal(_rowids, _rowids + _count, rowids.get());
+    }
+};
+
+using SparseColumnCacheSPtr = std::shared_ptr<SparseColumnCache>;
+
+// key is column path, value is the sparse column cache
+// now column path is only SPARSE_COLUMN_PATH, in the future, we can add more 
sparse column paths
+using PathToSparseColumnCache = std::unordered_map<std::string, 
SparseColumnCacheSPtr>;
+using PathToSparseColumnCacheUPtr = std::unique_ptr<PathToSparseColumnCache>;
+
 class VariantColumnReader : public ColumnReader {
 public:
     VariantColumnReader() = default;
@@ -59,7 +173,8 @@ public:
                         const StorageReadOptions* opt) override;
 
     Status new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* col,
-                        const StorageReadOptions* opt, ColumnReaderCache* 
column_reader_cache);
+                        const StorageReadOptions* opt, ColumnReaderCache* 
column_reader_cache,
+                        PathToSparseColumnCache* sparse_column_cache_ptr = 
nullptr);
 
     virtual const SubcolumnColumnMetaInfo::Node* get_subcolumn_meta_by_path(
             const vectorized::PathInData& relative_path) const;
@@ -96,11 +211,11 @@ private:
     Status _new_default_iter_with_same_nested(ColumnIteratorUPtr* iterator, 
const TabletColumn& col,
                                               const StorageReadOptions* opt,
                                               ColumnReaderCache* 
column_reader_cache);
-    Status _new_iterator_with_flat_leaves(ColumnIteratorUPtr* iterator, const 
TabletColumn& col,
-                                          const StorageReadOptions* opts,
-                                          bool exceeded_sparse_column_limit,
-                                          bool existed_in_sparse_column,
-                                          ColumnReaderCache* 
column_reader_cache);
+    Status _new_iterator_with_flat_leaves(
+            ColumnIteratorUPtr* iterator, const TabletColumn& col, const 
StorageReadOptions* opts,
+            bool exceeded_sparse_column_limit, bool existed_in_sparse_column,
+            ColumnReaderCache* column_reader_cache,
+            PathToSparseColumnCache* sparse_column_cache_ptr = nullptr);
 
     Status _create_hierarchical_reader(ColumnIteratorUPtr* reader, int32_t 
col_uid,
                                        vectorized::PathInData path,
@@ -110,8 +225,11 @@ private:
                                        OlapReaderStatistics* stats);
     Status _create_sparse_merge_reader(ColumnIteratorUPtr* iterator, const 
StorageReadOptions* opts,
                                        const TabletColumn& target_col,
-                                       ColumnIteratorUPtr inner_iter,
+                                       SparseColumnCacheSPtr 
sparse_column_cache,
                                        ColumnReaderCache* column_reader_cache);
+
+    Result<SparseColumnCacheSPtr> _get_shared_column_cache(
+            PathToSparseColumnCache* sparse_column_cache_ptr, const 
std::string& path);
     std::unique_ptr<SubcolumnColumnMetaInfo> _subcolumns_meta_info;
     std::shared_ptr<ColumnReader> _sparse_column_reader;
     std::shared_ptr<ColumnReader> _root_column_reader;
diff --git 
a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp 
b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index 420202b20dd..063709aff31 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -398,6 +398,9 @@ TEST_F(VariantColumnWriterReaderTest, 
test_write_data_normal) {
     };
 
     // 10. check sparse extract reader
+    PathToSparseColumnCacheUPtr sparse_column_cache =
+            std::make_unique<std::unordered_map<std::string, 
SparseColumnCacheSPtr>>();
+    stats.bytes_read = 0;
     for (int i = 3; i < 10; ++i) {
         std::string key = ".key" + std::to_string(i);
         TabletColumn subcolumn_in_sparse;
@@ -411,26 +414,16 @@ TEST_F(VariantColumnWriterReaderTest, 
test_write_data_normal) {
 
         ColumnIteratorUPtr it;
         st = variant_column_reader->new_iterator(&it, &subcolumn_in_sparse, 
&storage_read_opts,
-                                                 &column_reader_cache);
+                                                 &column_reader_cache, 
sparse_column_cache.get());
         EXPECT_TRUE(st.ok()) << st.msg();
         EXPECT_TRUE(assert_cast<SparseColumnExtractIterator*>(it.get()) != 
nullptr);
         st = it->init(column_iter_opts);
         EXPECT_TRUE(st.ok()) << st.msg();
 
+        int64_t before_bytes_read = stats.bytes_read;
         read_to_column_object(it);
-        {
-            // read with opt
-            auto* iter = assert_cast<SparseColumnExtractIterator*>(it.get());
-            StorageReadOptions storage_read_opts1;
-            storage_read_opts1.stats = &stats;
-            storage_read_opts1.io_ctx.reader_type = ReaderType::READER_QUERY;
-            iter->_read_opts = &storage_read_opts1;
-            st = iter->next_batch(&nrows, new_column_object, nullptr);
-            EXPECT_TRUE(st.ok()) << st.msg();
-            EXPECT_TRUE(stats.bytes_read > 0);
-            iter->_read_opts->io_ctx.reader_type = 
ReaderType::READER_BASE_COMPACTION;
-            st = iter->next_batch(&nrows, new_column_object, nullptr);
-            EXPECT_TRUE(st.ok()) << st.msg();
+        if (before_bytes_read != 0) {
+            EXPECT_EQ(stats.bytes_read, before_bytes_read);
         }
 
         for (int row = 0; row < 1000; ++row) {
@@ -676,6 +669,7 @@ TEST_F(VariantColumnWriterReaderTest, 
test_write_data_normal) {
                                              &column_reader_cache);
     EXPECT_TRUE(st.ok()) << st.msg();
     EXPECT_TRUE(assert_cast<SparseColumnExtractIterator*>(it5.get()) != 
nullptr);
+    EXPECT_TRUE(it5->init(column_iter_opts).ok());
 
     {
         // test SparseColumnExtractIterator seek_to_first
@@ -701,6 +695,8 @@ TEST_F(VariantColumnWriterReaderTest, 
test_write_data_normal) {
         MutableColumnPtr sparse_dst3 = ColumnVariant::create(3);
         size_t rs = 1000;
         bool has_null = false;
+        st = iter->seek_to_ordinal(0);
+        EXPECT_TRUE(st.ok()) << st.msg();
         st = iter->next_batch(&rs, sparse_dst3, &has_null);
         EXPECT_TRUE(st.ok()) << st.msg();
         EXPECT_TRUE(sparse_dst3->size() == row_ids1.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to