This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new 46e029051d7 add path stats check and fix sparse cache (#48834)
46e029051d7 is described below
commit 46e029051d713a8b008c185e7d6e9bac83547540
Author: lihangyu <[email protected]>
AuthorDate: Mon Mar 10 10:42:30 2025 +0800
add path stats check and fix sparse cache (#48834)
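
    In summary: (1) StorageReadOptions::sparse_column_cache changes from a single cached
    ColumnPtr to a map keyed by the variant column's parent unique id (col_unique_id ->
    cached column_ptr), so sparse data cached for one variant column is no longer
    overwritten when another variant column is read in the same batch; (2) compaction of
    DUP_KEYS tablets now calls schema_util::check_path_stats to verify that the per-path
    non-null value counts collected from the input rowsets match those of the output
    rowset; (3) calculate_variant_stats takes an explicit (row_pos, num_rows) range so
    SegmentWriter::append_block only counts the rows of the block it actually writes.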
---
be/src/olap/compaction.cpp | 6 ++
be/src/olap/iterators.h | 3 +-
be/src/olap/rowset/segment_v2/column_reader.cpp | 14 ++--
.../rowset/segment_v2/hierarchical_data_reader.h | 35 +++++-----
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 2 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 12 ++--
be/src/olap/rowset/segment_v2/segment_writer.h | 3 +-
.../segment_v2/variant_column_writer_impl.cpp | 12 ++--
be/src/vec/common/schema_util.cpp | 78 ++++++++++++++++------
be/src/vec/common/schema_util.h | 8 ++-
10 files changed, 116 insertions(+), 57 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index acfcc34470f..545096771d1 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -74,6 +74,7 @@
#include "runtime/thread_context.h"
#include "util/time.h"
#include "util/trace.h"
+#include "vec/common/schema_util.h"
using std::vector;
@@ -1329,6 +1330,11 @@ Status Compaction::check_correctness() {
                _tablet->tablet_id(), _input_row_num, _stats.merged_rows, _stats.filtered_rows,
                _output_rowset->num_rows());
    }
+    if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+        // only check path stats for dup_keys since rows may be merged in other key models
+        RETURN_IF_ERROR(vectorized::schema_util::check_path_stats(_input_rowsets, _output_rowset,
+                                                                  _tablet->tablet_id()));
+    }
return Status::OK();
}
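
For reference, check_path_stats (added in schema_util.cpp below) aggregates, per variant parent
column unique id, the non-null count of every sparse path across the input rowsets and requires
the output rowset to report exactly the same counts; the check is restricted to DUP_KEYS because
other key models may merge rows away. A minimal standalone sketch of that comparison, using plain
std containers and hypothetical aliases in place of the Doris rowset/statistics types:

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    // Hypothetical stand-ins for the Doris-side types used by check_path_stats.
    using PathStats = std::unordered_map<std::string, size_t>;      // path -> non-null count
    using UidToPathStats = std::unordered_map<int32_t, PathStats>;  // parent unique id -> stats

    // True when the output stats exactly match the stats aggregated from the inputs.
    bool paths_consistent(const UidToPathStats& input, const UidToPathStats& output) {
        for (const auto& [uid, paths] : input) {
            auto it = output.find(uid);
            if (it == output.end() || it->second.size() != paths.size()) return false;
            for (const auto& [path, count] : paths) {
                auto pit = it->second.find(path);
                if (pit == it->second.end() || pit->second != count) return false;
            }
        }
        return true;
    }

    int main() {
        UidToPathStats input {{1, {{"v.a", 10}, {"v.b", 3}}}};
        UidToPathStats output {{1, {{"v.a", 10}, {"v.b", 3}}}};
        std::cout << std::boolalpha << paths_consistent(input, output) << "\n";  // true
    }
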
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 963f4d23598..f0cc7784f5c 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -121,7 +121,8 @@ public:
RowRanges row_ranges;
size_t topn_limit = 0;
// Cache for sparse column data to avoid redundant reads
- vectorized::ColumnPtr sparse_column_cache;
+ // col_unique_id -> cached column_ptr
+ std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
};
struct CompactionSampleInfo {
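
Note on the hunk above: StorageReadOptions previously held a single cached ColumnPtr, so whichever
variant column was read last owned the slot; it now holds one cached entry per column unique id. A
tiny standalone illustration of the difference, with std::shared_ptr<std::string> standing in
(purely hypothetically) for vectorized::ColumnPtr:

    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>

    using FakeColumnPtr = std::shared_ptr<std::string>;  // stand-in for vectorized::ColumnPtr

    int main() {
        // Before: one slot for the whole read -- caching column 20 evicts column 10.
        FakeColumnPtr single_slot = std::make_shared<std::string>("sparse data of col 10");
        single_slot = std::make_shared<std::string>("sparse data of col 20");
        std::cout << "single slot now holds: " << *single_slot << "\n";

        // After: one slot per col_unique_id -- both columns stay cached for the batch.
        std::unordered_map<int32_t, FakeColumnPtr> per_uid_cache;
        per_uid_cache[10] = std::make_shared<std::string>("sparse data of col 10");
        per_uid_cache[20] = std::make_shared<std::string>("sparse data of col 20");
        std::cout << "cached sparse columns: " << per_uid_cache.size() << "\n";  // 2
    }
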
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 313eca9b016..166dd9f0a97 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -324,9 +324,10 @@ Status VariantColumnReader::_create_sparse_merge_reader(ColumnIterator** iterato
VLOG_DEBUG << "subcolumns to merge " << src_subcolumns_for_sparse.size();
// Create sparse column merge reader
-    *iterator = new SparseColumnMergeReader(
-            path_set_info.sub_path_set, std::unique_ptr<ColumnIterator>(inner_iter),
-            std::move(src_subcolumns_for_sparse), const_cast<StorageReadOptions*>(opts));
+    *iterator = new SparseColumnMergeReader(path_set_info.sub_path_set,
+                                            std::unique_ptr<ColumnIterator>(inner_iter),
+                                            std::move(src_subcolumns_for_sparse),
+                                            const_cast<StorageReadOptions*>(opts), target_col);
return Status::OK();
}
@@ -385,7 +386,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iter
*iterator = new SparseColumnExtractReader(
                relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter),
                // need to modify sparse_column_cache, so use const_cast here
-                const_cast<StorageReadOptions*>(opts));
+                const_cast<StorageReadOptions*>(opts), target_col);
return Status::OK();
}
if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
@@ -465,8 +466,9 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet
ColumnIterator* inner_iter;
RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
DCHECK(opt);
-        *iterator = new SparseColumnExtractReader(
-                relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter), nullptr);
+        *iterator = new SparseColumnExtractReader(relative_path.get_path(),
+                                                  std::unique_ptr<ColumnIterator>(inner_iter),
+                                                  nullptr, target_col);
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index 591b706e0e7..5ea7ac59ad7 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -172,7 +172,7 @@ protected:
vectorized::MutableColumnPtr _sparse_column;
StorageReadOptions* _read_opts; // Shared cache pointer
std::unique_ptr<ColumnIterator> _sparse_column_reader;
-
+    const TabletColumn& _col;
    // Pure virtual method for data processing when encountering existing sparse columns (to be implemented by subclasses)
    virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
                                                           size_t num_rows) = 0;
@@ -182,8 +182,9 @@ protected:
size_t num_rows) = 0;
public:
-    BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts)
-            : _read_opts(opts), _sparse_column_reader(std::move(reader)) {
+    BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts,
+                              const TabletColumn& col)
+            : _read_opts(opts), _sparse_column_reader(std::move(reader)), _col(col) {
_sparse_column = vectorized::ColumnObject::create_sparse_column_fn();
}
@@ -208,15 +209,17 @@ public:
Status _process_batch(ReadMethod&& read_method, size_t nrows,
vectorized::MutableColumnPtr& dst) {
// Cache check and population logic
- if (_read_opts && _read_opts->sparse_column_cache &&
+        if (_read_opts && _read_opts->sparse_column_cache[_col.parent_unique_id()] &&
            ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type)) {
-            _sparse_column = _read_opts->sparse_column_cache->assume_mutable();
+            _sparse_column =
+                    _read_opts->sparse_column_cache[_col.parent_unique_id()]->assume_mutable();
        } else {
            _sparse_column->clear();
            RETURN_IF_ERROR(read_method());
            if (_read_opts) {
-                _read_opts->sparse_column_cache = _sparse_column->assume_mutable();
+                _read_opts->sparse_column_cache[_col.parent_unique_id()] =
+                        _sparse_column->get_ptr();
}
}
@@ -231,6 +234,14 @@ public:
}
return Status::OK();
}
+};
+
+// Implementation for path extraction processor
+class SparseColumnExtractReader : public BaseSparseColumnProcessor {
+public:
+    SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader,
+                              StorageReadOptions* opts, const TabletColumn& col)
+            : BaseSparseColumnProcessor(std::move(reader), opts, col), _path(path) {}
// Batch processing using template method
    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
@@ -248,14 +259,6 @@ public:
},
count, dst);
}
-};
-
-// Implementation for path extraction processor
-class SparseColumnExtractReader : public BaseSparseColumnProcessor {
-public:
-    SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader,
-                              StorageReadOptions* opts)
-            : BaseSparseColumnProcessor(std::move(reader), opts), _path(path) {}
private:
std::string _path;
@@ -280,8 +283,8 @@ public:
SparseColumnMergeReader(const TabletSchema::PathSet& path_map,
                            std::unique_ptr<ColumnIterator>&& sparse_column_reader,
                            SubstreamReaderTree&& src_subcolumns_for_sparse,
-                            StorageReadOptions* opts)
-            : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts),
+                            StorageReadOptions* opts, const TabletColumn& col)
+            : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts, col),
              _src_subcolumn_map(path_map),
              _src_subcolumns_for_sparse(src_subcolumns_for_sparse) {}
Status init(const ColumnIteratorOptions& opts) override;
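
The _process_batch change above is a check-then-populate cache pattern: on a compaction read the
processor reuses the sparse column already cached under this variant's parent unique id; otherwise
it clears its local column, reads from the segment, and stores the result back into the per-uid
cache for subsequent readers of the same column. A condensed standalone sketch of that flow, with
hypothetical plain-std stand-ins for the Doris column and iterator types:

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Hypothetical stand-ins: a "column" is just a vector of strings here.
    using Column = std::vector<std::string>;
    using ColumnPtr = std::shared_ptr<Column>;
    using SparseCache = std::unordered_map<int32_t, ColumnPtr>;

    // Read-or-reuse, mirroring the shape of BaseSparseColumnProcessor::_process_batch.
    ColumnPtr load_sparse(SparseCache& cache, int32_t parent_uid, bool is_compaction_read,
                          const std::function<Column()>& read_from_segment) {
        if (is_compaction_read) {
            if (auto it = cache.find(parent_uid); it != cache.end() && it->second) {
                return it->second;  // cache hit: skip the segment read
            }
        }
        auto col = std::make_shared<Column>(read_from_segment());  // cache miss: do the read
        cache[parent_uid] = col;  // populate the cache for later readers of this column
        return col;
    }

    int main() {
        SparseCache cache;
        int reads = 0;
        auto reader = [&] { ++reads; return Column{"{\"a\":1}", "{\"b\":2}"}; };

        load_sparse(cache, /*parent_uid=*/10, /*is_compaction_read=*/true, reader);
        load_sparse(cache, 10, true, reader);  // served from the cache, no second read
        std::cout << "segment reads: " << reads << "\n";  // 1
    }
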
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index a9db0a81307..46af14e7479 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -2024,7 +2024,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
// Clear the sparse column cache before processing a new batch
- _opts.sparse_column_cache = nullptr;
+ _opts.sparse_column_cache.clear();
SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
if (UNLIKELY(!_lazy_inited)) {
RETURN_IF_ERROR(_lazy_init());
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 1720b0ddba4..8d38964dd87 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -809,7 +809,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
            // calculate stats for variant type
// TODO it's tricky here, maybe come up with a better idea
- _maybe_calculate_variant_stats(block, id, cid);
+ _maybe_calculate_variant_stats(block, id, cid, row_pos, num_rows);
}
if (_has_key) {
if (_is_mow_with_cluster_key()) {
@@ -1329,8 +1329,11 @@ inline bool SegmentWriter::_is_mow_with_cluster_key() {
// Compaction will extend the sparse column, and the extension is visible during read and write; in order to
// persist variant stats info, we should do extra calculation when flushing the segment, otherwise
// the info is lost
-void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* block, size_t id,
-                                                   size_t cid) {
+void SegmentWriter::_maybe_calculate_variant_stats(
+        const vectorized::Block* block,
+        size_t id,  // id is the offset of the column in the block
+        size_t cid, // cid is the column id in TabletSchema
+        size_t row_pos, size_t num_rows) {
// Only process sparse columns during compaction
if (!_tablet_schema->columns()[cid]->is_sparse_column() ||
_opts.write_type != DataWriteType::TYPE_COMPACTION) {
@@ -1351,7 +1354,8 @@ void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* bloc
// Found matching column, calculate statistics
auto* stats = column.mutable_variant_statistics();
-        vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats);
+        vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats,
+                                                         row_pos, num_rows);
        VLOG_DEBUG << "sparse stats columns " << stats->sparse_column_non_null_size_size();
break;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index f505aaeaebb..1f642759413 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -175,7 +175,8 @@ private:
Status _write_footer();
Status _write_raw_data(const std::vector<Slice>& slices);
void _maybe_invalid_row_cache(const std::string& key);
-    void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid);
+    void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid,
+                                        size_t row_pos, size_t num_rows);
    std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
                             size_t pos);
// used for unique-key with merge on write and segment min_max key
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 5c57db390de..34fe6e085ec 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -83,9 +83,10 @@ Status _create_column_writer(uint32_t cid, const TabletColumn& column,
opt->need_bloom_filter = column.is_bf_column();
opt->need_bitmap_index = column.has_bitmap_index();
    const auto& index = tablet_schema->inverted_index(column.parent_unique_id());
-    VLOG_DEBUG << "column: " << column.name() << " need_inverted_index: " << opt->need_inverted_index
-               << " need_bloom_filter: " << opt->need_bloom_filter
-               << " need_bitmap_index: " << opt->need_bitmap_index;
+    VLOG_DEBUG << "column: " << column.name()
+               << " need_inverted_index: " << opt->need_inverted_index
+               << " need_bloom_filter: " << opt->need_bloom_filter
+               << " need_bitmap_index: " << opt->need_bitmap_index;
// init inverted index
if (index != nullptr &&
@@ -660,8 +661,9 @@ Status VariantSubcolumnWriter::finalize() {
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
// refresh opts and get writer with flush column
    vectorized::schema_util::inherit_column_attributes(parent_column, flush_column);
-    VLOG_DEBUG << "parent_column: " << parent_column.name() << " flush_column: "
-               << flush_column.name() << " is_bf_column: " << parent_column.is_bf_column() << " "
+    VLOG_DEBUG << "parent_column: " << parent_column.name()
+               << " flush_column: " << flush_column.name()
+               << " is_bf_column: " << parent_column.is_bf_column() << " "
<< flush_column.is_bf_column();
RETURN_IF_ERROR(_create_column_writer(
            0, flush_column, _opts.rowset_ctx->tablet_schema, _opts.inverted_index_file_writer,
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 50c8d0649fe..59d6d9bc1e9 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -724,6 +724,36 @@ void get_subpaths(const TabletColumn& variant,
}
}
+Status check_path_stats(const std::vector<RowsetSharedPtr>& inputs, RowsetSharedPtr output,
+                        int64_t tablet_id) {
+    std::unordered_map<int32_t, PathToNoneNullValues> original_uid_to_path_stats;
+    for (const auto& rs : inputs) {
+        RETURN_IF_ERROR(collect_path_stats(rs, original_uid_to_path_stats));
+    }
+    std::unordered_map<int32_t, PathToNoneNullValues> output_uid_to_path_stats;
+    RETURN_IF_ERROR(collect_path_stats(output, output_uid_to_path_stats));
+    for (const auto& [uid, stats] : original_uid_to_path_stats) {
+        if (output_uid_to_path_stats.find(uid) == output_uid_to_path_stats.end()) {
+            return Status::InternalError("Path stats not found for uid {}, tablet_id {}", uid,
+                                         tablet_id);
+        }
+        if (stats.size() != output_uid_to_path_stats.at(uid).size()) {
+            return Status::InternalError("Path stats size not match for uid {}, tablet_id {}", uid,
+                                         tablet_id);
+        }
+        for (const auto& [path, size] : stats) {
+            if (output_uid_to_path_stats.at(uid).at(path) != size) {
+                return Status::InternalError(
+                        "Path stats not match for uid {} with path `{}`, input size {}, output "
+                        "size {}, "
+                        "tablet_id {}",
+                        uid, path, size, output_uid_to_path_stats.at(uid).at(path), tablet_id);
+            }
+        }
+    }
+    return Status::OK();
+}
+
// Build the temporary schema for compaction
// 1. collect path stats from all rowsets
// 2. get the subpaths and sparse paths for each unique id
@@ -763,7 +793,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
            subcolumn.set_name(column->name_lower_case() + "." + subpath.to_string());
subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
subcolumn.set_parent_unique_id(column->unique_id());
-            subcolumn.set_path_info(PathInData(column->name_lower_case() + "." + subpath.to_string()));
+            subcolumn.set_path_info(
+                    PathInData(column->name_lower_case() + "." + subpath.to_string()));
subcolumn.set_aggregation_method(column->aggregation());
subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count());
subcolumn.set_is_nullable(true);
@@ -783,7 +814,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
// Calculate statistics about variant data paths from the encoded sparse column
void calculate_variant_stats(const IColumn& encoded_sparse_column,
- segment_v2::VariantStatisticsPB* stats) {
+                             segment_v2::VariantStatisticsPB* stats, size_t row_pos,
+                             size_t num_rows) {
    // Cast input column to ColumnMap type since sparse column is stored as a map
    const auto& map_column = assert_cast<const ColumnMap&>(encoded_sparse_column);
@@ -793,21 +825,25 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
// Get the keys column which contains the paths as strings
const auto& sparse_data_paths =
assert_cast<const ColumnString*>(map_column.get_keys_ptr().get());
-
+    const auto& serialized_sparse_column_offsets =
+            assert_cast<const ColumnArray::Offsets64&>(map_column.get_offsets());
// Iterate through all paths in the sparse column
- for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
- auto path = sparse_data_paths->get_data_at(i);
-
- // If path already exists in statistics, increment its count
- if (auto it = sparse_data_paths_statistics.find(path);
- it != sparse_data_paths_statistics.end()) {
- ++it->second;
- }
-        // If path doesn't exist and we haven't hit the max statistics size limit,
- // add it with count 1
- else if (sparse_data_paths_statistics.size() <
- VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
- sparse_data_paths_statistics.emplace(path, 1);
+ for (size_t i = row_pos; i != row_pos + num_rows; ++i) {
+ size_t offset = serialized_sparse_column_offsets[i - 1];
+ size_t end = serialized_sparse_column_offsets[i];
+ for (size_t j = offset; j != end; ++j) {
+ auto path = sparse_data_paths->get_data_at(j);
+ // If path already exists in statistics, increment its count
+ if (auto it = sparse_data_paths_statistics.find(path);
+ it != sparse_data_paths_statistics.end()) {
+ ++it->second;
+ }
+            // If path doesn't exist and we haven't hit the max statistics size limit,
+            // add it with count 1
+ else if (sparse_data_paths_statistics.size() <
+ VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ sparse_data_paths_statistics.emplace(path, 1);
+ }
}
}
@@ -815,13 +851,11 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
// This maps each path string to its frequency count
for (const auto& [path, size] : sparse_data_paths_statistics) {
const auto& sparse_path = path.to_string();
- auto it = stats->sparse_column_non_null_size().find(sparse_path);
- if (it == stats->sparse_column_non_null_size().end()) {
-            stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, size);
+ auto& count_map = *stats->mutable_sparse_column_non_null_size();
+ if (auto it = count_map.find(sparse_path); it != count_map.end()) {
+ it->second += size;
} else {
- size_t original_size = it->second;
-            stats->mutable_sparse_column_non_null_size()->emplace(sparse_path,
-                                                                  original_size + size);
+ count_map.emplace(sparse_path, size);
}
}
}
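
The reworked calculate_variant_stats above treats the serialized sparse column as a map column:
row i owns the flattened key range [offsets[i - 1], offsets[i]) (with offsets[-1] reading as 0),
and only rows in [row_pos, row_pos + num_rows) are counted; the final loop then merges the batch
counts into the accumulated protobuf map, incrementing entries that already exist instead of
re-inserting them. A standalone sketch of the same walk over plain vectors (hypothetical data, no
Doris ColumnMap/ColumnString involved):

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>

    int main() {
        // Flattened keys of a map column: row i owns keys[offset_of(i) .. offsets[i]).
        std::vector<std::string> keys = {"v.a", "v.b", "v.a", "v.c", "v.a"};
        std::vector<uint64_t> offsets = {2, 4, 5};  // row 0 -> [0,2), row 1 -> [2,4), row 2 -> [4,5)
        auto offset_of = [&](size_t i) -> uint64_t { return i == 0 ? 0 : offsets[i - 1]; };

        std::unordered_map<std::string, size_t> batch_counts;
        size_t row_pos = 1, num_rows = 2;  // only count the rows actually being written
        for (size_t i = row_pos; i != row_pos + num_rows; ++i) {
            for (uint64_t j = offset_of(i); j != offsets[i]; ++j) {
                ++batch_counts[keys[j]];
            }
        }

        // Merge the batch counts into the accumulated stats, incrementing existing entries
        // (mirroring the rewritten loop over sparse_column_non_null_size).
        std::unordered_map<std::string, size_t> accumulated = {{"v.a", 7}};
        for (const auto& [path, n] : batch_counts) {
            accumulated[path] += n;  // operator[] default-initializes missing entries to 0
        }
        std::cout << "v.a=" << accumulated["v.a"] << " v.c=" << accumulated["v.c"] << "\n";
        // prints: v.a=9 v.c=1
    }
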
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 6e3d049f199..a4101883fc9 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -136,8 +136,14 @@ TabletColumn create_sparse_column(const TabletColumn& variant);
// Build the temporary schema for compaction; this reduces the memory usage of compacting variant columns
Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
                             TabletSchemaSPtr& target);
+// Check if the path stats are consistent between the input rowsets and the output rowset.
+// Used to check the correctness of compaction.
+Status check_path_stats(const std::vector<RowsetSharedPtr>& inputs, RowsetSharedPtr output,
+                        int64_t tablet_id);
+
// Calculate statistics about variant data paths from the encoded sparse column
void calculate_variant_stats(const IColumn& encoded_sparse_column,
- segment_v2::VariantStatisticsPB* stats);
+                             segment_v2::VariantStatisticsPB* stats, size_t row_pos,
+                             size_t num_rows);
} // namespace doris::vectorized::schema_util
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]