This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this push:
new 518afdc58fe refactor variant type and iterator logic (#58591)
518afdc58fe is described below
commit 518afdc58fe88bd93da9c423d094b6464758c2fe
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 2 05:50:31 2025 +0800
refactor variant type and iterator logic (#58591)
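
    The variant read path is reorganized around an internal ReadPlan:
    _build_read_plan / _build_read_plan_flat_leaves decide how a variant
    sub-path should be read (ReadKind) and which storage type it exposes,
    _create_iterator_from_plan materializes the iterator, and the new
    infer_data_type_for_path(type, column, opts, cache) reuses the same plan,
    so Segment::get_data_type_of and new_iterator can no longer diverge.
    Segment::get_data_type_of and can_apply_predicate_safely now take the full
    StorageReadOptions instead of a bare read_flat_leaves / ReaderType flag.

    A rough, stand-alone sketch of the plan/materialize split (toy types only,
    not Doris code; build_read_plan, infer_type and new_iterator below are
    illustrative stand-ins for the methods added by this patch):

        // Toy analogue of the ReadPlan refactor: one function decides the plan,
        // and both type inference and iterator construction consume it.
        #include <iostream>
        #include <memory>
        #include <string>

        enum class ReadKind { LEAF, SPARSE_EXTRACT, DEFAULT_FILL };

        struct ReadPlan {
            ReadKind kind = ReadKind::DEFAULT_FILL;
            std::string type; // stands in for vectorized::DataTypePtr
        };

        struct Iterator {
            ReadKind kind;
        };

        // Step 1: decide how a path should be read and which type it exposes.
        ReadPlan build_read_plan(const std::string& path, bool in_subcolumns, bool in_sparse) {
            if (in_subcolumns) return {ReadKind::LEAF, "file_column_type(" + path + ")"};
            if (in_sparse) return {ReadKind::SPARSE_EXTRACT, "variant"};
            return {ReadKind::DEFAULT_FILL, "schema_type(" + path + ")"};
        }

        // Step 2a: type inference only reads plan.type; no iterator is built.
        std::string infer_type(const std::string& path, bool in_subcolumns, bool in_sparse) {
            return build_read_plan(path, in_subcolumns, in_sparse).type;
        }

        // Step 2b: iterator construction materializes the very same plan, so the
        // two callers cannot disagree about the storage type.
        std::unique_ptr<Iterator> new_iterator(const std::string& path, bool in_subcolumns,
                                               bool in_sparse) {
            ReadPlan plan = build_read_plan(path, in_subcolumns, in_sparse);
            return std::make_unique<Iterator>(Iterator{plan.kind});
        }

        int main() {
            std::cout << infer_type("a.b", true, false) << "\n";  // file_column_type(a.b)
            std::cout << static_cast<int>(new_iterator("a.b", true, false)->kind) << "\n"; // 0 == LEAF
        }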
---
be/src/olap/rowset/rowset_meta.cpp | 3 +-
be/src/olap/rowset/rowset_meta.h | 5 +-
be/src/olap/rowset/segment_v2/segment.cpp | 20 +-
be/src/olap/rowset/segment_v2/segment.h | 20 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 16 +-
be/src/olap/rowset/segment_v2/segment_iterator.h | 2 +-
be/src/olap/rowset/segment_v2/stream_reader.h | 1 +
.../segment_v2/variant/variant_column_reader.cpp | 291 ++++++++++++---------
.../segment_v2/variant/variant_column_reader.h | 59 ++++-
be/src/olap/tablet_schema.cpp | 5 +-
.../rowset/segment_v2/column_reader_cache_test.cpp | 9 +-
.../variant_column_writer_reader_test.cpp | 21 +-
be/test/vec/common/schema_util_rowset_test.cpp | 3 +-
13 files changed, 277 insertions(+), 178 deletions(-)
diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp
index a1ecd6643e1..6f7f3bc9fe4 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -296,7 +296,8 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
     set_total_disk_size(data_disk_size() + index_disk_size());
     set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
                                       other.is_segments_key_bounds_truncated());
-    if (_rowset_meta_pb.num_segment_rows_size() > 0 && other._rowset_meta_pb.num_segment_rows_size() > 0) {
+ if (_rowset_meta_pb.num_segment_rows_size() > 0 &&
+ other._rowset_meta_pb.num_segment_rows_size() > 0) {
for (auto row_count : other._rowset_meta_pb.num_segment_rows()) {
_rowset_meta_pb.add_num_segment_rows(row_count);
}
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 10a889d32ef..4fda6d0d220 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -141,12 +141,13 @@ public:
     void set_num_rows(int64_t num_rows) { _rowset_meta_pb.set_num_rows(num_rows); }
     void set_num_segment_rows(const std::vector<uint32_t>& num_segment_rows) {
-        _rowset_meta_pb.mutable_num_segment_rows()->Assign(num_segment_rows.cbegin(), num_segment_rows.cend());
+        _rowset_meta_pb.mutable_num_segment_rows()->Assign(num_segment_rows.cbegin(),
+                                                           num_segment_rows.cend());
     }
void get_num_segment_rows(std::vector<uint32_t>* num_segment_rows) const {
num_segment_rows->assign(_rowset_meta_pb.num_segment_rows().cbegin(),
- _rowset_meta_pb.num_segment_rows().cend());
+ _rowset_meta_pb.num_segment_rows().cend());
}
    auto& get_num_segment_rows() const { return _rowset_meta_pb.num_segment_rows(); }
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index d13aa16cb34..4f2c5d84e15 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -252,7 +252,7 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
         if (read_options.col_id_to_predicates.contains(column_id) &&
             can_apply_predicate_safely(column_id, read_options.col_id_to_predicates.at(column_id).get(),
-                                       *schema, read_options.io_ctx.reader_type) &&
+                                       *schema, read_options) &&
             !reader->match_condition(entry.second.get())) {
// any condition not satisfied, return.
*iter = std::make_unique<EmptySegmentIterator>(*schema);
@@ -280,7 +280,7 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
             RETURN_IF_ERROR(st);
             if (reader &&
                 can_apply_predicate_safely(runtime_predicate->column_id(), runtime_predicate.get(),
-                                           *schema, read_options.io_ctx.reader_type) &&
+                                           *schema, read_options) &&
                 !reader->match_condition(&and_predicate)) {
// any condition not satisfied, return.
*iter = std::make_unique<EmptySegmentIterator>(*schema);
@@ -579,7 +579,7 @@ Status Segment::healthy_status() {
// Return the storage datatype of related column to field.
vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
- bool read_flat_leaves) {
+                                                  const StorageReadOptions& read_options) {
const vectorized::PathInDataPtr path = column.path_info_ptr();
// none variant column
@@ -603,10 +603,14 @@ vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
     // If status is not ok, it will throw exception(data corruption)
     THROW_IF_ERROR(get_column_reader(unique_id, &v_reader, &stats));
     DCHECK(v_reader != nullptr);
-    const auto* variant_reader = static_cast<const VariantColumnReader*>(v_reader.get());
-    // Delegate type inference for variant paths to VariantColumnReader.
-    return variant_reader->infer_data_type_for_path(column, relative_path, read_flat_leaves,
-                                                    _column_reader_cache.get(), unique_id);
+    auto* variant_reader = static_cast<VariantColumnReader*>(v_reader.get());
+    // Delegate type inference for variant paths to VariantColumnReader. The decision logic
+    // is shared with VariantColumnReader::new_iterator via an internal read plan so that
+    // iterator construction and storage type inference stay consistent.
+    vectorized::DataTypePtr type;
+    THROW_IF_ERROR(variant_reader->infer_data_type_for_path(&type, column, read_options,
+                                                            _column_reader_cache.get()));
+    return type;
 }
Status Segment::_create_column_meta_once(OlapReaderStatistics* stats) {
@@ -929,7 +933,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
         // if segment cache miss, column reader will be created to make sure the variant column result not coredump
         RETURN_IF_ERROR(_create_column_meta_once(&stats));
-        auto storage_type = get_data_type_of(column, false);
+        auto storage_type = get_data_type_of(column, storage_read_opt);
         vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
         DCHECK(storage_type != nullptr);
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index c2616dcd809..00c183684dd 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -180,20 +180,20 @@ public:
     // another method `get_metadata_size` not include the column reader, only the segment object itself.
     int64_t meta_mem_usage() const { return _meta_mem_usage; }
-    // Get the inner file column's data type
-    // ignore_chidren set to false will treat field as variant
-    // when it contains children with field paths.
-    // nullptr will returned if storage type does not contains such column
-    std::shared_ptr<const vectorized::IDataType> get_data_type_of(const TabletColumn& column,
-                                                                  bool read_flat_leaves);
-
-    // If column in segment is the same type in schema, then it is safe to apply predicate
+    // Get the inner file column's data type.
+    // When `read_options` is provided, the decision (e.g. flat-leaf vs hierarchical) can depend
+    // on the reader type and tablet schema. nullptr will be returned if storage type does not
+    // contain such column.
+    std::shared_ptr<const vectorized::IDataType> get_data_type_of(
+            const TabletColumn& column, const StorageReadOptions& read_options);
+
+    // If column in segment is the same type in schema, then it is safe to apply predicate.
     template <typename Predicate>
     bool can_apply_predicate_safely(int cid, Predicate* pred, const Schema& schema,
-                                    ReaderType read_type) {
+                                    const StorageReadOptions& read_options) {
         const doris::Field* col = schema.column(cid);
         vectorized::DataTypePtr storage_column_type =
-                get_data_type_of(col->get_desc(), read_type != ReaderType::READER_QUERY);
+                get_data_type_of(col->get_desc(), read_options);
         if (storage_column_type == nullptr) {
             // Default column iterator
             return true;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 1c720b5948d..5c5dd293916 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -291,7 +291,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
     for (const auto& predicate : opts.column_predicates) {
         if (!_segment->can_apply_predicate_safely(predicate->column_id(), predicate, *_schema,
-                                                  _opts.io_ctx.reader_type)) {
+                                                  _opts)) {
             continue;
         }
         _col_predicates.emplace_back(predicate);
@@ -315,8 +315,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
     for (int i = 0; i < _schema->columns().size(); ++i) {
         const Field* col = _schema->column(i);
         if (col) {
-            auto storage_type = _segment->get_data_type_of(
-                    col->get_desc(), _opts.io_ctx.reader_type != ReaderType::READER_QUERY);
+            auto storage_type = _segment->get_data_type_of(col->get_desc(), _opts);
             if (storage_type == nullptr) {
                 storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col);
             }
@@ -593,7 +592,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
     for (auto& cid : cids) {
         DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
         if (!_segment->can_apply_predicate_safely(cid, _opts.col_id_to_predicates.at(cid).get(),
-                                                  *_schema, _opts.io_ctx.reader_type)) {
+                                                  *_schema, _opts)) {
             continue;
         }
         // get row ranges by bf index of this column,
@@ -622,7 +621,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
     for (const auto& cid : cids) {
         DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
         if (!_segment->can_apply_predicate_safely(cid, _opts.col_id_to_predicates.at(cid).get(),
-                                                  *_schema, _opts.io_ctx.reader_type)) {
+                                                  *_schema, _opts)) {
             continue;
         }
         // do not check zonemap if predicate does not support zonemap
@@ -655,7 +654,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
                 _opts.topn_filter_target_node_id);
         if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(),
                                                  runtime_predicate.get(), *_schema,
-                                                 _opts.io_ctx.reader_type)) {
+                                                 _opts)) {
             AndBlockColumnPredicate and_predicate;
             and_predicate.add_column_predicate(
                     SingleColumnBlockPredicate::create_unique(runtime_predicate.get()));
@@ -686,9 +685,8 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
     if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
         RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
         for (auto cid : cids) {
-            if (!_segment->can_apply_predicate_safely(cid,
-                                                      _opts.col_id_to_predicates.at(cid).get(),
-                                                      *_schema, _opts.io_ctx.reader_type)) {
+            if (!_segment->can_apply_predicate_safely(
+                        cid, _opts.col_id_to_predicates.at(cid).get(), *_schema, _opts)) {
                 continue;
             }
             RowRanges tmp_row_ranges = RowRanges::create_single(num_rows());
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 7f0d7913335..fc9df6ca2ad 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -263,7 +263,7 @@ private:
                 continue;
             }
             vectorized::DataTypePtr storage_type =
-                    _segment->get_data_type_of(_schema->column(cid)->get_desc(), false);
+                    _segment->get_data_type_of(_schema->column(cid)->get_desc(), _opts);
             if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
                 // Do additional cast
                 vectorized::MutableColumnPtr tmp = storage_type->create_column();
diff --git a/be/src/olap/rowset/segment_v2/stream_reader.h b/be/src/olap/rowset/segment_v2/stream_reader.h
index b009df3b20e..43f95e29306 100644
--- a/be/src/olap/rowset/segment_v2/stream_reader.h
+++ b/be/src/olap/rowset/segment_v2/stream_reader.h
@@ -18,6 +18,7 @@
#pragma once
#include <memory>
+
#include "olap/rowset/segment_v2/column_reader.h"
#include "vec/columns/column.h"
#include "vec/columns/subcolumn_tree.h"
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
index d38533fe0ed..e872b2b9a64 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp
@@ -233,20 +233,38 @@ Status VariantColumnReader::_new_default_iter_with_same_nested(
return Status::OK();
}
-Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* iterator,
-                                                           const TabletColumn& target_col,
-                                                           const StorageReadOptions* opts,
-                                                           bool exceeded_sparse_column_limit,
-                                                           bool existed_in_sparse_column,
-                                                           ColumnReaderCache* column_reader_cache) {
+// Helper to build a logical VARIANT type for the given column (nullable or not).
+static vectorized::DataTypePtr create_variant_type(const TabletColumn& column) {
+    // Keep using DataTypeObject here (rather than newer DataTypeVariant) since
+    // the latter may not exist on older branches. This matches the existing
+    // behaviour of VariantColumnReader::infer_data_type_for_path.
+    if (column.is_nullable()) {
+        return vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>(
+                column.variant_max_subcolumns_count()));
+    }
+    return std::make_shared<vectorized::DataTypeObject>(column.variant_max_subcolumns_count());
+}
+
+Status VariantColumnReader::_build_read_plan_flat_leaves(ReadPlan* plan,
+                                                         const TabletColumn& target_col,
+                                                         const StorageReadOptions* opts,
+                                                         ColumnReaderCache* column_reader_cache) {
     // make sure external meta is loaded otherwise can't find any meta data for extracted columns
RETURN_IF_ERROR(load_external_meta_once());
DCHECK(opts != nullptr);
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
-    // compaction need to read flat leaves nodes data to prevent from amplification
-    const auto* node =
+    plan->relative_path = relative_path;
+    plan->node =
             target_col.has_path_info() ? _subcolumns_meta_info->find_leaf(relative_path) : nullptr;
+    plan->root = _subcolumns_meta_info->get_root();
+
+    bool existed_in_sparse_column =
+            !_statistics->sparse_column_non_null_size.empty() &&
+            _statistics->sparse_column_non_null_size.contains(relative_path.get_path());
+ bool exceeded_sparse_column_limit = is_exceeded_sparse_column_limit();
+
+ const auto* node = plan->node;
if (!node) {
if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
if (_sparse_column_reader == nullptr) {
@@ -255,50 +273,46 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* i
target_col.path_info_ptr()->get_path());
}
            // read sparse column and filter extracted columns in subcolumn_path_map
-            std::unique_ptr<ColumnIterator> inner_iter;
-            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
-            // get subcolumns in sparse path set which will be merged into sparse column
-            RETURN_IF_ERROR(_create_sparse_merge_reader(
-                    iterator, opts, target_col, std::move(inner_iter), column_reader_cache));
+            plan->kind = ReadKind::SPARSE_MERGE;
+            plan->type = vectorized::DataTypeFactory::instance().create_data_type(target_col);
             return Status::OK();
}
if (target_col.is_nested_subcolumn()) {
            // using the sibling of the nested column to fill the target nested column
-            RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator, target_col, opts,
-                                                               column_reader_cache));
+            plan->kind = ReadKind::DEFAULT_NESTED;
+            plan->type = vectorized::DataTypeFactory::instance().create_data_type(target_col);
return Status::OK();
}
-        // If the path is typed, it means the path is not a sparse column, so we can't read the sparse column
-        // even if the sparse column size is reached limit
+        // If the path is typed, it means the path is not a sparse column, so we can't read the
+        // sparse column even if the sparse column size is reached limit.
if (existed_in_sparse_column || exceeded_sparse_column_limit) {
-            // Sparse column exists or reached sparse size limit, read sparse column
-            ColumnIteratorUPtr inner_iter;
-            RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
-            DCHECK(opts);
-            *iterator = std::make_unique<SparseColumnExtractIterator>(
-                    relative_path.get_path(), std::move(inner_iter),
-                    // need to modify sparse_column_cache, so use const_cast here
-                    const_cast<StorageReadOptions*>(opts), target_col);
+            // Sparse column exists or reached sparse size limit, read sparse column.
+            plan->kind = ReadKind::SPARSE_EXTRACT;
+            plan->type = create_variant_type(target_col);
return Status::OK();
}
        VLOG_DEBUG << "new_default_iter: " << target_col.path_info_ptr()->get_path();
-        RETURN_IF_ERROR(Segment::new_default_iterator(target_col, iterator));
+        plan->kind = ReadKind::DEFAULT_FILL;
+        plan->type = vectorized::DataTypeFactory::instance().create_data_type(target_col);
return Status::OK();
}
+
if (relative_path.empty()) {
// root path, use VariantRootColumnIterator
- *iterator = std::make_unique<VariantRootColumnIterator>(
- std::make_unique<FileColumnIterator>(_root_column_reader));
+ plan->kind = ReadKind::ROOT_FLAT;
+        plan->type = vectorized::DataTypeFactory::instance().create_data_type(target_col);
return Status::OK();
}
- VLOG_DEBUG << "new iterator: " << target_col.path_info_ptr()->get_path();
std::shared_ptr<ColumnReader> column_reader;
RETURN_IF_ERROR(column_reader_cache->get_path_column_reader(
            target_col.parent_unique_id(), node->path, &column_reader, opts->stats, node));
- RETURN_IF_ERROR(column_reader->new_iterator(iterator, nullptr));
+ VLOG_DEBUG << "new iterator: " << target_col.path_info_ptr()->get_path();
+ plan->kind = ReadKind::LEAF;
+ plan->type = node->data.file_column_type;
+ plan->leaf_column_reader = std::move(column_reader);
return Status::OK();
}
@@ -350,14 +364,21 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
                                          const TabletColumn* target_col,
                                          const StorageReadOptions* opt,
                                          ColumnReaderCache* column_reader_cache) {
-    int32_t col_uid =
-            target_col->unique_id() >= 0 ? target_col->unique_id() : target_col->parent_unique_id();
+    ReadPlan plan;
+    RETURN_IF_ERROR(_build_read_plan(&plan, *target_col, opt, column_reader_cache));
+    return _create_iterator_from_plan(iterator, plan, *target_col, opt, column_reader_cache);
+}
+
+Status VariantColumnReader::_build_read_plan(ReadPlan* plan, const TabletColumn& target_col,
+                                             const StorageReadOptions* opt,
+                                             ColumnReaderCache* column_reader_cache) {
     // root column use unique id, leaf column use parent_unique_id
-    auto relative_path = target_col->path_info_ptr()->copy_pop_front();
+    int32_t col_uid =
+            target_col.unique_id() >= 0 ? target_col.unique_id() : target_col.parent_unique_id();
+    auto relative_path = target_col.path_info_ptr()->copy_pop_front();
     const auto* root = _subcolumns_meta_info->get_root();
-    const auto* node = target_col->has_path_info()
-                               ? _subcolumns_meta_info->find_exact(relative_path)
-                               : nullptr;
+    const auto* node =
+            target_col.has_path_info() ? _subcolumns_meta_info->find_exact(relative_path) : nullptr;
// try rebuild path with hierarchical
// example path(['a.b']) -> path(['a', 'b'])
@@ -376,8 +397,9 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
bool exceeded_sparse_column_limit = is_exceeded_sparse_column_limit();
    // If the variant column has extracted columns and is a compaction reader, then read flat leaves
-    // Otherwise read hierarchical data, since the variant subcolumns are flattened in schema_util::get_compaction_schema
-    // For checksum reader, we need to read flat leaves to get the correct data if has extracted columns
+    // Otherwise read hierarchical data, since the variant subcolumns are flattened in
+    // schema_util::get_compaction_schema. For checksum reader, we need to read flat leaves to
+    // get the correct data if has extracted columns.
auto need_read_flat_leaves = [](const StorageReadOptions* opts) {
return opts != nullptr && opts->tablet_schema != nullptr &&
std::ranges::any_of(
@@ -387,20 +409,27 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
opts->io_ctx.reader_type == ReaderType::READER_CHECKSUM);
};
+ plan->relative_path = relative_path;
+ plan->node = node;
+ plan->root = root;
+
if (need_read_flat_leaves(opt)) {
// original path, compaction with wide schema
-        return _new_iterator_with_flat_leaves(iterator, *target_col, opt,
-                                              exceeded_sparse_column_limit,
-                                              existed_in_sparse_column, column_reader_cache);
+        return _build_read_plan_flat_leaves(plan, target_col, opt, column_reader_cache);
}
    // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b.
    // Or access root path
    if (has_prefix_path(relative_path)) {
        // Example {"b" : {"c":456,"e":7.111}}
-        // b.c is sparse column, b.e is subcolumn, so b is both the prefix of sparse column and subcolumn
-        return _create_hierarchical_reader(iterator, col_uid, relative_path, node, root,
-                                           column_reader_cache, opt->stats);
+        // b.c is sparse column, b.e is subcolumn, so b is both the prefix of sparse column and
+        // subcolumn
+        plan->kind = ReadKind::HIERARCHICAL;
+        // For hierarchical reads we always expose the logical VARIANT type instead of trying to
+        // construct a complex ARRAY/MAP/STRUCT from a synthetic TabletColumn descriptor (which may
+        // not have complete subtype information, e.g. in tests).
+ plan->type = create_variant_type(target_col);
+ return Status::OK();
}
// if path exists in sparse column, read sparse column with extract reader
@@ -409,12 +438,8 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
// {"b" : {"c":456}} b.c in subcolumn
// {"b" : 123} b in sparse column
// Then we should use hierarchical reader to read b
-        ColumnIteratorUPtr inner_iter;
-        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
-        DCHECK(opt);
-        // Sparse column exists or reached sparse size limit, read sparse column
-        *iterator = std::make_unique<SparseColumnExtractIterator>(
-                relative_path.get_path(), std::move(inner_iter), nullptr, *target_col);
+ plan->kind = ReadKind::SPARSE_EXTRACT;
+ plan->type = create_variant_type(target_col);
return Status::OK();
}
@@ -427,7 +452,10 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
std::shared_ptr<ColumnReader> leaf_column_reader;
RETURN_IF_ERROR(column_reader_cache->get_path_column_reader(
                col_uid, leaf_node->path, &leaf_column_reader, opt->stats, leaf_node));
- RETURN_IF_ERROR(leaf_column_reader->new_iterator(iterator, nullptr));
+ plan->kind = ReadKind::LEAF;
+ plan->type = leaf_column_reader->get_vec_data_type();
+ plan->relative_path = relative_path;
+ plan->leaf_column_reader = std::move(leaf_column_reader);
} else {
if (_ext_meta_reader && _ext_meta_reader->available()) {
// Get path reader from external meta
@@ -437,7 +465,10 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
DCHECK(!has_prefix_path(relative_path));
if (st.ok()) {
                // Try external meta fallback: build a leaf reader on demand from externalized meta
-                RETURN_IF_ERROR(leaf_column_reader->new_iterator(iterator, nullptr));
+ plan->kind = ReadKind::LEAF;
+ plan->type = leaf_column_reader->get_vec_data_type();
+ plan->relative_path = relative_path;
+ plan->leaf_column_reader = std::move(leaf_column_reader);
return Status::OK();
}
if (!st.is<ErrorCode::NOT_FOUND>()) {
@@ -447,15 +478,87 @@ Status VariantColumnReader::new_iterator(ColumnIteratorUPtr* iterator,
}
if (exceeded_sparse_column_limit) {
// maybe exist prefix path in sparse column
-            return _create_hierarchical_reader(iterator, col_uid, relative_path, node, root,
-                                               column_reader_cache, opt->stats);
+ plan->kind = ReadKind::HIERARCHICAL;
+ plan->type = create_variant_type(target_col);
+ plan->relative_path = relative_path;
+ plan->node = node;
+ plan->root = root;
+ return Status::OK();
}
-        // Sparse column not exists and not reached stats limit, then the target path is not exist, get a default iterator
-        RETURN_IF_ERROR(Segment::new_default_iterator(*target_col, iterator));
+        // Sparse column not exists and not reached stats limit, then the target path is not
+        // exist, get a default iterator
+        plan->kind = ReadKind::DEFAULT_FILL;
+        plan->type = vectorized::DataTypeFactory::instance().create_data_type(target_col);
+ plan->relative_path = relative_path;
}
return Status::OK();
}
+Status VariantColumnReader::_create_iterator_from_plan(ColumnIteratorUPtr* iterator,
+                                                       const ReadPlan& plan,
+                                                       const TabletColumn& target_col,
+                                                       const StorageReadOptions* opt,
+                                                       ColumnReaderCache* column_reader_cache) {
+    switch (plan.kind) {
+    case ReadKind::ROOT_FLAT: {
+        *iterator = std::make_unique<VariantRootColumnIterator>(
+                std::make_unique<FileColumnIterator>(_root_column_reader));
+        return Status::OK();
+    }
+    case ReadKind::HIERARCHICAL: {
+        int32_t col_uid = target_col.unique_id() >= 0 ? target_col.unique_id()
+                                                      : target_col.parent_unique_id();
+        RETURN_IF_ERROR(_create_hierarchical_reader(iterator, col_uid, plan.relative_path,
+                                                    plan.node, plan.root, column_reader_cache,
+                                                    opt ? opt->stats : nullptr));
+        return Status::OK();
+    }
+    case ReadKind::LEAF: {
+        DCHECK(plan.leaf_column_reader != nullptr);
+        RETURN_IF_ERROR(plan.leaf_column_reader->new_iterator(iterator, nullptr));
+        return Status::OK();
+    }
+    case ReadKind::SPARSE_EXTRACT: {
+        DCHECK(_sparse_column_reader != nullptr);
+        ColumnIteratorUPtr inner_iter;
+        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
+        *iterator = std::make_unique<SparseColumnExtractIterator>(
+                plan.relative_path.get_path(), std::move(inner_iter),
+                // need to modify sparse_column_cache, so use const_cast here
+                opt ? const_cast<StorageReadOptions*>(opt) : nullptr, target_col);
+        return Status::OK();
+    }
+    case ReadKind::SPARSE_MERGE: {
+        DCHECK(_sparse_column_reader != nullptr);
+        std::unique_ptr<ColumnIterator> inner_iter;
+        RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
+        RETURN_IF_ERROR(_create_sparse_merge_reader(iterator, opt, target_col,
+                                                    std::move(inner_iter), column_reader_cache));
+        return Status::OK();
+    }
+    case ReadKind::DEFAULT_NESTED: {
+        RETURN_IF_ERROR(
+                _new_default_iter_with_same_nested(iterator, target_col, opt, column_reader_cache));
+        return Status::OK();
+    }
+    case ReadKind::DEFAULT_FILL: {
+        RETURN_IF_ERROR(Segment::new_default_iterator(target_col, iterator));
+        return Status::OK();
+    }
+    }
+    return Status::InternalError("Unknown ReadKind for VariantColumnReader");
+}
+
+Status VariantColumnReader::_new_iterator_with_flat_leaves(
+        ColumnIteratorUPtr* iterator, vectorized::DataTypePtr* type, const TabletColumn& target_col,
+        const StorageReadOptions* opts, bool /*exceeded_sparse_column_limit*/,
+        bool /*existed_in_sparse_column*/, ColumnReaderCache* column_reader_cache) {
+    ReadPlan plan;
+    RETURN_IF_ERROR(_build_read_plan_flat_leaves(&plan, target_col, opts, column_reader_cache));
+    *type = plan.type;
+    return _create_iterator_from_plan(iterator, plan, target_col, opts, column_reader_cache);
+}
+
Status VariantColumnReader::init(const ColumnReaderOptions& opts, ColumnMetaAccessor* accessor,
                                 const std::shared_ptr<SegmentFooterPB>& footer, int32_t column_uid,
                                 uint64_t num_rows, io::FileReaderSPtr file_reader) {
@@ -601,7 +704,7 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, ColumnMetaAcce
    RETURN_IF_ERROR(_ext_meta_reader->init_from_footer(footer, file_reader, _root_unique_id));
return Status::OK();
}
-
+
Status VariantColumnReader::create_reader_from_external_meta(const std::string& path,
                                                             const ColumnReaderOptions& opts,
                                                             const io::FileReaderSPtr& file_reader,
@@ -664,7 +767,8 @@ TabletIndexes VariantColumnReader::find_subcolumn_tablet_indexes(
// if parent column has index, add index to _variant_subcolumns_indexes
else if (!parent_index.empty() &&
             data_type->get_storage_field_type() != FieldType::OLAP_FIELD_TYPE_VARIANT &&
-             data_type->get_storage_field_type() != FieldType::OLAP_FIELD_TYPE_MAP /*SPARSE COLUMN*/) {
+             data_type->get_storage_field_type() !=
+                     FieldType::OLAP_FIELD_TYPE_MAP /*SPARSE COLUMN*/) {
// type in column maynot be real type, so use data_type to get the
real type
TabletColumn target_column =
vectorized::schema_util::get_column_by_type(
data_type, column.name(),
@@ -705,68 +809,15 @@ void VariantColumnReader::get_nested_paths(
}
}
-vectorized::DataTypePtr VariantColumnReader::infer_data_type_for_path(
-        const TabletColumn& column, const vectorized::PathInData& relative_path,
-        bool read_flat_leaves, ColumnReaderCache* cache, int32_t col_uid) const {
- // english only in comments
- // Locate the subcolumn node by path.
- const auto* node = get_subcolumn_meta_by_path(relative_path);
-
- // If path explicitly refers to sparse column internal path, fall back to
schema type.
- if (relative_path.get_path().find(SPARSE_COLUMN_PATH) !=
std::string::npos) {
- return
vectorized::DataTypeFactory::instance().create_data_type(column);
- }
-
- // Use variant type when the path is a prefix of any existing subcolumn
path.
- if (has_prefix_path(relative_path)) {
- return
vectorized::DataTypeFactory::instance().create_data_type(column);
- }
-
- // Try to reuse an existing leaf ColumnReader to infer its vectorized data
type.
- if (cache != nullptr) {
- std::shared_ptr<ColumnReader> leaf_reader;
- if (cache->get_path_column_reader(col_uid, relative_path,
&leaf_reader, nullptr).ok() &&
- leaf_reader != nullptr) {
- return leaf_reader->get_vec_data_type();
- }
- }
-
- // Node not found for the given path within the variant subcolumns.
- if (node == nullptr) {
- // Nested subcolumn is not present in sparse column: keep schema type.
- if (column.is_nested_subcolumn()) {
- return
vectorized::DataTypeFactory::instance().create_data_type(column);
- }
-
- // When the path is in the sparse column or exceeded the limit, return
variant type.
- if (exist_in_sparse_column(relative_path) ||
is_exceeded_sparse_column_limit()) {
- return column.is_nullable() ? vectorized::make_nullable(
-
std::make_shared<vectorized::DataTypeObject>(
-
column.variant_max_subcolumns_count()))
- :
std::make_shared<vectorized::DataTypeObject>(
-
column.variant_max_subcolumns_count());
- }
- // Path is not present in this segment: use default schema type.
- return
vectorized::DataTypeFactory::instance().create_data_type(column);
- }
-
- // For nested subcolumns, always use the physical file_column_type
recorded in meta.
- if (column.is_nested_subcolumn()) {
- return node->data.file_column_type;
- }
-
- const bool exist_in_sparse = exist_in_sparse_column(relative_path);
-
- // Condition to return the specific underlying type of the node:
- // 1. We are reading flat leaves (ignoring hierarchy).
- // 2. OR the path does not exist in sparse column and sparse column limit
is not exceeded
- // (meaning it is a pure materialized leaf).
- if (read_flat_leaves || (!exist_in_sparse &&
!is_exceeded_sparse_column_limit())) {
- return node->data.file_column_type;
- }
-
- // For non-compaction reads where we still treat this as logical VARIANT.
- return vectorized::DataTypeFactory::instance().create_data_type(column);
+Status VariantColumnReader::infer_data_type_for_path(vectorized::DataTypePtr* type,
+                                                     const TabletColumn& column,
+                                                     const StorageReadOptions& opts,
+                                                     ColumnReaderCache* column_reader_cache) {
+    DCHECK(column.has_path_info());
+    ReadPlan plan;
+    RETURN_IF_ERROR(_build_read_plan(&plan, column, &opts, column_reader_cache));
+    *type = plan.type;
+    return Status::OK();
}
Status VariantRootColumnIterator::_process_root_column(
diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
index a217a7eee48..01dff89763d 100644
--- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h
@@ -29,8 +29,8 @@
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
#include "olap/rowset/segment_v2/page_handle.h"
-#include "olap/rowset/segment_v2/variant_statistics.h"
#include "olap/rowset/segment_v2/variant/variant_external_meta_reader.h"
+#include "olap/rowset/segment_v2/variant_statistics.h"
#include "olap/tablet_schema.h"
#include "vec/columns/column_object.h"
#include "vec/columns/subcolumn_tree.h"
@@ -99,14 +99,12 @@ public:
void get_nested_paths(std::unordered_set<vectorized::PathInData,
vectorized::PathInData::Hash>*
nested_paths) const;
-    // Infer the storage data type for a variant subcolumn identified by `relative_path`.
-    // This encapsulates the same decision logic as Segment::get_data_type_of for variant paths,
-    // including handling of sparse columns, prefix paths, and flat-leaf compaction mode.
-    vectorized::DataTypePtr infer_data_type_for_path(const TabletColumn& column,
-                                                     const vectorized::PathInData& relative_path,
-                                                     bool read_flat_leaves,
-                                                     ColumnReaderCache* cache,
-                                                     int32_t col_uid) const;
+    // Infer the storage data type for a variant subcolumn using full StorageReadOptions
+    // (reader type, tablet schema, etc). This shares the same decision logic as
+    // `new_iterator` via an internal read plan, but does not create any iterator.
+    Status infer_data_type_for_path(vectorized::DataTypePtr* type, const TabletColumn& column,
+                                    const StorageReadOptions& opts,
+                                    ColumnReaderCache* column_reader_cache);
// Create a ColumnReader for a sub-column identified by `relative_path`.
// This method will first try inline footer.columns via footer_ordinal and
then
@@ -136,11 +134,52 @@ public:
bool has_prefix_path(const vectorized::PathInData& relative_path) const;
private:
+    // Describe how a variant sub-path should be read. This is a logical plan only and
+ // does not create any concrete ColumnIterator.
+ enum class ReadKind {
+ ROOT_FLAT, // root variant using `VariantRootColumnIterator`
+ HIERARCHICAL, // hierarchical merge (root + subcolumns + sparse)
+ LEAF, // direct leaf reader
+ SPARSE_EXTRACT, // extract single path from sparse column
+ SPARSE_MERGE, // merge subcolumns into sparse column
+ DEFAULT_NESTED, // fill nested subcolumn using sibling nested column
+ DEFAULT_FILL // default iterator when path not exist
+ };
+
+ struct ReadPlan {
+ ReadKind kind {ReadKind::DEFAULT_FILL};
+ vectorized::DataTypePtr type;
+
+ // path & meta context
+ vectorized::PathInData relative_path;
+
+ std::shared_ptr<ColumnReader> leaf_column_reader = nullptr;
+ const SubcolumnColumnMetaInfo::Node* node = nullptr;
+ const SubcolumnColumnMetaInfo::Node* root = nullptr;
+ };
+
// init for compaction read
    Status _new_default_iter_with_same_nested(ColumnIteratorUPtr* iterator,
                                              const TabletColumn& col,
                                              const StorageReadOptions* opt,
                                              ColumnReaderCache* column_reader_cache);
-    Status _new_iterator_with_flat_leaves(ColumnIteratorUPtr* iterator, const TabletColumn& col,
+
+    // Build read plan for flat-leaf (compaction/checksum) mode. Only decides the
+    // resulting type and how to read, without creating iterators.
+    Status _build_read_plan_flat_leaves(ReadPlan* plan, const TabletColumn& col,
+                                        const StorageReadOptions* opts,
+                                        ColumnReaderCache* column_reader_cache);
+
+    // Build read plan for the general hierarchical reading mode.
+    Status _build_read_plan(ReadPlan* plan, const TabletColumn& target_col,
+                            const StorageReadOptions* opt, ColumnReaderCache* column_reader_cache);
+
+    // Materialize a concrete ColumnIterator according to the previously built plan.
+    Status _create_iterator_from_plan(ColumnIteratorUPtr* iterator, const ReadPlan& plan,
+                                      const TabletColumn& target_col, const StorageReadOptions* opt,
+                                      ColumnReaderCache* column_reader_cache);
+
+    Status _new_iterator_with_flat_leaves(ColumnIteratorUPtr* iterator,
+                                          vectorized::DataTypePtr* type, const TabletColumn& col,
const StorageReadOptions* opts,
bool exceeded_sparse_column_limit,
bool existed_in_sparse_column,
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 561a7a6ef57..ac436d3d233 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1390,7 +1390,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
_row_store_column_unique_ids.begin(),
_row_store_column_unique_ids.end());
tablet_schema_pb->set_variant_enable_flatten_nested(_variant_enable_flatten_nested);
tablet_schema_pb->set_is_external_segment_meta_used_default(
- _is_external_segment_meta_used_default);
+ _is_external_segment_meta_used_default);
}
size_t TabletSchema::row_size() const {
@@ -1720,7 +1720,8 @@ bool operator==(const TabletSchema& a, const TabletSchema& b) {
if (a._storage_dict_page_size != b._storage_dict_page_size) return false;
     if (a._skip_write_index_on_load != b._skip_write_index_on_load) return false;
     if (a._variant_enable_flatten_nested != b._variant_enable_flatten_nested) return false;
-    if (a._is_external_segment_meta_used_default != b._is_external_segment_meta_used_default) return false;
+    if (a._is_external_segment_meta_used_default != b._is_external_segment_meta_used_default)
+ return false;
return true;
}
diff --git a/be/test/olap/rowset/segment_v2/column_reader_cache_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_cache_test.cpp
index 6a2448d0468..4e0cbdd2a94 100644
--- a/be/test/olap/rowset/segment_v2/column_reader_cache_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_reader_cache_test.cpp
@@ -270,7 +270,8 @@ TEST_F(ColumnReaderCacheTest, VariantColumnPathReading) {
vectorized::PathInData path("field1");
std::shared_ptr<ColumnReader> path_reader;
    Status status = _cache->get_path_column_reader(1, path, &path_reader, &_stats);
-    EXPECT_TRUE(status.ok());
+    // For non-existent subcolumn path, ColumnReaderCache propagates NOT_FOUND and returns nullptr.
+ EXPECT_TRUE(status.is<ErrorCode::NOT_FOUND>());
EXPECT_EQ(path_reader, nullptr);
}
@@ -279,7 +280,8 @@ TEST_F(ColumnReaderCacheTest, NonExistentColumn) {
// Don't set up any column mapping
std::shared_ptr<ColumnReader> reader;
Status status = _cache->get_column_reader(999, &reader, &_stats);
- EXPECT_TRUE(status.ok());
+ // Non-existent column uid should return NOT_FOUND and nullptr reader.
+ EXPECT_TRUE(status.is<ErrorCode::NOT_FOUND>());
EXPECT_EQ(reader, nullptr);
}
@@ -297,7 +299,8 @@ TEST_F(ColumnReaderCacheTest, NonExistentVariantPath) {
vectorized::PathInData non_existent_path("non_existent_field");
std::shared_ptr<ColumnReader> reader;
    Status status = _cache->get_path_column_reader(1, non_existent_path, &reader, &_stats);
- EXPECT_TRUE(status.ok());
+ // Missing variant path should surface as NOT_FOUND with nullptr reader.
+ EXPECT_TRUE(status.is<ErrorCode::NOT_FOUND>());
EXPECT_EQ(reader, nullptr);
}
diff --git a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
index 42c8a9a4bc3..463c15e5bea 100644
--- a/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/variant_column_writer_reader_test.cpp
@@ -2068,11 +2068,12 @@ TEST_F(VariantColumnWriterReaderTest, test_nested_subcolumn) {
    storageReadOptions.io_ctx.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
    MockColumnReaderCache column_reader_cache(footer, file_reader, _tablet_schema);
+    DataTypePtr nested_storage_type;
    ColumnIteratorUPtr nested_column_iter;
-    st = variant_column_reader->_new_iterator_with_flat_leaves(&nested_column_iter, target_column,
-                                                               &storageReadOptions, false, false,
-                                                               &column_reader_cache);
+    st = variant_column_reader->_new_iterator_with_flat_leaves(
+            &nested_column_iter, &nested_storage_type, target_column, &storageReadOptions, false,
+            false, &column_reader_cache);
EXPECT_TRUE(st.ok()) << st.msg();
// check iter for read_by_rowids, next_batch
auto nested_iter =
assert_cast<DefaultNestedColumnIterator*>(nested_column_iter.get());
@@ -2100,8 +2101,8 @@ TEST_F(VariantColumnWriterReaderTest, test_nested_subcolumn) {
{
ColumnIteratorUPtr nested_column_iter11;
st = variant_column_reader->_new_iterator_with_flat_leaves(
-                &nested_column_iter11, target_column, &storageReadOptions, false, false,
-                &column_reader_cache);
+                &nested_column_iter11, &nested_storage_type, target_column, &storageReadOptions,
+                false, false, &column_reader_cache);
EXPECT_TRUE(st.ok()) << st.msg();
st = nested_column_iter11->init(nested_column_iter_opts);
EXPECT_TRUE(st.ok()) << st.msg();
@@ -2146,9 +2147,9 @@ TEST_F(VariantColumnWriterReaderTest, test_nested_subcolumn) {
<< target_column._column_path->has_nested_part();
ColumnIteratorUPtr nested_column_iter1;
-    st = variant_column_reader->_new_iterator_with_flat_leaves(&nested_column_iter1, target_column,
-                                                               &storageReadOptions, false, false,
-                                                               &column_reader_cache);
+    st = variant_column_reader->_new_iterator_with_flat_leaves(
+            &nested_column_iter1, &nested_storage_type, target_column, &storageReadOptions, false,
+            false, &column_reader_cache);
EXPECT_TRUE(st.ok()) << st.msg();
// check iter for read_by_rowids, next_batch
// dst is array<nullable(string)>
@@ -2166,8 +2167,8 @@ TEST_F(VariantColumnWriterReaderTest, test_nested_subcolumn) {
// make read by nested_iter1 directly
ColumnIteratorUPtr nested_column_iter11;
st = variant_column_reader->_new_iterator_with_flat_leaves(
-            &nested_column_iter11, target_column, &storageReadOptions, false, false,
-            &column_reader_cache);
+            &nested_column_iter11, &nested_storage_type, target_column, &storageReadOptions,
+            false, false, &column_reader_cache);
EXPECT_TRUE(st.ok()) << st.msg();
st = nested_column_iter11->init(nested_column_iter_opts);
EXPECT_TRUE(st.ok()) << st.msg();
diff --git a/be/test/vec/common/schema_util_rowset_test.cpp b/be/test/vec/common/schema_util_rowset_test.cpp
index 0f5c2a35820..7f4f08f3f36 100644
--- a/be/test/vec/common/schema_util_rowset_test.cpp
+++ b/be/test/vec/common/schema_util_rowset_test.cpp
@@ -372,8 +372,7 @@ TEST_F(SchemaUtilRowsetTest, mixed_external_segment_meta_old_new) {
    // 4. check that VariantCompactionUtil::check_path_stats works across mixed segments
    // This will internally create Segment / ColumnReader instances and should be
    // insensitive to whether a particular segment uses inline or external meta.
-    EXPECT_TRUE(
-            schema_util::check_path_stats(rowsets, rowsets[0], tablet).ok());
+    EXPECT_TRUE(schema_util::check_path_stats(rowsets, rowsets[0], tablet).ok());
}
TEST_F(SchemaUtilRowsetTest, collect_path_stats_and_get_extended_compaction_schema) {