This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new 98dd3ee257c support vertical compaction merge sparse (#48401)
98dd3ee257c is described below
commit 98dd3ee257c06ab03074d4ac90abf23b39f7dd16
Author: lihangyu <[email protected]>
AuthorDate: Tue Mar 4 10:19:56 2025 +0800
support vertical compaction merge sparse (#48401)
---
be/src/cloud/cloud_base_compaction.h | 2 +-
be/src/cloud/cloud_rowset_writer.cpp | 5 +-
be/src/olap/base_tablet.cpp | 25 ++
be/src/olap/base_tablet.h | 3 +
be/src/olap/compaction.cpp | 18 +-
be/src/olap/compaction.h | 4 +-
be/src/olap/iterators.h | 3 +
be/src/olap/rowset/beta_rowset_writer.cpp | 9 +-
be/src/olap/rowset/rowset_writer_context.h | 3 +
be/src/olap/rowset/segment_v2/column_reader.cpp | 206 +++++++----
be/src/olap/rowset/segment_v2/column_reader.h | 12 +-
be/src/olap/rowset/segment_v2/column_writer.cpp | 6 +
be/src/olap/rowset/segment_v2/column_writer.h | 51 +++
.../rowset/segment_v2/hierarchical_data_reader.cpp | 177 ++++++++--
.../rowset/segment_v2/hierarchical_data_reader.h | 192 ++++++++++-
be/src/olap/rowset/segment_v2/segment.cpp | 6 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 3 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 50 ++-
be/src/olap/rowset/segment_v2/segment_writer.h | 1 +
be/src/olap/rowset/segment_v2/stream_reader.h | 1 +
.../segment_v2/variant_column_writer_impl.cpp | 382 +++++++++++++++------
.../rowset/segment_v2/variant_column_writer_impl.h | 6 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 7 +-
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 1 +
be/src/olap/tablet_schema.cpp | 7 +
be/src/olap/tablet_schema.h | 36 +-
be/src/vec/columns/column_dummy.h | 1 +
be/src/vec/columns/column_object.cpp | 6 +-
be/src/vec/columns/column_object.h | 22 +-
be/src/vec/common/schema_util.cpp | 190 +++++++++-
be/src/vec/common/schema_util.h | 14 +-
be/src/vec/json/path_in_data.h | 5 +
gensrc/proto/segment_v2.proto | 2 +-
.../suites/variant_github_events_p2/load.groovy | 7 +-
.../variant_p0/update/inverted_index/load.groovy | 9 +-
.../variant_p0/update/inverted_index/query.groovy | 3 +-
36 files changed, 1193 insertions(+), 282 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.h
b/be/src/cloud/cloud_base_compaction.h
index 4240458f21b..13eeccb3d58 100644
--- a/be/src/cloud/cloud_base_compaction.h
+++ b/be/src/cloud/cloud_base_compaction.h
@@ -46,7 +46,7 @@ private:
void _filter_input_rowset();
- void build_basic_info();
+ Status build_basic_info();
ReaderType compaction_type() const override { return
ReaderType::READER_BASE_COMPACTION; }
diff --git a/be/src/cloud/cloud_rowset_writer.cpp
b/be/src/cloud/cloud_rowset_writer.cpp
index ebc411697ee..343ccc23b27 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -60,7 +60,10 @@ Status CloudRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context)
DCHECK_NE(_context.newest_write_timestamp, -1);
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
}
- _rowset_meta->set_tablet_schema(_context.tablet_schema);
+ auto schema = _context.tablet_schema->need_record_variant_extended_schema()
+ ? _context.tablet_schema
+ :
_context.tablet_schema->copy_without_variant_extracted_columns();
+ _rowset_meta->set_tablet_schema(schema);
_context.segment_collector =
std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator =
std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
return Status::OK();
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index a4720f89d19..06ad16ab2af 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -30,6 +30,7 @@
#include "olap/delete_bitmap_calculator.h"
#include "olap/iterators.h"
#include "olap/memtable.h"
+#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/rowid_conversion.h"
@@ -163,6 +164,30 @@ TabletSchemaSPtr
BaseTablet::tablet_schema_with_merged_max_schema_version(
return target_schema;
}
+Status BaseTablet::get_compaction_schema(const
std::vector<RowsetMetaSharedPtr>& rowset_metas,
+ TabletSchemaSPtr& target_schema) {
+ RowsetMetaSharedPtr max_schema_version_rs = *std::max_element(
+ rowset_metas.begin(), rowset_metas.end(),
+ [](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
+ return !a->tablet_schema()
+ ? true
+ : (!b->tablet_schema()
+ ? false
+ :
a->tablet_schema()->schema_version() <
+
b->tablet_schema()->schema_version());
+ });
+ target_schema = max_schema_version_rs->tablet_schema();
+ if (target_schema->num_variant_columns() > 0) {
+ RowsetIdUnorderedSet rowset_ids;
+ for (const RowsetMetaSharedPtr& rs_meta : rowset_metas) {
+ rowset_ids.emplace(rs_meta->rowset_id());
+ }
+ RETURN_IF_ERROR(vectorized::schema_util::get_compaction_schema(
+ get_rowset_by_ids(&rowset_ids), target_schema));
+ }
+ return Status::OK();
+}
+
Status BaseTablet::set_tablet_state(TabletState state) {
if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state !=
TABLET_SHUTDOWN) {
return Status::Error<META_INVALID_ARGUMENT>(
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index c6de447200f..1af9327b4a8 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -135,6 +135,9 @@ public:
static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version(
const std::vector<RowsetMetaSharedPtr>& rowset_metas);
+ Status get_compaction_schema(const std::vector<RowsetMetaSharedPtr>&
rowset_metas,
+ TabletSchemaSPtr& target_schema);
+
////////////////////////////////////////////////////////////////////////////
// begin MoW functions
////////////////////////////////////////////////////////////////////////////
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 571ec7f9525..acfcc34470f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -289,7 +289,7 @@ Tablet* CompactionMixin::tablet() {
}
Status CompactionMixin::do_compact_ordered_rowsets() {
- build_basic_info();
+ RETURN_IF_ERROR(build_basic_info());
RowsetWriterContext ctx;
RETURN_IF_ERROR(construct_output_rowset_writer(ctx));
@@ -323,7 +323,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
return Status::OK();
}
-void CompactionMixin::build_basic_info() {
+Status CompactionMixin::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_data_size += rowset->data_disk_size();
_input_rowsets_index_size += rowset->index_disk_size();
@@ -344,6 +344,10 @@ void CompactionMixin::build_basic_info() {
std::transform(_input_rowsets.begin(), _input_rowsets.end(),
rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return
rowset->rowset_meta(); });
_cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+ if (!_cur_tablet_schema->need_record_variant_extended_schema()) {
+ RETURN_IF_ERROR(_tablet->get_compaction_schema(rowset_metas,
_cur_tablet_schema));
+ }
+ return Status::OK();
}
bool CompactionMixin::handle_ordered_data_compaction() {
@@ -461,7 +465,7 @@ Status CompactionMixin::execute_compact_impl(int64_t
permits) {
_state = CompactionState::SUCCESS;
return Status::OK();
}
- build_basic_info();
+ RETURN_IF_ERROR(build_basic_info());
VLOG_DEBUG << "dump tablet schema: " <<
_cur_tablet_schema->dump_structure();
@@ -1348,7 +1352,7 @@ void Compaction::_load_segment_to_cache() {
}
}
-void CloudCompactionMixin::build_basic_info() {
+Status CloudCompactionMixin::build_basic_info() {
_output_version =
Version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
@@ -1358,6 +1362,10 @@ void CloudCompactionMixin::build_basic_info() {
std::transform(_input_rowsets.begin(), _input_rowsets.end(),
rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return
rowset->rowset_meta(); });
_cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+ if (!_cur_tablet_schema->need_record_variant_extended_schema()) {
+ RETURN_IF_ERROR(_tablet->get_compaction_schema(rowset_metas,
_cur_tablet_schema));
+ }
+ return Status::OK();
}
int64_t CloudCompactionMixin::get_compaction_permits() {
@@ -1375,7 +1383,7 @@
CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabl
Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
OlapStopWatch watch;
- build_basic_info();
+ RETURN_IF_ERROR(build_basic_info());
LOG(INFO) << "start " << compaction_name() << ". tablet=" <<
_tablet->tablet_id()
<< ", output_version=" << _output_version << ", permits: " <<
permits;
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 057f4084b06..973d4e9d0ab 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -165,7 +165,7 @@ protected:
private:
Status execute_compact_impl(int64_t permits);
- void build_basic_info();
+ Status build_basic_info();
// Return true if do ordered data compaction successfully
bool handle_ordered_data_compaction();
@@ -204,7 +204,7 @@ private:
Status execute_compact_impl(int64_t permits);
- void build_basic_info();
+ Status build_basic_info();
virtual Status modify_rowsets();
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 1d3c2ddf6b6..963f4d23598 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -28,6 +28,7 @@
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/tablet_schema.h"
#include "runtime/runtime_state.h"
+#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
@@ -119,6 +120,8 @@ public:
std::map<std::string, TypeDescriptor> target_cast_type_for_variants;
RowRanges row_ranges;
size_t topn_limit = 0;
+ // Cache for sparse column data to avoid redundant reads
+ vectorized::ColumnPtr sparse_column_cache;
};
struct CompactionSampleInfo {
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index dc155efe016..a7745d2bc56 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -301,7 +301,10 @@ Status BaseBetaRowsetWriter::init(const
RowsetWriterContext& rowset_writer_conte
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
}
_rowset_meta->set_tablet_uid(_context.tablet_uid);
- _rowset_meta->set_tablet_schema(_context.tablet_schema);
+ auto schema = _context.tablet_schema->need_record_variant_extended_schema()
+ ? _context.tablet_schema
+ :
_context.tablet_schema->copy_without_variant_extracted_columns();
+ _rowset_meta->set_tablet_schema(schema);
_context.segment_collector =
std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator =
std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
return Status::OK();
@@ -801,6 +804,10 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
// update rowset meta tablet schema if tablet schema updated
auto rowset_schema = _context.merged_tablet_schema != nullptr ?
_context.merged_tablet_schema
:
_context.tablet_schema;
+
+ rowset_schema = rowset_schema->need_record_variant_extended_schema()
+ ? rowset_schema
+ :
rowset_schema->copy_without_variant_extracted_columns();
_rowset_meta->set_tablet_schema(rowset_schema);
// If segment compaction occurs, the idx file info will become inaccurate.
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index cbdba6991ae..44902459b11 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -19,6 +19,9 @@
#include <gen_cpp/olap_file.pb.h>
+#include <string_view>
+#include <unordered_map>
+
#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/storage_policy.h"
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index c71b45b82c1..dfe7c02c7a8 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -32,6 +32,7 @@
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
+#include "io/io_common.h"
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/comparison_predicate.h"
@@ -223,11 +224,35 @@ Status ColumnReader::create_agg_state(const
ColumnReaderOptions& opts, const Col
agg_state_type->get_name(), int(type));
}
+bool ColumnReader::is_compaction_reader_type(ReaderType type) {
+ return type == ReaderType::READER_BASE_COMPACTION ||
+ type == ReaderType::READER_CUMULATIVE_COMPACTION ||
+ type == ReaderType::READER_COLD_DATA_COMPACTION ||
+ type == ReaderType::READER_SEGMENT_COMPACTION ||
+ type == ReaderType::READER_FULL_COMPACTION;
+}
+
const SubcolumnColumnReaders::Node* VariantColumnReader::get_reader_by_path(
const vectorized::PathInData& relative_path) const {
return _subcolumn_readers->find_leaf(relative_path);
}
+bool VariantColumnReader::exist_in_sparse_column(
+ const vectorized::PathInData& relative_path) const {
+ // Check if path exist in sparse column
+ bool existed_in_sparse_column =
+ !_statistics->sparse_column_non_null_size.empty() &&
+
_statistics->sparse_column_non_null_size.find(relative_path.get_path()) !=
+ _statistics->sparse_column_non_null_size.end();
+ const std::string& prefix = relative_path.get_path() + ".";
+ bool prefix_existed_in_sparse_column =
+ !_statistics->sparse_column_non_null_size.empty() &&
+ (_statistics->sparse_column_non_null_size.lower_bound(prefix) !=
+ _statistics->sparse_column_non_null_size.end()) &&
+
_statistics->sparse_column_non_null_size.lower_bound(prefix)->first.starts_with(prefix);
+ return existed_in_sparse_column || prefix_existed_in_sparse_column;
+}
+
int64_t VariantColumnReader::get_metadata_size() const {
int64_t size = ColumnReader::get_metadata_size();
if (_statistics) {
@@ -268,30 +293,40 @@ Status
VariantColumnReader::_create_hierarchical_reader(ColumnIterator** reader,
return Status::OK();
}
-bool VariantColumnReader::_read_flat_leaves(ReaderType type, const
TabletColumn& target_col) {
- auto relative_path = target_col.path_info_ptr()->copy_pop_front();
- bool is_compaction_type =
- (type == ReaderType::READER_BASE_COMPACTION ||
- type == ReaderType::READER_CUMULATIVE_COMPACTION ||
- type == ReaderType::READER_COLD_DATA_COMPACTION ||
- type == ReaderType::READER_SEGMENT_COMPACTION ||
- type == ReaderType::READER_FULL_COMPACTION || type ==
ReaderType::READER_CHECKSUM);
- // For compaction operations (e.g., base compaction, cumulative
compaction, cold data compaction,
- // segment compaction, full compaction, or checksum reading), a legacy
compaction style is applied
- // when reading variant columns.
- //
- // Specifically:
- // 1. If the target column is a root column (i.e., relative_path is empty)
and it does not have any
- // subcolumns (i.e., target_col.variant_max_subcolumns_count() <= 0),
then the legacy compaction style
- // is used.
- // 2. If the target column is a nested subcolumn (i.e., relative_path is
not empty), then the legacy
- // compaction style is also used.
- //
- // This ensures that during compaction, the reading behavior for variant
columns remains consistent
- // with historical processing methods, preventing potential data
amplification issues.
- return is_compaction_type &&
- ((relative_path.empty() &&
target_col.variant_max_subcolumns_count() <= 0) ||
- !relative_path.empty());
+Status VariantColumnReader::_create_sparse_merge_reader(ColumnIterator**
iterator,
+ const
StorageReadOptions* opts,
+ const TabletColumn&
target_col,
+ ColumnIterator*
inner_iter) {
+ // Get subcolumns path set from tablet schema
+ const auto& path_set_info =
opts->tablet_schema->path_set_info(target_col.parent_unique_id());
+
+ // Build substream reader tree for merging subcolumns into sparse column
+ SubstreamReaderTree src_subcolumns_for_sparse;
+ for (const auto& subcolumn_reader : *_subcolumn_readers) {
+ const auto& path = subcolumn_reader->path.get_path();
+ if (path_set_info.sparse_path_set.find(StringRef(path)) ==
+ path_set_info.sparse_path_set.end()) {
+ // The subcolumn is not a sparse column, skip it
+ continue;
+ }
+ // Create subcolumn iterator
+ ColumnIterator* it;
+ RETURN_IF_ERROR(subcolumn_reader->data.reader->new_iterator(&it));
+ std::unique_ptr<ColumnIterator> it_ptr(it);
+
+ // Create substream reader and add to tree
+ SubstreamIterator
reader(subcolumn_reader->data.file_column_type->create_column(),
+ std::move(it_ptr),
subcolumn_reader->data.file_column_type);
+ if (!src_subcolumns_for_sparse.add(subcolumn_reader->path,
std::move(reader))) {
+ return Status::InternalError("Failed to add node path {}", path);
+ }
+ }
+
+ // Create sparse column merge reader
+ *iterator = new SparseColumnMergeReader(
+ path_set_info.sub_path_set,
std::unique_ptr<ColumnIterator>(inner_iter),
+ std::move(src_subcolumns_for_sparse),
const_cast<StorageReadOptions*>(opts));
+ return Status::OK();
}
Status
VariantColumnReader::_new_default_iter_with_same_nested(ColumnIterator**
iterator,
@@ -331,20 +366,43 @@ Status
VariantColumnReader::_new_default_iter_with_same_nested(ColumnIterator**
}
Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator**
iterator,
- const TabletColumn&
target_col) {
+ const TabletColumn&
target_col,
+ const
StorageReadOptions* opts,
+ bool
exceeded_sparse_column_limit,
+ bool
existed_in_sparse_column) {
+ DCHECK(opts != nullptr);
auto relative_path = target_col.path_info_ptr()->copy_pop_front();
// compaction need to read flat leaves nodes data to prevent from
amplification
const auto* node =
target_col.has_path_info() ?
_subcolumn_readers->find_leaf(relative_path) : nullptr;
if (!node) {
+ if (existed_in_sparse_column || exceeded_sparse_column_limit) {
+ // Sparse column exists or reached sparse size limit, read sparse
column
+ ColumnIterator* inner_iter;
+ RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
+ DCHECK(opts);
+ *iterator = new SparseColumnExtractReader(
+ relative_path.get_path(),
std::unique_ptr<ColumnIterator>(inner_iter),
+ // need to modify sparse_column_cache, so use const_cast
here
+ const_cast<StorageReadOptions*>(opts));
+ return Status::OK();
+ }
+ if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
+ // read sparse column and filter extracted columns in
subcolumn_path_map
+ ColumnIterator* inner_iter;
+ RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
+ // get subcolumns in sparse path set which will be merged into
sparse column
+ RETURN_IF_ERROR(_create_sparse_merge_reader(iterator, opts,
target_col, inner_iter));
+ return Status::OK();
+ }
if (target_col.is_nested_subcolumn()) {
// using the sibling of the nested column to fill the target
nested column
RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator,
target_col));
- } else {
- std::unique_ptr<ColumnIterator> it;
- RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it));
- *iterator = it.release();
+ return Status::OK();
}
+ std::unique_ptr<ColumnIterator> it;
+ RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it));
+ *iterator = it.release();
return Status::OK();
}
if (relative_path.empty()) {
@@ -365,9 +423,48 @@ Status VariantColumnReader::new_iterator(ColumnIterator**
iterator, const Tablet
const auto* node =
target_col.has_path_info() ?
_subcolumn_readers->find_exact(relative_path) : nullptr;
- if (opt != nullptr && _read_flat_leaves(opt->io_ctx.reader_type,
target_col)) {
+ // Check if path exist in sparse column
+ bool existed_in_sparse_column =
+ !_statistics->sparse_column_non_null_size.empty() &&
+
_statistics->sparse_column_non_null_size.find(relative_path.get_path()) !=
+ _statistics->sparse_column_non_null_size.end();
+
+ // Otherwise the prefix is not exist and the sparse column size is reached
limit
+ // which means the path maybe exist in sparse_column
+ bool exceeded_sparse_column_limit =
!_statistics->sparse_column_non_null_size.empty() &&
+
_statistics->sparse_column_non_null_size.size() >
+
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
+
+ // For compaction operations, read flat leaves, otherwise read
hierarchical data
+ // Since the variant subcolumns are flattened in
schema_util::get_compaction_schema
+ if (opt != nullptr && is_compaction_reader_type(opt->io_ctx.reader_type)) {
// original path, compaction with wide schema
- return _new_iterator_with_flat_leaves(iterator, target_col);
+ return _new_iterator_with_flat_leaves(
+ iterator, target_col, opt, exceeded_sparse_column_limit,
existed_in_sparse_column);
+ }
+
+ // Check if path is prefix, example sparse columns path: a.b.c, a.b.e,
access prefix: a.b.
+ // then we must read the sparse columns
+ const std::string& prefix = relative_path.get_path() + ".";
+ bool prefix_existed_in_sparse_column =
+ !_statistics->sparse_column_non_null_size.empty() &&
+ (_statistics->sparse_column_non_null_size.lower_bound(prefix) !=
+ _statistics->sparse_column_non_null_size.end()) &&
+
_statistics->sparse_column_non_null_size.lower_bound(prefix)->first.starts_with(prefix);
+ // if prefix exists in sparse column, read sparse column with hierarchical
reader
+ if (prefix_existed_in_sparse_column) {
+ return _create_hierarchical_reader(iterator, relative_path, nullptr,
root);
+ }
+
+ // if path exists in sparse column, read sparse column with extract reader
+ if (existed_in_sparse_column) {
+ // Sparse column exists or reached sparse size limit, read sparse
column
+ ColumnIterator* inner_iter;
+ RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
+ DCHECK(opt);
+ *iterator = new SparseColumnExtractReader(
+ relative_path.get_path(),
std::unique_ptr<ColumnIterator>(inner_iter), nullptr);
+ return Status::OK();
}
if (node != nullptr) {
@@ -381,34 +478,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator**
iterator, const Tablet
RETURN_IF_ERROR(_create_hierarchical_reader(iterator,
relative_path, node, root));
}
} else {
- // Check if path exist in sparse column
- bool existed_in_sparse_column =
- _statistics &&
-
_statistics->sparse_column_non_null_size.find(relative_path.get_path()) !=
- _statistics->sparse_column_non_null_size.end();
- if (existed_in_sparse_column) {
- // Sparse column exists or reached sparse size limit, read sparse
column
- ColumnIterator* inner_iter;
- RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
- *iterator = new SparseColumnExtractReader(relative_path.get_path(),
-
std::unique_ptr<ColumnIterator>(inner_iter));
- return Status::OK();
- }
- // Check if path is prefix, example sparse columns path: a.b.c, a.b.e,
access prefix: a.b.
- // then we must read the sparse columns
- bool prefix_existed_in_sparse_column =
- _statistics &&
-
(_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path())
!=
- _statistics->sparse_column_non_null_size.end()) &&
-
_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path())
- ->first.starts_with(relative_path.get_path() + ".");
-
- // Otherwise the prefix is not exist and the sparse column size is
reached limit
- // which means the path maybe exist in sparse_column
- bool exceeded_sparse_column_limit =
- _statistics && _statistics->sparse_column_non_null_size.size()
>
-
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE;
- if (prefix_existed_in_sparse_column || exceeded_sparse_column_limit) {
+ if (exceeded_sparse_column_limit) {
return _create_hierarchical_reader(iterator, relative_path,
nullptr, root);
}
// Sparse column not exists and not reached stats limit, then the
target path is not exist, get a default iterator
@@ -424,6 +494,7 @@ Status VariantColumnReader::init(const ColumnReaderOptions&
opts, const SegmentF
io::FileReaderSPtr file_reader) {
// init sub columns
_subcolumn_readers = std::make_unique<SubcolumnColumnReaders>();
+ _statistics = std::make_unique<VariantStatistics>();
const ColumnMetaPB& self_column_pb = footer.columns(column_id);
for (const ColumnMetaPB& column_pb : footer.columns()) {
// Find all columns belonging to the current variant column
@@ -449,12 +520,20 @@ Status VariantColumnReader::init(const
ColumnReaderOptions& opts, const SegmentF
ColumnReader::create(opts, column_pb, footer.num_rows(),
file_reader, &reader));
vectorized::PathInData path;
path.from_protobuf(column_pb.column_path_info());
+
// init sparse column
- if (path.get_path() == SPARSE_COLUMN_PATH) {
+ if (path.copy_pop_front().get_path() == SPARSE_COLUMN_PATH) {
+ DCHECK(column_pb.has_variant_statistics());
+ const auto& variant_stats = column_pb.variant_statistics();
+ for (const auto& [path, size] :
variant_stats.sparse_column_non_null_size()) {
+ _statistics->sparse_column_non_null_size.emplace(path, size);
+ }
RETURN_IF_ERROR(ColumnReader::create(opts, column_pb,
footer.num_rows(), file_reader,
&_sparse_column_reader));
continue;
}
+
+ // init subcolumns
auto relative_path = path.copy_pop_front();
auto get_data_type_fn = [&]() {
// root subcolumn is ColumnObject::MostCommonType which is jsonb
@@ -476,6 +555,10 @@ Status VariantColumnReader::init(const
ColumnReaderOptions& opts, const SegmentF
SubcolumnReader {std::move(reader), get_data_type_fn()});
} else {
// check the root is already a leaf node
+ if (column_pb.has_none_null_size()) {
+
_statistics->subcolumns_non_null_size.emplace(relative_path.get_path(),
+
column_pb.none_null_size());
+ }
_subcolumn_readers->add(relative_path,
SubcolumnReader {std::move(reader),
get_data_type_fn()});
// init TabletIndex for subcolumns
@@ -500,9 +583,6 @@ Status VariantColumnReader::init(const ColumnReaderOptions&
opts, const SegmentF
for (const auto& [path, size] :
variant_stats.sparse_column_non_null_size()) {
_statistics->sparse_column_non_null_size.emplace(path, size);
}
- for (const auto& [path, size] :
variant_stats.subcolumn_non_null_size()) {
- _statistics->subcolumns_non_null_size.emplace(path, size);
- }
}
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h
b/be/src/olap/rowset/segment_v2/column_reader.h
index 49058ef511f..311be3f9d73 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -144,6 +144,8 @@ public:
std::unique_ptr<ColumnReader>* reader);
enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING,
ALL_DICT_ENCODING };
+ static bool is_compaction_reader_type(ReaderType type);
+
~ColumnReader() override;
// create a new column iterator. Client should delete returned iterator
@@ -329,15 +331,21 @@ public:
TabletIndex* find_subcolumn_tablet_index(const std::string&);
+ bool exist_in_sparse_column(const vectorized::PathInData& path) const;
+
private:
- bool _read_flat_leaves(ReaderType type, const TabletColumn& target_col);
// init for compaction read
Status _new_default_iter_with_same_nested(ColumnIterator** iterator, const
TabletColumn& col);
- Status _new_iterator_with_flat_leaves(ColumnIterator** iterator, const
TabletColumn& col);
+ Status _new_iterator_with_flat_leaves(ColumnIterator** iterator, const
TabletColumn& col,
+ const StorageReadOptions* opts,
+ bool exceeded_sparse_column_limit,
+ bool existed_in_sparse_column);
Status _create_hierarchical_reader(ColumnIterator** reader,
vectorized::PathInData path,
const SubcolumnColumnReaders::Node*
node,
const SubcolumnColumnReaders::Node*
root);
+ Status _create_sparse_merge_reader(ColumnIterator** iterator, const
StorageReadOptions* opts,
+ const TabletColumn& target_col,
ColumnIterator* inner_iter);
std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
std::unique_ptr<ColumnReader> _sparse_column_reader;
std::unique_ptr<VariantStatistics> _statistics;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 3f001cb5671..eb42acb539d 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -296,6 +296,12 @@ Status ColumnWriter::create_agg_state_writer(const
ColumnWriterOptions& opts,
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
const TabletColumn* column,
io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>*
writer) {
+ if (column->is_extracted_column()) {
+ VLOG_DEBUG << "gen subwriter for " <<
column->path_info_ptr()->get_path();
+ *writer = std::unique_ptr<ColumnWriter>(new VariantSubcolumnWriter(
+ opts, column,
std::unique_ptr<Field>(FieldFactory::create(*column))));
+ return Status::OK();
+ }
*writer = std::unique_ptr<ColumnWriter>(new VariantColumnWriter(
opts, column,
std::unique_ptr<Field>(FieldFactory::create(*column))));
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h
b/be/src/olap/rowset/segment_v2/column_writer.h
index 53d7c5d0234..41464982476 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -35,6 +35,7 @@
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "util/bitmap.h" // for BitmapChange
#include "util/slice.h" // for OwnedSlice
+#include "vec/columns/column.h"
namespace doris {
@@ -475,6 +476,56 @@ private:
ColumnWriterOptions _opts;
};
+// used for compaction to write sub variant column
+class VariantSubcolumnWriter : public ColumnWriter {
+public:
+ explicit VariantSubcolumnWriter(const ColumnWriterOptions& opts, const
TabletColumn* column,
+ std::unique_ptr<Field> field);
+
+ ~VariantSubcolumnWriter() override = default;
+
+ Status init() override;
+
+ Status append_data(const uint8_t** ptr, size_t num_rows) override;
+
+ uint64_t estimate_buffer_size() override;
+
+ Status finish() override;
+ Status write_data() override;
+ Status write_ordinal_index() override;
+
+ Status write_zone_map() override;
+
+ Status write_bitmap_index() override;
+ Status write_inverted_index() override;
+ Status write_bloom_filter_index() override;
+ ordinal_t get_next_rowid() const override { return _next_rowid; }
+
+ Status append_nulls(size_t num_rows) override {
+ return Status::NotSupported("variant writer can not append_nulls");
+ }
+ Status append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) override;
+
+ Status finish_current_page() override {
+ return Status::NotSupported("variant writer has no data, can not
finish_current_page");
+ }
+
+ size_t get_non_null_size() const { return none_null_size; }
+
+ Status finalize();
+
+private:
+ bool is_finalized() const;
+ bool _is_finalized = false;
+ ordinal_t _next_rowid = 0;
+ size_t none_null_size = 0;
+ vectorized::MutableColumnPtr _column;
+ const TabletColumn* _tablet_column = nullptr;
+ ColumnWriterOptions _opts;
+ std::unique_ptr<ColumnWriter> _writer;
+ std::unique_ptr<TabletIndex> _index;
+};
+
class VariantColumnWriter : public ColumnWriter {
public:
explicit VariantColumnWriter(const ColumnWriterOptions& opts, const
TabletColumn* column,
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index b0788c10b54..e739028a3e7 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -381,7 +381,12 @@ Status
HierarchicalDataReader::_process_sparse_column(vectorized::ColumnObject&
// b maybe in sparse column, and b.c is in subolumn,
put `b` into root column to distinguish
// from "" which is empty path and root
if (container_variant.is_null_root()) {
- container_variant.add_sub_column({},
sparse_data_offsets.size());
+ // root was created with nrows with Nothing type,
resize it to fit the size of sparse column
+
container_variant.get_root()->resize(sparse_data_offsets.size());
+ // bool added =
container_variant.add_sub_column({}, sparse_data_offsets.size());
+ // if (!added) {
+ // return Status::InternalError("Failed to add
subcolumn for sparse column");
+ // }
}
const auto& data =
ColumnObject::deserialize_from_sparse_column(
&src_sparse_data_values, lower_bound_index);
@@ -435,16 +440,12 @@ Status
HierarchicalDataReader::_init_null_map_and_clear_columns(
return Status::OK();
}
-Status SparseColumnExtractReader::init(const ColumnIteratorOptions& opts) {
- return _sparse_column_reader->init(opts);
-}
-
-Status SparseColumnExtractReader::seek_to_first() {
- return _sparse_column_reader->seek_to_first();
-}
-
-Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) {
- return _sparse_column_reader->seek_to_ordinal(ord);
+bool is_compaction_type(ReaderType type) {
+ return type == ReaderType::READER_BASE_COMPACTION ||
+ type == ReaderType::READER_CUMULATIVE_COMPACTION ||
+ type == ReaderType::READER_COLD_DATA_COMPACTION ||
+ type == ReaderType::READER_SEGMENT_COMPACTION ||
+ type == ReaderType::READER_FULL_COMPACTION;
}
void
SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst)
{
@@ -469,39 +470,147 @@ void
SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr&
#ifndef NDEBUG
var.check_consistency();
#endif
- _sparse_column->clear();
+ // _sparse_column->clear();
}
-Status SparseColumnExtractReader::next_batch(size_t* n,
vectorized::MutableColumnPtr& dst,
- bool* has_null) {
- _sparse_column->clear();
- RETURN_IF_ERROR(_sparse_column_reader->next_batch(n, _sparse_column,
has_null));
- const auto& offsets = assert_cast<const
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
- // Check if we don't have any paths in shared data in current range.
- if (offsets.back() == offsets[-1]) {
- dst->insert_many_defaults(*n);
- } else {
- _fill_path_column(dst);
// Positions the sparse-column reader and every src subcolumn iterator at the
// first row, so subsequent batch reads see row-aligned data across all streams.
Status SparseColumnMergeReader::seek_to_first() {
    RETURN_IF_ERROR(_sparse_column_reader->seek_to_first());
    for (auto& entry : _src_subcolumns_for_sparse) {
        RETURN_IF_ERROR(entry->data.iterator->seek_to_first());
    }
    return Status::OK();
}
-Status SparseColumnExtractReader::read_by_rowids(const rowid_t* rowids, const
size_t count,
- vectorized::MutableColumnPtr&
dst) {
- _sparse_column->clear();
- RETURN_IF_ERROR(_sparse_column_reader->read_by_rowids(rowids, count,
_sparse_column));
- const auto& offsets = assert_cast<const
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
- // Check if we don't have any paths in shared data in current range.
- if (offsets.back() == offsets[-1]) {
- dst->insert_many_defaults(count);
- } else {
- _fill_path_column(dst);
// Positions the sparse-column reader and every src subcolumn iterator at the
// same ordinal; all streams must stay row-aligned for the merge to be correct.
Status SparseColumnMergeReader::seek_to_ordinal(ordinal_t ord) {
    RETURN_IF_ERROR(_sparse_column_reader->seek_to_ordinal(ord));
    for (auto& entry : _src_subcolumns_for_sparse) {
        RETURN_IF_ERROR(entry->data.iterator->seek_to_ordinal(ord));
    }
    return Status::OK();
}
+
+Status SparseColumnMergeReader::init(const ColumnIteratorOptions& opts) {
+ RETURN_IF_ERROR(_sparse_column_reader->init(opts));
+ for (auto& entry : _src_subcolumns_for_sparse) {
+ entry->data.serde = entry->data.type->get_serde();
+ RETURN_IF_ERROR(entry->data.iterator->init(opts));
+ const auto& path = entry->path.get_path();
+ _sorted_src_subcolumn_for_sparse.emplace_back(StringRef(path.data(),
path.size()), entry);
}
+
+ // sort src subcolumns by path
+ std::sort(
+ _sorted_src_subcolumn_for_sparse.begin(),
_sorted_src_subcolumn_for_sparse.end(),
+ [](const auto& lhsItem, const auto& rhsItem) { return
lhsItem.first < rhsItem.first; });
return Status::OK();
}
-ordinal_t SparseColumnExtractReader::get_current_ordinal() const {
- return _sparse_column_reader->get_current_ordinal();
+void SparseColumnMergeReader::_serialize_nullable_column_to_sparse(
+ const SubstreamReaderTree::Node* src_subcolumn,
+ vectorized::ColumnString& dst_sparse_column_paths,
+ vectorized::ColumnString& dst_sparse_column_values, const StringRef&
src_path, size_t row) {
+ // every subcolumn is always Nullable
+ const auto& nullable_serde =
+
assert_cast<vectorized::DataTypeNullableSerDe&>(*src_subcolumn->data.serde);
+ const auto& nullable_col =
+ assert_cast<const vectorized::ColumnNullable&,
TypeCheckOnRelease::DISABLE>(
+ *src_subcolumn->data.column);
+ if (nullable_col.is_null_at(row)) {
+ return;
+ }
+ // insert key
+ dst_sparse_column_paths.insert_data(src_path.data, src_path.size);
+ // insert value
+ vectorized::ColumnString::Chars& chars =
dst_sparse_column_values.get_chars();
+
nullable_serde.get_nested_serde()->write_one_cell_to_binary(nullable_col.get_nested_column(),
+ chars, row);
+ dst_sparse_column_values.get_offsets().push_back(chars.size());
+}
+
+void
SparseColumnMergeReader::_process_data_without_sparse_column(vectorized::MutableColumnPtr&
dst,
+ size_t
num_rows) {
+ if (_src_subcolumns_for_sparse.empty()) {
+ dst->insert_many_defaults(num_rows);
+ } else {
+ // merge subcolumns to sparse column
+ // Otherwise insert required src dense columns into sparse column.
+ auto& map_column = assert_cast<vectorized::ColumnMap&>(*dst);
+ auto& sparse_column_keys =
assert_cast<vectorized::ColumnString&>(map_column.get_keys());
+ auto& sparse_column_values =
+
assert_cast<vectorized::ColumnString&>(map_column.get_values());
+ auto& sparse_column_offsets = map_column.get_offsets();
+ for (size_t i = 0; i != num_rows; ++i) {
+ // Paths in sorted_src_subcolumn_for_sparse_column are already
sorted.
+ for (const auto& entry : _sorted_src_subcolumn_for_sparse) {
+ const auto& path = entry.first;
+ _serialize_nullable_column_to_sparse(entry.second.get(),
sparse_column_keys,
+ sparse_column_values,
path, i);
+ }
+ sparse_column_offsets.push_back(sparse_column_keys.size());
+ }
+ }
+}
+
+void SparseColumnMergeReader::_merge_to(vectorized::MutableColumnPtr& dst) {
+ auto& column_map = assert_cast<vectorized::ColumnMap&>(*dst);
+ auto& dst_sparse_column_paths =
assert_cast<vectorized::ColumnString&>(column_map.get_keys());
+ auto& dst_sparse_column_values =
+ assert_cast<vectorized::ColumnString&>(column_map.get_values());
+ auto& dst_sparse_column_offsets = column_map.get_offsets();
+
+ const auto& src_column_map = assert_cast<const
vectorized::ColumnMap&>(*_sparse_column);
+ const auto& src_sparse_column_paths =
+ assert_cast<const
vectorized::ColumnString&>(*src_column_map.get_keys_ptr());
+ const auto& src_sparse_column_values =
+ assert_cast<const
vectorized::ColumnString&>(*src_column_map.get_values_ptr());
+ const auto& src_serialized_sparse_column_offsets =
src_column_map.get_offsets();
+ DCHECK_EQ(src_sparse_column_paths.size(), src_sparse_column_values.size());
+ // Src object column contains some paths in serialized sparse column in
specified range.
+ // Iterate over this range and insert all required paths into serialized
sparse column or subcolumns.
+ for (size_t row = 0; row != _sparse_column->size(); ++row) {
+ // Use separate index to iterate over sorted
sorted_src_subcolumn_for_sparse_column.
+ size_t sorted_src_subcolumn_for_sparse_column_idx = 0;
+ size_t sorted_src_subcolumn_for_sparse_column_size =
_src_subcolumns_for_sparse.size();
+
+ size_t offset = src_serialized_sparse_column_offsets[row - 1];
+ size_t end = src_serialized_sparse_column_offsets[row];
+ // Iterator over [path, binary value]
+ for (size_t i = offset; i != end; ++i) {
+ const StringRef src_sparse_path_string =
src_sparse_column_paths.get_data_at(i);
+ // Check if we have this path in subcolumns. This path already
materialized in subcolumns.
+ // So we don't need to insert it into sparse column.
+ if (!_src_subcolumn_map.contains(src_sparse_path_string)) {
+ // Before inserting this path into sparse column check if we
need to
+ // insert subcolumns from
sorted_src_subcolumn_for_sparse_column before.
+ while (sorted_src_subcolumn_for_sparse_column_idx <
+ sorted_src_subcolumn_for_sparse_column_size &&
+
_sorted_src_subcolumn_for_sparse[sorted_src_subcolumn_for_sparse_column_idx]
+ .first < src_sparse_path_string) {
+ auto& [src_path, src_subcolumn] =
_sorted_src_subcolumn_for_sparse
+ [sorted_src_subcolumn_for_sparse_column_idx++];
+ _serialize_nullable_column_to_sparse(src_subcolumn.get(),
+
dst_sparse_column_paths,
+
dst_sparse_column_values, src_path, row);
+ }
+
+ /// Insert path and value from src sparse column to our sparse
column.
+ dst_sparse_column_paths.insert_from(src_sparse_column_paths,
i);
+ dst_sparse_column_values.insert_from(src_sparse_column_values,
i);
+ }
+ }
+
+ // Insert remaining dynamic paths from
src_dynamic_paths_for_sparse_data.
+ while (sorted_src_subcolumn_for_sparse_column_idx <
+ sorted_src_subcolumn_for_sparse_column_size) {
+ auto& [src_path, src_subcolumn] =
+
_sorted_src_subcolumn_for_sparse[sorted_src_subcolumn_for_sparse_column_idx++];
+ _serialize_nullable_column_to_sparse(src_subcolumn.get(),
dst_sparse_column_paths,
+ dst_sparse_column_values,
src_path, row);
+ }
+
+ // All the sparse columns in this row are null.
+ dst_sparse_column_offsets.push_back(dst_sparse_column_paths.size());
+ }
}
} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index a99c15bb12a..591b706e0e7 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -17,6 +17,8 @@
#pragma once
+#include <parallel_hashmap/phmap.h>
+
#include <memory>
#include <string_view>
#include <unordered_map>
@@ -164,34 +166,194 @@ private:
}
};
-// Extract path from sparse column
-class SparseColumnExtractReader : public ColumnIterator {
+// Base class for sparse column processors with common functionality
+class BaseSparseColumnProcessor : public ColumnIterator {
+protected:
+ vectorized::MutableColumnPtr _sparse_column;
+ StorageReadOptions* _read_opts; // Shared cache pointer
+ std::unique_ptr<ColumnIterator> _sparse_column_reader;
+
+ // Pure virtual method for data processing when encounter existing sparse
columns(to be implemented by subclasses)
+ virtual void
_process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
+ size_t num_rows) =
0;
+
+ // Pure virtual method for data processing when no sparse columns(to be
implemented by subclasses)
+ virtual void
_process_data_without_sparse_column(vectorized::MutableColumnPtr& dst,
+ size_t num_rows) = 0;
+
public:
- SparseColumnExtractReader(std::string path,
- std::unique_ptr<ColumnIterator>&&
sparse_column_reader)
- : _path(std::move(path)),
_sparse_column_reader(std::move(sparse_column_reader)) {
+ BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader,
StorageReadOptions* opts)
+ : _read_opts(opts), _sparse_column_reader(std::move(reader)) {
_sparse_column = vectorized::ColumnObject::create_sparse_column_fn();
}
- Status init(const ColumnIteratorOptions& opts) override;
+ // Common initialization for all processors
+ Status init(const ColumnIteratorOptions& opts) override {
+ return _sparse_column_reader->init(opts);
+ }
- Status seek_to_first() override;
+ // Standard seek implementations
+ Status seek_to_first() override { return
_sparse_column_reader->seek_to_first(); }
- Status seek_to_ordinal(ordinal_t ord) override;
+ Status seek_to_ordinal(ordinal_t ord) override {
+ return _sparse_column_reader->seek_to_ordinal(ord);
+ }
- Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool*
has_null) override;
+ ordinal_t get_current_ordinal() const override {
+ throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "not
implement");
+ }
+
+ // Template method pattern for batch processing
+ template <typename ReadMethod>
+ Status _process_batch(ReadMethod&& read_method, size_t nrows,
+ vectorized::MutableColumnPtr& dst) {
+ // Cache check and population logic
+ if (_read_opts && _read_opts->sparse_column_cache &&
+
ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type)) {
+ _sparse_column = _read_opts->sparse_column_cache->assume_mutable();
+ } else {
+ _sparse_column->clear();
+ RETURN_IF_ERROR(read_method());
+
+ if (_read_opts) {
+ _read_opts->sparse_column_cache =
_sparse_column->assume_mutable();
+ }
+ }
+ const auto& offsets =
+ assert_cast<const
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+ if (offsets.back() == offsets[-1]) {
+ // no sparse column in this batch
+ _process_data_without_sparse_column(dst, nrows);
+ } else {
+ // merge subcolumns to existing sparse columns
+ _process_data_with_existing_sparse_column(dst, nrows);
+ }
+ return Status::OK();
+ }
+
+ // Batch processing using template method
+ Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool*
has_null) override {
+ return _process_batch(
+ [&]() { return _sparse_column_reader->next_batch(n,
_sparse_column, has_null); },
+ *n, dst);
+ }
+
+ // RowID-based read using template method
Status read_by_rowids(const rowid_t* rowids, const size_t count,
- vectorized::MutableColumnPtr& dst) override;
+ vectorized::MutableColumnPtr& dst) override {
+ return _process_batch(
+ [&]() {
+ return _sparse_column_reader->read_by_rowids(rowids,
count, _sparse_column);
+ },
+ count, dst);
+ }
+};
- ordinal_t get_current_ordinal() const override;
+// Implementation for path extraction processor
+class SparseColumnExtractReader : public BaseSparseColumnProcessor {
+public:
+ SparseColumnExtractReader(std::string_view path,
std::unique_ptr<ColumnIterator> reader,
+ StorageReadOptions* opts)
+ : BaseSparseColumnProcessor(std::move(reader), opts), _path(path)
{}
private:
- void _fill_path_column(vectorized::MutableColumnPtr& dst);
- vectorized::MutableColumnPtr _sparse_column;
std::string _path;
- // may shared among different column iterators
- std::unique_ptr<ColumnIterator> _sparse_column_reader;
+
+ // Fill column by finding path in sparse column
+ void
_process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
+ size_t num_rows) override {
+ _fill_path_column(dst);
+ }
+
+ void _fill_path_column(vectorized::MutableColumnPtr& dst);
+
+ void _process_data_without_sparse_column(vectorized::MutableColumnPtr& dst,
+ size_t num_rows) override {
+ dst->insert_many_defaults(num_rows);
+ }
+};
+
+// Implementation for merge processor
+class SparseColumnMergeReader : public BaseSparseColumnProcessor {
+public:
+ SparseColumnMergeReader(const TabletSchema::PathSet& path_map,
+ std::unique_ptr<ColumnIterator>&&
sparse_column_reader,
+ SubstreamReaderTree&& src_subcolumns_for_sparse,
+ StorageReadOptions* opts)
+ : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts),
+ _src_subcolumn_map(path_map),
+ _src_subcolumns_for_sparse(src_subcolumns_for_sparse) {}
+ Status init(const ColumnIteratorOptions& opts) override;
+
+ // Batch processing using template method
+ Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool*
has_null) override {
+ // read subcolumns first
+ RETURN_IF_ERROR(_read_subcolumns([&](SubstreamReaderTree::Node* entry)
{
+ bool has_null = false;
+ return entry->data.iterator->next_batch(n, entry->data.column,
&has_null);
+ }));
+ // then read sparse column
+ return _process_batch(
+ [&]() { return _sparse_column_reader->next_batch(n,
_sparse_column, has_null); },
+ *n, dst);
+ }
+
+ // RowID-based read using template method
+ Status read_by_rowids(const rowid_t* rowids, const size_t count,
+ vectorized::MutableColumnPtr& dst) override {
+ // read subcolumns first
+ RETURN_IF_ERROR(_read_subcolumns([&](SubstreamReaderTree::Node* entry)
{
+ return entry->data.iterator->read_by_rowids(rowids, count,
entry->data.column);
+ }));
+ // then read sparse column
+ return _process_batch(
+ [&]() {
+ return _sparse_column_reader->read_by_rowids(rowids,
count, _sparse_column);
+ },
+ count, dst);
+ }
+
+ Status seek_to_first() override;
+
+ Status seek_to_ordinal(ordinal_t ord) override;
+
+private:
+ template <typename ReadFunction>
+ Status _read_subcolumns(ReadFunction&& read_func) {
+ // clear previous data
+ for (auto& entry : _src_subcolumns_for_sparse) {
+ entry->data.column->clear();
+ }
+ // read subcolumns
+ for (auto& entry : _src_subcolumns_for_sparse) {
+ RETURN_IF_ERROR(read_func(entry.get()));
+ }
+ return Status::OK();
+ }
+
+ // subcolumns in src tablet schema, which will be filtered
+ const TabletSchema::PathSet& _src_subcolumn_map;
+ // subcolumns to merge to sparse column
+ SubstreamReaderTree _src_subcolumns_for_sparse;
+ std::vector<std::pair<StringRef,
std::shared_ptr<SubstreamReaderTree::Node>>>
+ _sorted_src_subcolumn_for_sparse;
+
+ // Path filtering implementation
+ void
_process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
+ size_t num_rows) override {
+ _merge_to(dst);
+ }
+
+ void _merge_to(vectorized::MutableColumnPtr& dst);
+
+ void _process_data_without_sparse_column(vectorized::MutableColumnPtr& dst,
+ size_t num_rows) override;
+
+ void _serialize_nullable_column_to_sparse(const SubstreamReaderTree::Node*
src_subcolumn,
+ vectorized::ColumnString&
dst_sparse_column_paths,
+ vectorized::ColumnString&
dst_sparse_column_values,
+ const StringRef& src_path,
size_t row);
};
} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index df7e47854f4..58fb5341e92 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -583,7 +583,9 @@ vectorized::DataTypePtr Segment::get_data_type_of(const
TabletColumn& column,
->get_reader_by_path(relative_path)
: nullptr;
if (node) {
- if (read_flat_leaves || (node->children.empty())) {
+ bool exist_in_sparse =
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+
->exist_in_sparse_column(relative_path);
+ if (read_flat_leaves || (node->children.empty() &&
!exist_in_sparse)) {
return node->data.file_column_type;
}
}
@@ -591,7 +593,7 @@ vectorized::DataTypePtr Segment::get_data_type_of(const
TabletColumn& column,
if (read_flat_leaves && !node) {
return nullptr;
}
- // it contains children or column missing in storage, so treat it as
variant
+ // it contains children, exist in sparse column or column missing in
storage, so treat it as variant
return column.is_nullable()
?
vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>(
column.variant_max_subcolumns_count()))
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 5e2f6dcbed7..a9db0a81307 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -2023,7 +2023,8 @@ void SegmentIterator::_clear_iterators() {
Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
-
+ // Clear the sparse column cache before processing a new batch
+ _opts.sparse_column_cache = nullptr;
SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
if (UNLIKELY(!_lazy_inited)) {
RETURN_IF_ERROR(_lazy_init());
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index ece06ffdcf4..1720b0ddba4 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -39,6 +39,7 @@
#include "olap/data_dir.h"
#include "olap/key_coder.h"
#include "olap/olap_common.h"
+#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/row_cursor.h" // RowCursor // IWYU pragma:
keep
@@ -161,14 +162,12 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta,
uint32_t column_id,
init_column_meta(meta->add_children_columns(), column_id,
column.get_sub_column(i),
tablet_schema);
}
- // add sparse column to footer
- for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
- init_column_meta(meta->add_sparse_columns(), -1,
column.sparse_column_at(i), tablet_schema);
- }
meta->set_result_is_nullable(column.get_result_is_nullable());
meta->set_function_name(column.get_aggregation_name());
meta->set_be_exec_version(column.get_be_exec_version());
-
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
+ if (column.is_variant_type()) {
+
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
+ }
}
Status SegmentWriter::init() {
@@ -364,9 +363,7 @@ void SegmentWriter::_maybe_invalid_row_cache(const
std::string& key) {
 // which will be used to construct the new schema for rowset
Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block&
data) {
if (_tablet_schema->num_variant_columns() == 0 ||
- // need to handle sparse columns if variant_max_subcolumns_count > 0
- std::any_of(_tablet_schema->columns().begin(),
_tablet_schema->columns().end(),
- [](const auto& col) { return
col->variant_max_subcolumns_count() > 0; })) {
+ !_tablet_schema->need_record_variant_extended_schema()) {
return Status::OK();
}
size_t column_id = _tablet_schema->num_columns();
@@ -809,6 +806,10 @@ Status SegmentWriter::append_block(const
vectorized::Block* block, size_t row_po
}
RETURN_IF_ERROR(_column_writers[id]->append(converted_result.second->get_nullmap(),
converted_result.second->get_data(), num_rows));
+
+        // calculate stats for variant type
+ // TODO it's tricky here, maybe come up with a better idea
+ _maybe_calculate_variant_stats(block, id, cid);
}
if (_has_key) {
if (_is_mow_with_cluster_key()) {
@@ -1324,5 +1325,38 @@ inline bool SegmentWriter::_is_mow() {
inline bool SegmentWriter::_is_mow_with_cluster_key() {
return _is_mow() && !_tablet_schema->cluster_key_uids().empty();
}
+
// Compaction will extend the sparse column and is visible during read and write; in order to
// persist variant stats info, we should do extra calculation while flushing the segment,
// otherwise the info is lost.
void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* block, size_t id,
                                                   size_t cid) {
    // Only process sparse columns during compaction
    if (!_tablet_schema->columns()[cid]->is_sparse_column() ||
        _opts.write_type != DataWriteType::TYPE_COMPACTION) {
        return;
    }

    // Get parent column's unique ID for matching
    int64_t parent_unique_id = _tablet_schema->columns()[cid]->parent_unique_id();

    // Find matching column in footer
    for (auto& column : *_footer.mutable_columns()) {
        // Check if this is the target sparse column
        // (NOTE: "parrent" is the generated protobuf accessor's spelling)
        if (!column.has_column_path_info() ||
            !column.column_path_info().path().ends_with(SPARSE_COLUMN_PATH) ||
            column.column_path_info().parrent_column_unique_id() != parent_unique_id) {
            continue;
        }

        // Found matching column, calculate statistics
        auto* stats = column.mutable_variant_statistics();
        vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column,
                                                         stats);

        VLOG_DEBUG << "sparse stats columns " << stats->sparse_column_non_null_size_size();
        break;
    }
}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 60300383d72..f505aaeaebb 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -175,6 +175,7 @@ private:
Status _write_footer();
Status _write_raw_data(const std::vector<Slice>& slices);
void _maybe_invalid_row_cache(const std::string& key);
+ void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t
id, size_t cid);
std::string _encode_keys(const
std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos);
// used for unique-key with merge on write and segment min_max key
diff --git a/be/src/olap/rowset/segment_v2/stream_reader.h
b/be/src/olap/rowset/segment_v2/stream_reader.h
index 5b71e00101f..1b18436e086 100644
--- a/be/src/olap/rowset/segment_v2/stream_reader.h
+++ b/be/src/olap/rowset/segment_v2/stream_reader.h
@@ -37,6 +37,7 @@ struct SubstreamIterator {
vectorized::MutableColumnPtr column;
std::unique_ptr<ColumnIterator> iterator;
std::shared_ptr<const vectorized::IDataType> type;
+ std::shared_ptr<vectorized::DataTypeSerDe> serde;
bool inited = false;
size_t rows_read = 0;
SubstreamIterator() = default;
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 2f19309520a..087de2b5e02 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -19,24 +19,121 @@
#include <fmt/core.h>
#include <gen_cpp/segment_v2.pb.h>
+#include <memory>
+#include <set>
+
#include "common/config.h"
#include "common/status.h"
+#include "exec/decompressor.h"
#include "olap/olap_common.h"
+#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/column_writer.h"
#include "olap/segment_loader.h"
+#include "olap/tablet_schema.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/columns_number.h"
#include "vec/common/schema_util.h"
+#include "vec/data_types/data_type_factory.hpp"
#include "vec/json/path_in_data.h"
#include "vec/olap/olap_data_convertor.h"
namespace doris::segment_v2 {
+void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const
TabletColumn& column,
+ CompressionTypePB compression_type) {
+ meta->Clear();
+ meta->set_column_id(column_id);
+ meta->set_type(int(column.type()));
+ meta->set_length(column.length());
+ meta->set_encoding(DEFAULT_ENCODING);
+ meta->set_compression(compression_type);
+ meta->set_is_nullable(column.is_nullable());
+ meta->set_default_value(column.default_value());
+ meta->set_precision(column.precision());
+ meta->set_frac(column.frac());
+ if (column.has_path_info()) {
+ column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(),
+ column.parent_unique_id());
+ }
+ meta->set_unique_id(column.unique_id());
+ for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
+ _init_column_meta(meta->add_children_columns(), column_id,
column.get_sub_column(i),
+ compression_type);
+ }
+ if (column.is_variant_type()) {
+
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
+ }
+};
+
+Status _create_column_writer(uint32_t cid, const TabletColumn& column,
+ const TabletSchemaSPtr& tablet_schema,
+ InvertedIndexFileWriter*
inverted_index_file_writer,
+ std::unique_ptr<ColumnWriter>* writer,
+ std::unique_ptr<TabletIndex>& subcolumn_index,
+ ColumnWriterOptions* opt, size_t
none_null_value_size) {
+ _init_column_meta(opt->meta, cid, column, opt->compression_type);
+ // record none null value size for statistics
+ opt->meta->set_none_null_size(none_null_value_size);
+ opt->need_zone_map = tablet_schema->keys_type() != KeysType::AGG_KEYS;
+ opt->need_bloom_filter = column.is_bf_column();
+ opt->need_bitmap_index = column.has_bitmap_index();
+ const auto& index =
tablet_schema->inverted_index(column.parent_unique_id());
+
+ // init inverted index
+ if (index != nullptr &&
+
segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(column)) {
+ subcolumn_index = std::make_unique<TabletIndex>(*index);
+
subcolumn_index->set_escaped_escaped_index_suffix_path(column.path_info_ptr()->get_path());
+ opt->inverted_index = subcolumn_index.get();
+ opt->need_inverted_index = true;
+ DCHECK(inverted_index_file_writer != nullptr);
+ opt->inverted_index_file_writer = inverted_index_file_writer;
+ }
+
+#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \
+ if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \
+ opt->need_zone_map = false; \
+ opt->need_bloom_filter = false; \
+ opt->need_bitmap_index = false; \
+ }
+
+ DISABLE_INDEX_IF_FIELD_TYPE(ARRAY, "array")
+ DISABLE_INDEX_IF_FIELD_TYPE(JSONB, "jsonb")
+ DISABLE_INDEX_IF_FIELD_TYPE(VARIANT, "variant")
+
+#undef DISABLE_INDEX_IF_FIELD_TYPE
+
+#undef CHECK_FIELD_TYPE
+
+ RETURN_IF_ERROR(ColumnWriter::create(*opt, &column, opt->file_writer,
writer));
+ RETURN_IF_ERROR((*writer)->init());
+
+ return Status::OK();
+}
+
+Status convert_and_write_column(vectorized::OlapBlockDataConvertor* converter,
+ const TabletColumn& column, ColumnWriter*
writer,
+
+ const vectorized::ColumnPtr& src_column,
size_t num_rows,
+ int column_id) {
+ converter->add_column_data_convertor(column);
+
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column({src_column,
nullptr, ""}, 0,
+
num_rows, column_id));
+ auto [status, converted_column] =
converter->convert_column_data(column_id);
+ RETURN_IF_ERROR(status);
+
+ const uint8_t* nullmap = converted_column->get_nullmap();
+ RETURN_IF_ERROR(writer->append(nullmap, converted_column->get_data(),
num_rows));
+
+ converter->clear_source_content();
+ return Status::OK();
+}
+
VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions&
opts,
const TabletColumn* column) {
_opts = opts;
@@ -174,6 +271,7 @@ Status
VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* p
vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>());
ptr->ensure_root_node_type(expected_root_type);
+ DCHECK_EQ(ptr->get_root()->get_ptr()->size(), num_rows);
converter->add_column_data_convertor(*_tablet_column);
DCHECK_EQ(ptr->get_root()->get_ptr()->size(), num_rows);
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
@@ -194,6 +292,7 @@ Status
VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* p
_opts.meta->set_num_rows(num_rows);
return Status::OK();
+ return Status::OK();
}
Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject*
ptr,
@@ -209,13 +308,15 @@ Status
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
auto full_path =
full_path_builder.append(_tablet_column->name_lower_case(), false)
.append(entry->path.get_parts(), false)
.build();
- // set unique_id and parent_unique_id, will use parent_unique_id to
get iterator correct
- return vectorized::schema_util::get_column_by_type(
+ // set unique_id and parent_unique_id, will use unique_id to get
iterator correct
+ auto column = vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {.unique_id =
_tablet_column->unique_id(),
.parent_unique_id =
_tablet_column->unique_id(),
.path_info = full_path});
+ return column;
};
+ _subcolumns_indexes.resize(ptr->get_subcolumns().size());
// convert sub column data from engine format to storage layer format
for (const auto& entry :
vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
@@ -229,26 +330,45 @@ Status
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
continue;
}
CHECK(entry->data.is_finalized());
+
+ // create subcolumn writer
int current_column_id = column_id++;
TabletColumn tablet_column = generate_column_info(entry);
+ ColumnWriterOptions opts;
+ opts.meta = _opts.footer->add_columns();
+ opts.inverted_index_file_writer = _opts.inverted_index_file_writer;
+ opts.compression_type = _opts.compression_type;
+ opts.rowset_ctx = _opts.rowset_ctx;
+ opts.file_writer = _opts.file_writer;
+ std::unique_ptr<ColumnWriter> writer;
vectorized::schema_util::inherit_column_attributes(*_tablet_column,
tablet_column);
- RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column,
-
_opts.rowset_ctx->tablet_schema));
- converter->add_column_data_convertor(tablet_column);
- RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
- {entry->data.get_finalized_column_ptr()->get_ptr(),
- entry->data.get_least_common_type(), tablet_column.name()},
- 0, num_rows, current_column_id));
- auto [status, column] =
converter->convert_column_data(current_column_id);
- if (!status.ok()) {
- return status;
- }
- const uint8_t* nullmap = column->get_nullmap();
- RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append(
- nullmap, column->get_data(), num_rows));
- converter->clear_source_content();
+ RETURN_IF_ERROR(_create_column_writer(
+ current_column_id, tablet_column,
_opts.rowset_ctx->tablet_schema,
+ _opts.inverted_index_file_writer, &writer,
_subcolumns_indexes[current_column_id],
+ &opts, entry->data.get_non_null_value_size()));
+ _subcolumn_writers.push_back(std::move(writer));
+ _subcolumn_opts.push_back(opts);
+
+ // set convertors
+ // converter->add_column_data_convertor(tablet_column);
+ // RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
+ // {entry->data.get_finalized_column_ptr()->get_ptr(),
+ // entry->data.get_least_common_type(), tablet_column.name()},
+ // 0, num_rows, current_column_id));
+ // auto [status, column] =
converter->convert_column_data(current_column_id);
+ // if (!status.ok()) {
+ // return status;
+ // }
+ // const uint8_t* nullmap = column->get_nullmap();
+ // RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append(
+ // nullmap, column->get_data(), num_rows));
+ // converter->clear_source_content();
_subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);
+ RETURN_IF_ERROR(convert_and_write_column(
+ converter, tablet_column, _subcolumn_writers[current_column_id
- 1].get(),
+ entry->data.get_finalized_column_ptr()->get_ptr(),
ptr->rows(), current_column_id));
+
        // get statistics
_statistics.subcolumns_non_null_size.emplace(entry->path.get_path(),
entry->data.get_non_null_value_size());
@@ -264,7 +384,7 @@ Status VariantColumnWriterImpl::_process_sparse_column(
ColumnWriterOptions sparse_writer_opts;
sparse_writer_opts.meta = _opts.footer->add_columns();
- _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column);
+ _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column,
_opts.compression_type);
RETURN_IF_ERROR(ColumnWriter::create_map_writer(sparse_writer_opts,
&sparse_column,
_opts.file_writer,
&_sparse_column_writer));
RETURN_IF_ERROR(_sparse_column_writer->init());
@@ -306,14 +426,13 @@ Status VariantColumnWriterImpl::_process_sparse_column(
for (const auto& [path, size] : sparse_data_paths_statistics) {
_statistics.sparse_column_non_null_size.emplace(path.to_string(),
size);
}
+ // set statistics info
+ _statistics.to_pb(sparse_writer_opts.meta->mutable_variant_statistics());
sparse_writer_opts.meta->set_num_rows(num_rows);
return Status::OK();
}
void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
- for (const auto& [path, value] : subcolumns_non_null_size) {
- stats->mutable_subcolumn_non_null_size()->emplace(path, value);
- }
for (const auto& [path, value] : sparse_column_non_null_size) {
stats->mutable_sparse_column_non_null_size()->emplace(path, value);
}
@@ -322,10 +441,6 @@ void VariantStatistics::to_pb(VariantStatisticsPB* stats)
const {
}
void VariantStatistics::from_pb(const VariantStatisticsPB& stats) {
- // make sure the ref of path, todo not use ref
- for (const auto& [path, value] : stats.subcolumn_non_null_size()) {
- subcolumns_non_null_size[path] = value;
- }
for (const auto& [path, value] : stats.sparse_column_non_null_size()) {
sparse_column_non_null_size[path] = value;
}
@@ -359,14 +474,14 @@ Status VariantColumnWriterImpl::finalize() {
// convert root column data from engine format to storage layer format
RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(),
num_rows, column_id));
- // process and append each subcolumns to sub columns writers buffer
- RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(),
num_rows, column_id));
+ if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION) {
+ // process and append each subcolumns to sub columns writers buffer
+ RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(),
num_rows, column_id));
- // process sparse column and append to sparse writer buffer
- RETURN_IF_ERROR(_process_sparse_column(ptr, olap_data_convertor.get(),
num_rows, column_id));
-
- // set statistics info
- _statistics.to_pb(_opts.meta->mutable_variant_statistics());
+ // process sparse column and append to sparse writer buffer
+ RETURN_IF_ERROR(
+ _process_sparse_column(ptr, olap_data_convertor.get(),
num_rows, column_id));
+ }
_is_finalized = true;
return Status::OK();
@@ -396,7 +511,7 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() {
for (auto& column_writer : _subcolumn_writers) {
size += column_writer->estimate_buffer_size();
}
- size += _sparse_column_writer->estimate_buffer_size();
+ size += _sparse_column_writer ?
_sparse_column_writer->estimate_buffer_size() : 0;
return size;
}
@@ -408,7 +523,9 @@ Status VariantColumnWriterImpl::finish() {
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->finish());
}
- RETURN_IF_ERROR(_sparse_column_writer->finish());
+ if (_sparse_column_writer) {
+ RETURN_IF_ERROR(_sparse_column_writer->finish());
+ }
return Status::OK();
}
Status VariantColumnWriterImpl::write_data() {
@@ -419,7 +536,9 @@ Status VariantColumnWriterImpl::write_data() {
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_data());
}
- RETURN_IF_ERROR(_sparse_column_writer->write_data());
+ if (_sparse_column_writer) {
+ RETURN_IF_ERROR(_sparse_column_writer->write_data());
+ }
return Status::OK();
}
Status VariantColumnWriterImpl::write_ordinal_index() {
@@ -430,7 +549,9 @@ Status VariantColumnWriterImpl::write_ordinal_index() {
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_ordinal_index());
}
- RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
+ if (_sparse_column_writer) {
+ RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
+ }
return Status::OK();
}
@@ -489,91 +610,126 @@ Status VariantColumnWriterImpl::append_nullable(const
uint8_t* null_map, const u
return Status::OK();
}
-void VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t
column_id,
- const TabletColumn& column) {
- meta->set_column_id(column_id);
- meta->set_type(int(column.type()));
- meta->set_length(column.length());
- meta->set_encoding(DEFAULT_ENCODING);
- meta->set_compression(_opts.compression_type);
- meta->set_is_nullable(column.is_nullable());
- meta->set_default_value(column.default_value());
- meta->set_precision(column.precision());
- meta->set_frac(column.frac());
- if (column.has_path_info()) {
- column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(),
- column.parent_unique_id());
- }
- meta->set_unique_id(column.unique_id());
- for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
- _init_column_meta(meta->add_children_columns(), column_id,
column.get_sub_column(i));
- }
-};
+VariantSubcolumnWriter::VariantSubcolumnWriter(const ColumnWriterOptions& opts,
+ const TabletColumn* column,
+ std::unique_ptr<Field> field)
+ : ColumnWriter(std::move(field), opts.meta->is_nullable()) {
+ //
+ _tablet_column = column;
+ _opts = opts;
+ _column = vectorized::ColumnObject::create(true);
+}
-Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const
TabletColumn& column,
- const TabletSchemaSPtr&
tablet_schema) {
- ColumnWriterOptions opts;
- opts.meta = _opts.footer->add_columns();
-
- _init_column_meta(opts.meta, cid, column);
-
- opts.need_zone_map = tablet_schema->keys_type() != KeysType::AGG_KEYS;
- opts.need_bloom_filter = column.is_bf_column();
-
- // const auto* tablet_index =
tablet_schema->get_ngram_bf_index(parent_column.unique_id());
- // if (tablet_index) {
- // opts.need_bloom_filter = true;
- // opts.is_ngram_bf_index = true;
- // //narrow convert from int32_t to uint8_t and uint16_t which is
dangerous
- // auto gram_size = tablet_index->get_gram_size();
- // auto gram_bf_size = tablet_index->get_gram_bf_size();
- // if (gram_size > 256 || gram_size < 1) {
- // return Status::NotSupported("Do not support ngram bloom filter
for ngram_size: ",
- // gram_size);
- // }
- // if (gram_bf_size > 65535 || gram_bf_size < 64) {
- // return Status::NotSupported("Do not support ngram bloom filter
for bf_size: ",
- // gram_bf_size);
- // }
- // opts.gram_size = gram_size;
- // opts.gram_bf_size = gram_bf_size;
- // }
+Status VariantSubcolumnWriter::init() {
+ return Status::OK();
+}
- opts.need_bitmap_index = column.has_bitmap_index();
- const auto& index =
tablet_schema->inverted_index(column.parent_unique_id());
- if (index != nullptr &&
-
segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(column)) {
- auto subcolumn_index = std::make_unique<TabletIndex>(*index);
-
subcolumn_index->set_escaped_escaped_index_suffix_path(column.path_info_ptr()->get_path());
- opts.inverted_index = subcolumn_index.get();
- opts.need_inverted_index = true;
- DCHECK(_opts.inverted_index_file_writer != nullptr);
- opts.inverted_index_file_writer = _opts.inverted_index_file_writer;
- _subcolumns_indexes.emplace_back(std::move(subcolumn_index));
- }
+Status VariantSubcolumnWriter::append_data(const uint8_t** ptr, size_t
num_rows) {
+ const auto& src = *reinterpret_cast<const vectorized::ColumnObject*>(*ptr);
+ auto* dst_ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+ dst_ptr->insert_range_from(src, 0, num_rows);
+ return Status::OK();
+}
-#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \
- if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \
- opts.need_zone_map = false; \
- opts.need_bloom_filter = false; \
- opts.need_bitmap_index = false; \
- }
+uint64_t VariantSubcolumnWriter::estimate_buffer_size() {
+ return _column->byte_size();
+}
- DISABLE_INDEX_IF_FIELD_TYPE(ARRAY, "array")
- DISABLE_INDEX_IF_FIELD_TYPE(JSONB, "jsonb")
- DISABLE_INDEX_IF_FIELD_TYPE(VARIANT, "variant")
+bool VariantSubcolumnWriter::is_finalized() const {
+ const auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+ return ptr->is_finalized() && _is_finalized;
+}
-#undef DISABLE_INDEX_IF_FIELD_TYPE
+Status VariantSubcolumnWriter::finalize() {
+ auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+ ptr->finalize();
-#undef CHECK_FIELD_TYPE
+ DCHECK(ptr->is_finalized());
- std::unique_ptr<ColumnWriter> writer;
- RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _opts.file_writer,
&writer));
- RETURN_IF_ERROR(writer->init());
- _subcolumn_writers.push_back(std::move(writer));
- _subcolumn_opts.push_back(opts);
+ TabletColumn flush_column = vectorized::schema_util::get_column_by_type(
+ ptr->get_root_type(), _tablet_column->name(),
+ vectorized::schema_util::ExtraInfo {
+ .unique_id = _tablet_column->unique_id(),
+ .parent_unique_id = _tablet_column->parent_unique_id(),
+ .path_info = *_tablet_column->path_info_ptr()});
+ ColumnWriterOptions opts = _opts;
+ const auto& parent_column =
+
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
+ // refresh opts and get writer with flush column
+ vectorized::schema_util::inherit_column_attributes(parent_column,
flush_column);
+ RETURN_IF_ERROR(_create_column_writer(
+ 0, flush_column, _opts.rowset_ctx->tablet_schema,
_opts.inverted_index_file_writer,
+ &_writer, _index, &opts,
+ ptr->get_subcolumns().get_root()->data.get_non_null_value_size()));
+ _opts = opts;
+ auto olap_data_convertor =
std::make_unique<vectorized::OlapBlockDataConvertor>();
+ int column_id = 0;
+ RETURN_IF_ERROR(convert_and_write_column(olap_data_convertor.get(),
flush_column, _writer.get(),
+ ptr->get_root()->get_ptr(),
ptr->rows(), column_id));
+ _is_finalized = true;
return Status::OK();
-};
+}
+
+Status VariantSubcolumnWriter::finish() {
+ if (!is_finalized()) {
+ RETURN_IF_ERROR(finalize());
+ }
+ RETURN_IF_ERROR(_writer->finish());
+ return Status::OK();
+}
+Status VariantSubcolumnWriter::write_data() {
+ if (!is_finalized()) {
+ RETURN_IF_ERROR(finalize());
+ }
+ RETURN_IF_ERROR(_writer->write_data());
+ return Status::OK();
+}
+Status VariantSubcolumnWriter::write_ordinal_index() {
+ if (!is_finalized()) {
+ RETURN_IF_ERROR(finalize());
+ }
+ RETURN_IF_ERROR(_writer->write_ordinal_index());
+ return Status::OK();
+}
+
+Status VariantSubcolumnWriter::write_zone_map() {
+ if (!is_finalized()) {
+ RETURN_IF_ERROR(finalize());
+ }
+ if (_opts.need_zone_map) {
+ RETURN_IF_ERROR(_writer->write_zone_map());
+ }
+ return Status::OK();
+}
+
+Status VariantSubcolumnWriter::write_bitmap_index() {
+ return Status::OK();
+}
+Status VariantSubcolumnWriter::write_inverted_index() {
+ if (!is_finalized()) {
+ RETURN_IF_ERROR(finalize());
+ }
+ if (_opts.need_inverted_index) {
+ RETURN_IF_ERROR(_writer->write_inverted_index());
+ }
+ return Status::OK();
+}
+Status VariantSubcolumnWriter::write_bloom_filter_index() {
+ if (!is_finalized()) {
+ RETURN_IF_ERROR(finalize());
+ }
+ if (_opts.need_bloom_filter) {
+ RETURN_IF_ERROR(_writer->write_bloom_filter_index());
+ }
+ return Status::OK();
+}
+
+Status VariantSubcolumnWriter::append_nullable(const uint8_t* null_map, const
uint8_t** ptr,
+ size_t num_rows) {
+ // the root contains the same nullable info
+ RETURN_IF_ERROR(append_data(ptr, num_rows));
+ return Status::OK();
+}
} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 5835868c33f..d9974dd6f2e 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -65,14 +65,9 @@ public:
Status append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows);
private:
- // not including root column
- void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const
TabletColumn& column);
-
// subcolumn path from variant stats info to distinguish from sparse column
Status _get_subcolumn_paths_from_stats(std::set<std::string>& paths);
- Status _create_column_writer(uint32_t cid, const TabletColumn& column,
- const TabletSchemaSPtr& tablet_schema);
Status _process_root_column(vectorized::ColumnObject* ptr,
vectorized::OlapBlockDataConvertor* converter,
size_t num_rows,
int& column_id);
@@ -98,6 +93,7 @@ private:
    // statistics which will be persisted in the footer
VariantStatistics _statistics;
+ // hold the references of subcolumns indexes
std::vector<std::unique_ptr<TabletIndex>> _subcolumns_indexes;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 89442676670..14694f8648f 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -164,7 +164,9 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB*
meta, uint32_t colum
for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
_init_column_meta(meta->add_children_columns(), column_id,
column.get_sub_column(i));
}
-
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
+ if (column.is_variant_type()) {
+
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
+ }
}
Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const
TabletColumn& column,
@@ -1025,8 +1027,7 @@ Status VerticalSegmentWriter::batch_block(const
vectorized::Block* block, size_t
// which will be used to construct the new schema for rowset
Status
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data)
{
if (_tablet_schema->num_variant_columns() == 0 ||
- std::any_of(_tablet_schema->columns().begin(),
_tablet_schema->columns().end(),
- [](const auto& col) { return
col->variant_max_subcolumns_count() > 0; })) {
+ !_tablet_schema->need_record_variant_extended_schema()) {
return Status::OK();
}
size_t column_id = _tablet_schema->num_columns();
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index f493f21ac97..624140d3bf4 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -180,6 +180,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
writer_options.enable_unique_key_merge_on_write =
context.enable_unique_key_merge_on_write;
writer_options.rowset_ctx = &context;
writer_options.max_rows_per_segment = context.max_rows_per_segment;
+ writer_options.write_type = context.write_type;
// TODO if support VerticalSegmentWriter, also need to handle cluster key
primary key index
*writer = std::make_unique<segment_v2::SegmentWriter>(
segment_file_writer.get(), seg_id, context.tablet_schema,
context.tablet,
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 4508e1c4145..782a6dd4eca 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -27,6 +27,8 @@
#include <algorithm>
#include <cctype>
// IWYU pragma: no_include <bits/std_abs.h>
+#include <vec/common/schema_util.h>
+
#include <cmath> // IWYU pragma: keep
#include <memory>
#include <ostream>
@@ -1067,6 +1069,7 @@ void TabletSchema::copy_from(const TabletSchema&
tablet_schema) {
tablet_schema.to_schema_pb(&tablet_schema_pb);
init_from_pb(tablet_schema_pb);
_table_id = tablet_schema.table_id();
+ _path_set_info_map = tablet_schema._path_set_info_map;
}
void TabletSchema::shawdow_copy_without_columns(const TabletSchema&
tablet_schema) {
@@ -1339,6 +1342,10 @@ const TabletColumn&
TabletColumn::sparse_column_at(size_t ordinal) const {
return *_sparse_cols[ordinal];
}
+bool TabletColumn::is_sparse_column() const {
+ return _column_path != nullptr && _column_path->get_relative_path() ==
SPARSE_COLUMN_PATH;
+}
+
const TabletColumn& TabletSchema::column_by_uid(int32_t col_unique_id) const {
return *_cols.at(_field_id_to_index.at(col_unique_id));
}
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 9ff39af14ca..707ecd1bac8 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -165,7 +165,9 @@ public:
// If it is an extracted column from variant column
bool is_extracted_column() const {
return _column_path != nullptr && !_column_path->empty() &&
_parent_col_unique_id > 0;
- };
+ }
+ // If it is sparse column of variant type
+ bool is_sparse_column() const;
std::string suffix_path() const {
return is_extracted_column() ? _column_path->get_path() : "";
}
@@ -404,6 +406,19 @@ public:
void set_storage_page_size(long storage_page_size) { _storage_page_size =
storage_page_size; }
long storage_page_size() const { return _storage_page_size; }
+ // Currently if variant_max_subcolumns_count = 0, then we need to record
variant extended schema
+    // for compatibility reasons
+ bool need_record_variant_extended_schema() const { return
variant_max_subcolumns_count() == 0; }
+
+ int32_t variant_max_subcolumns_count() const {
+ for (const auto& col : _cols) {
+ if (col->is_variant_type()) {
+ return col->variant_max_subcolumns_count();
+ }
+ }
+ return 0;
+ }
+
const std::vector<const TabletIndex*> inverted_indexes() const {
std::vector<const TabletIndex*> inverted_indexes;
for (const auto& index : _indexes) {
@@ -537,6 +552,21 @@ public:
int64_t get_metadata_size() const override;
+ using PathSet = phmap::flat_hash_set<std::string>;
+
+ struct PathsSetInfo {
+ PathSet sub_path_set; // extracted columns
+ PathSet sparse_path_set; // sparse columns
+ };
+
+ const PathsSetInfo& path_set_info(int32_t unique_id) const {
+ return _path_set_info_map.at(unique_id);
+ }
+
+ void set_path_set_info(std::unordered_map<int32_t, PathsSetInfo>&
path_set_info_map) {
+ _path_set_info_map = path_set_info_map;
+ }
+
private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
@@ -591,6 +621,10 @@ private:
bool _variant_enable_flatten_nested = false;
int64_t _vl_field_mem_size {0}; // variable length field
+
+ // key: unique_id of column
+ // value: extracted path set and sparse path set
+ std::unordered_map<int32_t, PathsSetInfo> _path_set_info_map;
};
bool operator==(const TabletSchema& a, const TabletSchema& b);
diff --git a/be/src/vec/columns/column_dummy.h
b/be/src/vec/columns/column_dummy.h
index b8c2363fe6a..27dda8fea1f 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -40,6 +40,7 @@ public:
MutableColumnPtr clone_resized(size_t s) const override { return
clone_dummy(s); }
size_t size() const override { return s; }
+ void resize(size_t _s) override { s = _s; }
void insert_default() override { ++s; }
void pop_back(size_t n) override { s -= n; }
size_t byte_size() const override { return 0; }
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 6b2a69ca8c5..d2528bdb76c 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -2252,9 +2252,11 @@ bool ColumnObject::is_null_root() const {
}
bool ColumnObject::is_scalar_variant() const {
- // Only root itself
+ const auto& sparse_offsets = serialized_sparse_column_offsets().data();
+ // Only root itself is scalar, and no sparse data
return !is_null_root() && subcolumns.get_leaves().size() == 1 &&
- subcolumns.get_root()->is_scalar();
+ subcolumns.get_root()->is_scalar() &&
+ sparse_offsets[num_rows - 1] == 0; // no sparse data
}
const DataTypePtr ColumnObject::NESTED_TYPE =
std::make_shared<vectorized::DataTypeNullable>(
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index bef2b62e822..8d90cc1aac1 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -294,7 +294,7 @@ public:
void check_consistency() const;
MutableColumnPtr get_root() {
- if (subcolumns.empty() ||
is_nothing(subcolumns.get_root()->data.get_least_common_type())) {
+ if (subcolumns.empty()) {
return nullptr;
}
return
subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable();
@@ -592,6 +592,16 @@ public:
const auto& value = assert_cast<const
ColumnString&>(column_map.get_values());
return {&key, &value};
}
+
+ ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() {
+ auto& column_map = assert_cast<ColumnMap&>(*serialized_sparse_column);
+ return column_map.get_offsets();
+ }
+
+ const ColumnArray::Offsets64& ALWAYS_INLINE
serialized_sparse_column_offsets() const {
+ const auto& column_map = assert_cast<const
ColumnMap&>(*serialized_sparse_column);
+ return column_map.get_offsets();
+ }
// Insert all the data from sparse data with specified path to sub column.
static void fill_path_column_from_sparse_data(Subcolumn& subcolumn,
NullMap* null_map,
StringRef path,
@@ -622,16 +632,6 @@ private:
// unnest nested type columns, and flat them into finlized array subcolumns
void unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const;
- ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() {
- auto& column_map = assert_cast<ColumnMap&>(*serialized_sparse_column);
- return column_map.get_offsets();
- }
-
- const ColumnArray::Offsets64& ALWAYS_INLINE
serialized_sparse_column_offsets() const {
- const auto& column_map = assert_cast<const
ColumnMap&>(*serialized_sparse_column);
- return column_map.get_offsets();
- }
-
void insert_from_sparse_column_and_fill_remaing_dense_column(
const ColumnObject& src,
std::vector<std::pair<std::string_view, Subcolumn>>&&
diff --git a/be/src/vec/common/schema_util.cpp
b/be/src/vec/common/schema_util.cpp
index aa07adf05a7..fb48714ef8a 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -42,6 +42,13 @@
#include "common/status.h"
#include "exprs/json_functions.h"
#include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_fwd.h"
+#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
+#include "olap/segment_loader.h"
+#include "olap/tablet.h"
+#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
@@ -49,6 +56,7 @@
#include "util/defer_op.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/columns_number.h"
@@ -620,11 +628,12 @@ bool has_schema_index_diff(const TabletSchema*
new_schema, const TabletSchema* o
TabletColumn create_sparse_column(const TabletColumn& variant) {
TabletColumn res;
- res.set_name(SPARSE_COLUMN_PATH);
- res.set_unique_id(variant.unique_id());
+ res.set_name(variant.name_lower_case() + "." + SPARSE_COLUMN_PATH);
+ res.set_unique_id(variant.parent_unique_id() > 0 ?
variant.parent_unique_id()
+ : variant.unique_id());
res.set_type(FieldType::OLAP_FIELD_TYPE_MAP);
res.set_aggregation_method(variant.aggregation());
- res.set_path_info(PathInData {SPARSE_COLUMN_PATH});
+ res.set_path_info(PathInData {variant.name_lower_case() + "." +
SPARSE_COLUMN_PATH});
res.set_parent_unique_id(variant.unique_id());
TabletColumn child_tcolumn;
@@ -634,4 +643,179 @@ TabletColumn create_sparse_column(const TabletColumn&
variant) {
return res;
}
+using PathToNoneNullValues = std::unordered_map<std::string, size_t>;
+
+Status collect_path_stats(const RowsetSharedPtr& rs,
+ std::unordered_map<int32_t, PathToNoneNullValues>&
uid_to_path_stats) {
+ SegmentCacheHandle segment_cache;
+ RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
+ std::static_pointer_cast<BetaRowset>(rs), &segment_cache));
+
+ for (const auto& column : rs->tablet_schema()->columns()) {
+ if (!column->is_variant_type()) {
+ continue;
+ }
+
+ for (const auto& segment : segment_cache.get_segments()) {
+ auto column_reader_or =
segment->get_column_reader(column->unique_id());
+ if (!column_reader_or.has_value()) {
+ continue;
+ }
+ auto* column_reader = column_reader_or.value();
+ if (!column_reader) {
+ continue;
+ }
+
+ CHECK(column_reader->get_meta_type() ==
FieldType::OLAP_FIELD_TYPE_VARIANT);
+ const auto* source_stats =
+ static_cast<const
segment_v2::VariantColumnReader*>(column_reader)->get_stats();
+ CHECK(source_stats);
+
+            // merge subcolumn statistics
+ for (const auto& [path, size] :
source_stats->subcolumns_non_null_size) {
+ uid_to_path_stats[column->unique_id()][path] += size;
+ }
+
+            // merge sparse column statistics
+ for (const auto& [path, size] :
source_stats->sparse_column_non_null_size) {
+ CHECK(!path.empty());
+ uid_to_path_stats[column->unique_id()][path] += size;
+ }
+ }
+ }
+ return Status::OK();
+}
+
+void get_subpaths(const TabletColumn& variant,
+ const std::unordered_map<int32_t, PathToNoneNullValues>&
path_stats,
+ std::unordered_map<int32_t, TabletSchema::PathsSetInfo>&
uid_to_paths_set_info) {
+ for (const auto& [uid, stats] : path_stats) {
+ if (stats.size() > variant.variant_max_subcolumns_count()) {
+            // sort paths by non-null value count
+ std::vector<std::pair<size_t, std::string_view>> paths_with_sizes;
+ paths_with_sizes.reserve(stats.size());
+ for (const auto& [path, size] : stats) {
+ paths_with_sizes.emplace_back(size, path);
+ }
+ std::sort(paths_with_sizes.begin(), paths_with_sizes.end(),
std::greater());
+
+            // pick the top N paths as subcolumns; the remaining paths become sparse columns
+ for (const auto& [size, path] : paths_with_sizes) {
+ if (uid_to_paths_set_info[uid].sub_path_set.size() <
+ variant.variant_max_subcolumns_count()) {
+ uid_to_paths_set_info[uid].sub_path_set.emplace(path);
+ } else {
+ uid_to_paths_set_info[uid].sparse_path_set.emplace(path);
+ }
+ }
+ } else {
+            // use all paths as subcolumns
+ for (const auto& [path, _] : stats) {
+ uid_to_paths_set_info[uid].sub_path_set.emplace(path);
+ }
+ }
+ }
+}
+
+// Build the temporary schema for compaction
+// 1. collect path stats from all rowsets
+// 2. get the subpaths and sparse paths for each unique id
+// 3. build the output schema with the subpaths and sparse paths
+// 4. set the path set info for each unique id
+// 5. append the subpaths and sparse paths to the output schema
+// 6. return the output schema
+Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
+ TabletSchemaSPtr& target) {
+ std::unordered_map<int32_t, PathToNoneNullValues> uid_to_path_stats;
+
+    // collect path statistics from all input rowsets
+ for (const auto& rs : rowsets) {
+ RETURN_IF_ERROR(collect_path_stats(rs, uid_to_path_stats));
+ }
+
+    // build the output schema
+ TabletSchemaSPtr output_schema = std::make_shared<TabletSchema>();
+ output_schema->shawdow_copy_without_columns(*target);
+ std::unordered_map<int32_t, TabletSchema::PathsSetInfo>
uid_to_paths_set_info;
+ for (const TabletColumnPtr& column : target->columns()) {
+ output_schema->append_column(*column);
+ if (!column->is_variant_type()) {
+ continue;
+ }
+
+        // get the subpaths for this variant column
+ get_subpaths(*column, uid_to_path_stats, uid_to_paths_set_info);
+ std::vector<StringRef> sorted_subpaths(
+
uid_to_paths_set_info[column->unique_id()].sub_path_set.begin(),
+ uid_to_paths_set_info[column->unique_id()].sub_path_set.end());
+ std::sort(sorted_subpaths.begin(), sorted_subpaths.end());
+        // append the subcolumns
+ for (const auto& subpath : sorted_subpaths) {
+ TabletColumn subcolumn;
+ subcolumn.set_name(column->name() + "." + subpath.to_string());
+ subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
+ subcolumn.set_parent_unique_id(column->unique_id());
+ subcolumn.set_path_info(PathInData(column->name() + "." +
subpath.to_string()));
+ subcolumn.set_aggregation_method(column->aggregation());
+
subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count());
+ subcolumn.set_is_nullable(true);
+ output_schema->append_column(subcolumn);
+ }
+        // append the sparse column
+ TabletColumn sparse_column = create_sparse_column(*column);
+ output_schema->append_column(sparse_column);
+ }
+
+ target = output_schema;
+ // used to merge & filter path to sparse column during reading in
compaction
+ target->set_path_set_info(uid_to_paths_set_info);
+ VLOG_DEBUG << "dump schema " << target->dump_full_schema();
+ return Status::OK();
+}
+
+// Calculate statistics about variant data paths from the encoded sparse column
+void calculate_variant_stats(const IColumn& encoded_sparse_column,
+ segment_v2::VariantStatisticsPB* stats) {
+ // Cast input column to ColumnMap type since sparse column is stored as a
map
+ const auto& map_column = assert_cast<const
ColumnMap&>(encoded_sparse_column);
+
+ // Map to store path frequencies - tracks how many times each path appears
+ std::unordered_map<StringRef, size_t> sparse_data_paths_statistics;
+
+ // Get the keys column which contains the paths as strings
+ const auto& sparse_data_paths =
+ assert_cast<const ColumnString*>(map_column.get_keys_ptr().get());
+
+ // Iterate through all paths in the sparse column
+ for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
+ auto path = sparse_data_paths->get_data_at(i);
+
+ // If path already exists in statistics, increment its count
+ if (auto it = sparse_data_paths_statistics.find(path);
+ it != sparse_data_paths_statistics.end()) {
+ ++it->second;
+ }
+ // If path doesn't exist and we haven't hit the max statistics size
limit,
+ // add it with count 1
+ else if (sparse_data_paths_statistics.size() <
+ VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ sparse_data_paths_statistics.emplace(path, 1);
+ }
+ }
+
+ // Copy the collected statistics into the protobuf stats object
+ // This maps each path string to its frequency count
+ for (const auto& [path, size] : sparse_data_paths_statistics) {
+ const auto& sparse_path = path.to_string();
+ auto it = stats->sparse_column_non_null_size().find(sparse_path);
+ if (it == stats->sparse_column_non_null_size().end()) {
+ stats->mutable_sparse_column_non_null_size()->emplace(sparse_path,
size);
+ } else {
+ size_t original_size = it->second;
+ stats->mutable_sparse_column_non_null_size()->emplace(sparse_path,
+
original_size + size);
+ }
+ }
+}
+
} // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 4a916618da6..6e3d049f199 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -27,6 +27,7 @@
#include <string>
#include "common/status.h"
+#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "udf/udf.h"
#include "vec/aggregate_functions/aggregate_function.h"
@@ -41,7 +42,9 @@
namespace doris {
enum class FieldType;
-
+namespace segment_v2 {
+struct VariantStatisticsPB;
+} // namespace segment_v2
namespace vectorized {
class Block;
class IColumn;
@@ -87,7 +90,7 @@ TabletColumn get_column_by_type(const
vectorized::DataTypePtr& data_type, const
// 3. encode sparse sub columns
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseConfig& config);
-Status encode_variant_sparse_subcolumns(ColumnObject& column);
+// Status encode_variant_sparse_subcolumns(ColumnObject& column);
// Pick the tablet schema with the highest schema version as the reference.
// Then update all variant columns to there least common types.
@@ -130,4 +133,11 @@ bool has_schema_index_diff(const TabletSchema* new_schema,
const TabletSchema* o
// create ColumnMap<String, String>
TabletColumn create_sparse_column(const TabletColumn& variant);
+// Build the temporary schema for compaction, this will reduce the memory
usage of compacting variant columns
+Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
TabletSchemaSPtr& target);
+
+// Calculate statistics about variant data paths from the encoded sparse column
+void calculate_variant_stats(const IColumn& encoded_sparse_column,
+ segment_v2::VariantStatisticsPB* stats);
+
} // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/json/path_in_data.h b/be/src/vec/json/path_in_data.h
index 8d94b02f37a..d4a84323231 100644
--- a/be/src/vec/json/path_in_data.h
+++ b/be/src/vec/json/path_in_data.h
@@ -73,6 +73,11 @@ public:
static UInt128 get_parts_hash(const Parts& parts_);
bool empty() const { return parts.empty(); }
const vectorized::String& get_path() const { return path; }
+ // if path is v.a.b, then relative path will return a.b
+ // make sure the parts are not empty
+ std::string_view get_relative_path() const {
+ return {path.begin() + parts[0].key.size() + 1, path.end()};
+ }
const Parts& get_parts() const { return parts; }
bool is_nested(size_t i) const { return parts[i].is_nested; }
bool has_nested_part() const { return has_nested; }
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index c51982a8dab..37a208ad5b6 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -161,7 +161,6 @@ message ColumnPathInfo {
message VariantStatisticsPB {
// in the order of subcolumns in variant
- map<string, uint32> subcolumn_non_null_size = 1;
map<string, uint32> sparse_column_non_null_size = 2;
}
@@ -205,6 +204,7 @@ message ColumnMetaPB {
optional int32 be_exec_version = 20; // used on agg_state type
optional VariantStatisticsPB variant_statistics = 21; // only used in
variant type
optional int32 variant_max_subcolumns_count = 22 [default = 0];
+ optional uint32 none_null_size = 23;
}
message PrimaryKeyIndexMetaPB {
diff --git a/regression-test/suites/variant_github_events_p2/load.groovy
b/regression-test/suites/variant_github_events_p2/load.groovy
index 48a3507f303..2e06d207d17 100644
--- a/regression-test/suites/variant_github_events_p2/load.groovy
+++ b/regression-test/suites/variant_github_events_p2/load.groovy
@@ -150,6 +150,8 @@ suite("regression_test_variant_github_events_p2",
"nonConcurrent,p2"){
def table_name = "github_events"
sql """DROP TABLE IF EXISTS ${table_name}"""
table_name = "github_events"
+ int rand_subcolumns_count = Math.floor(Math.random() * (611 - 511 + 1)) +
511
+ // int rand_subcolumns_count = 0;
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
@@ -158,7 +160,7 @@ suite("regression_test_variant_github_events_p2",
"nonConcurrent,p2"){
)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
- properties("replication_num" = "1", "disable_auto_compaction" =
"true", "variant_enable_flatten_nested" = "false");
+ properties("replication_num" = "1", "disable_auto_compaction" =
"true", "variant_enable_flatten_nested" = "false",
"variant_max_subcolumns_count" = "${rand_subcolumns_count}");
"""
// 2015
@@ -227,7 +229,8 @@ suite("regression_test_variant_github_events_p2",
"nonConcurrent,p2"){
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
- properties("replication_num" = "1", "disable_auto_compaction" =
"false", "variant_enable_flatten_nested" = "false");
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false", "variant_enable_flatten_nested" = "false",
+ "variant_max_subcolumns_count" =
"${rand_subcolumns_count}");
"""
sql """insert into github_events2 select * from github_events order by k"""
sql """select v['payload']['commits'] from github_events order by k ;"""
diff --git
a/regression-test/suites/variant_p0/update/inverted_index/load.groovy
b/regression-test/suites/variant_p0/update/inverted_index/load.groovy
index 79f602d2a16..495be8bcfca 100644
--- a/regression-test/suites/variant_p0/update/inverted_index/load.groovy
+++ b/regression-test/suites/variant_p0/update/inverted_index/load.groovy
@@ -14,8 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-suite("update_test_index_load", "p0") {
+
+suite("update_test_index_load", "nonConcurrent,p0") {
def load_json_data = {table_name, file_name ->
// load the json data
@@ -61,7 +61,8 @@ suite("update_test_index_load", "p0") {
"replication_num" = "1",
"disable_auto_compaction" = "true",
"bloom_filter_columns" = "v",
- "inverted_index_storage_format" = ${format}
+ "inverted_index_storage_format" = ${format},
+ "variant_max_subcolumns_count" = "9999"
);
"""
@@ -70,6 +71,7 @@ suite("update_test_index_load", "p0") {
}
try {
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.apply_inverted_index")
+ sql "set enable_common_expr_pushdown = true"
sql """set enable_match_without_inverted_index = false"""
sql """ set inverted_index_skip_threshold = 0 """
sql """ set enable_inverted_index_query = true """
@@ -104,3 +106,4 @@ suite("update_test_index_load", "p0") {
create_table_load_data.call("test_update_index_compact2_v1", "V1")
create_table_load_data.call("test_update_index_compact2_v2", "V2")
}
+
\ No newline at end of file
diff --git
a/regression-test/suites/variant_p0/update/inverted_index/query.groovy
b/regression-test/suites/variant_p0/update/inverted_index/query.groovy
index d5bdfcc5f72..d78f2d41b59 100644
--- a/regression-test/suites/variant_p0/update/inverted_index/query.groovy
+++ b/regression-test/suites/variant_p0/update/inverted_index/query.groovy
@@ -19,7 +19,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility
-suite("update_test_index_query", "p0") {
+suite("update_test_index_query", "nonConcurrent,p0") {
def load_json_data = {table_name, file_name ->
// load the json data
@@ -103,6 +103,7 @@ suite("update_test_index_query", "p0") {
def normal_check = {check_table_name->
try {
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.apply_inverted_index")
+ sql "set enable_common_expr_pushdown = true"
sql """set enable_match_without_inverted_index = false"""
sql """ set inverted_index_skip_threshold = 0 """
sql """ set enable_inverted_index_query = true """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]