This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new 824ed73153a refactor and implement sparse column reader and
stats(#45492)
824ed73153a is described below
commit 824ed73153a18939d15f9b56ff6f1ca6888d0312
Author: lihangyu <[email protected]>
AuthorDate: Mon Dec 16 22:07:10 2024 +0800
refactor and implement sparse column reader and stats(#45492)
---
be/src/olap/compaction.cpp | 1 +
be/src/olap/rowset/rowset_writer_context.h | 3 +
be/src/olap/rowset/segment_v2/column_reader.cpp | 146 ++++++++
be/src/olap/rowset/segment_v2/column_reader.h | 37 +-
be/src/olap/rowset/segment_v2/column_writer.cpp | 6 +-
be/src/olap/rowset/segment_v2/column_writer.h | 4 +-
.../rowset/segment_v2/hierarchical_data_reader.cpp | 364 ++++++++++++++-----
.../rowset/segment_v2/hierarchical_data_reader.h | 197 +++--------
be/src/olap/rowset/segment_v2/segment.cpp | 330 ++++++------------
be/src/olap/rowset/segment_v2/segment.h | 31 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 1 +
be/src/olap/rowset/segment_v2/stream_reader.h | 7 +-
.../segment_v2/variant_column_writer_impl.cpp | 113 +++++-
.../rowset/segment_v2/variant_column_writer_impl.h | 11 +-
.../rowset/segment_v2/vertical_segment_writer.cpp | 1 +
be/src/vec/columns/column_object.cpp | 387 +++++++++++++++++----
be/src/vec/columns/column_object.h | 34 +-
be/src/vec/common/schema_util.cpp | 6 +-
be/src/vec/common/schema_util.h | 1 +
be/src/vec/common/string_buffer.hpp | 86 +++++
.../data_types/serde/data_type_object_serde.cpp | 6 +-
gensrc/proto/segment_v2.proto | 2 +-
22 files changed, 1213 insertions(+), 561 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 68ed0322a9e..1e53ddc7364 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -172,6 +172,7 @@ Status Compaction::merge_input_rowsets() {
}
RowsetWriterContext ctx;
+ ctx.input_rs_readers = input_rs_readers;
RETURN_IF_ERROR(construct_output_rowset_writer(ctx));
// write merged rows to output rowset
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index cb0fda83e60..cbdba6991ae 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -115,6 +115,9 @@ struct RowsetWriterContext {
// For remote rowset
std::optional<StorageResource> storage_resource;
+ // For collect segment statistics for compaction
+ std::vector<RowsetReaderSharedPtr> input_rs_readers;
+
bool is_local_rowset() const { return !storage_resource; }
std::string segment_path(int seg_id) const {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index b96cf4f7e67..745ff3d93a3 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -44,6 +44,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_reader.h"
#include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo
+#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "olap/rowset/segment_v2/page_decoder.h"
@@ -52,6 +53,7 @@
#include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/tablet_schema.h"
#include "olap/types.h" // for TypeInfo
@@ -220,6 +222,146 @@ Status ColumnReader::create_agg_state(const
ColumnReaderOptions& opts, const Col
agg_state_type->get_name(), int(type));
}
+const SubcolumnColumnReaders::Node* VariantColumnReader::get_reader_by_path(
+ const vectorized::PathInData& relative_path) const {
+ return _subcolumn_readers->find_leaf(relative_path);
+}
+
+Status VariantColumnReader::new_iterator(ColumnIterator** iterator,
+ const TabletColumn& target_col) {
+ // root column use unique id, leaf column use parent_unique_id
+ auto relative_path = target_col.path_info_ptr()->copy_pop_front();
+ const auto* root = _subcolumn_readers->get_root();
+ const auto* node =
+ target_col.has_path_info() ?
_subcolumn_readers->find_exact(relative_path) : nullptr;
+
+ if (node != nullptr) {
+ if (node->is_leaf_node()) {
+ // Node contains column without any child sub columns and no
corresponding sparse columns
+ // Direct read extracted columns
+ const auto* node = _subcolumn_readers->find_leaf(relative_path);
+ RETURN_IF_ERROR(node->data.reader->new_iterator(iterator));
+ } else {
+ // Node contains column with children columns or has correspoding
sparse columns
+ // Create reader with hirachical data.
+ std::unique_ptr<ColumnIterator> sparse_iter;
+ if (!_sparse_column_set_in_stats.empty()) {
+ // Sparse column exists or reached sparse size limit, read
sparse column
+ ColumnIterator* iter;
+ RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter));
+ sparse_iter.reset(iter);
+ }
+        // If reading the full path of the variant, read in MERGE_ROOT mode,
otherwise READ_DIRECT
+ HierarchicalDataReader::ReadType read_type =
+ (relative_path == root->path) ?
HierarchicalDataReader::ReadType::MERGE_ROOT
+ :
HierarchicalDataReader::ReadType::READ_DIRECT;
+ RETURN_IF_ERROR(HierarchicalDataReader::create(iterator,
relative_path, node, root,
+ read_type,
std::move(sparse_iter)));
+ }
+ } else {
+ if (_sparse_column_set_in_stats.contains(StringRef
{relative_path.get_path()}) ||
+ _sparse_column_set_in_stats.size() >
+ VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ // Sparse column exists or reached sparse size limit, read sparse
column
+ ColumnIterator* inner_iter;
+ RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
+ *iterator = new SparseColumnExtractReader(relative_path.get_path(),
+
std::unique_ptr<ColumnIterator>(inner_iter));
+ } else {
+ // Sparse column not exists and not reached stats limit, then the
target path is not exist, get a default iterator
+ std::unique_ptr<ColumnIterator> iter;
+ RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &iter));
+ *iterator = iter.release();
+ }
+ }
+ return Status::OK();
+}
+
+Status VariantColumnReader::init(const ColumnReaderOptions& opts, const
SegmentFooterPB& footer,
+ uint32_t column_id, uint64_t num_rows,
+ io::FileReaderSPtr file_reader) {
+ // init sub columns
+ _subcolumn_readers = std::make_unique<SubcolumnColumnReaders>();
+ std::unordered_map<vectorized::PathInData, uint32_t,
vectorized::PathInData::Hash>
+ column_path_to_footer_ordinal;
+ for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
+ const auto& column_pb = footer.columns(ordinal);
+ // column path for accessing subcolumns of variant
+ if (column_pb.has_column_path_info()) {
+ vectorized::PathInData path;
+ path.from_protobuf(column_pb.column_path_info());
+ column_path_to_footer_ordinal.emplace(path, ordinal);
+ }
+ }
+
+ const ColumnMetaPB& self_column_pb = footer.columns(column_id);
+ for (const ColumnMetaPB& column_pb : footer.columns()) {
+ if (column_pb.unique_id() != self_column_pb.unique_id()) {
+ continue;
+ }
+ DCHECK(column_pb.has_column_path_info());
+ std::unique_ptr<ColumnReader> reader;
+ RETURN_IF_ERROR(
+ ColumnReader::create(opts, column_pb, footer.num_rows(),
file_reader, &reader));
+ vectorized::PathInData path;
+ path.from_protobuf(column_pb.column_path_info());
+ // init sparse column
+ if (path.get_path() == SPARSE_COLUMN_PATH) {
+ RETURN_IF_ERROR(ColumnReader::create(opts, column_pb,
footer.num_rows(), file_reader,
+ &_sparse_column_reader));
+ continue;
+ }
+ // init subcolumns
+ auto relative_path = path.copy_pop_front();
+ if (_subcolumn_readers->get_root() == nullptr) {
+ _subcolumn_readers->create_root(SubcolumnReader {nullptr,
nullptr});
+ }
+ if (relative_path.empty()) {
+ // root column
+
_subcolumn_readers->get_mutable_root()->modify_to_scalar(SubcolumnReader {
+ std::move(reader),
+
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+ } else {
+ // check the root is already a leaf node
+ _subcolumn_readers->add(
+ relative_path,
+ SubcolumnReader {
+ std::move(reader),
+
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+ }
+ }
+
+ // init sparse column set in stats
+ if (self_column_pb.has_variant_statistics()) {
+ const auto& variant_stats = self_column_pb.variant_statistics();
+ for (const auto& [path, _] :
variant_stats.sparse_column_non_null_size()) {
+ _sparse_column_set_in_stats.emplace(path.data(), path.size());
+ }
+ }
+ return Status::OK();
+}
+
+Status ColumnReader::create_variant(const ColumnReaderOptions& opts, const
SegmentFooterPB& footer,
+ uint32_t column_id, uint64_t num_rows,
+ const io::FileReaderSPtr& file_reader,
+ std::unique_ptr<ColumnReader>* reader) {
+ std::unique_ptr<VariantColumnReader> reader_local(new
VariantColumnReader());
+ RETURN_IF_ERROR(reader_local->init(opts, footer, column_id, num_rows,
file_reader));
+ *reader = std::move(reader_local);
+ return Status::OK();
+}
+
+Status ColumnReader::create(const ColumnReaderOptions& opts, const
SegmentFooterPB& footer,
+ uint32_t column_id, uint64_t num_rows,
+ const io::FileReaderSPtr& file_reader,
+ std::unique_ptr<ColumnReader>* reader) {
+ if ((FieldType)footer.columns(column_id).type() !=
FieldType::OLAP_FIELD_TYPE_VARIANT) {
+ return ColumnReader::create(opts, footer.columns(column_id), num_rows,
file_reader, reader);
+ }
+ // create variant column reader with extracted columns info in footer
+ return create_variant(opts, footer, column_id, num_rows, file_reader,
reader);
+}
+
Status ColumnReader::create(const ColumnReaderOptions& opts, const
ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr&
file_reader,
std::unique_ptr<ColumnReader>* reader) {
@@ -706,6 +848,10 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal,
OrdinalPageIndexIterat
return Status::OK();
}
+Status ColumnReader::new_iterator(ColumnIterator** iterator, const
TabletColumn& col) {
+ return new_iterator(iterator);
+}
+
Status ColumnReader::new_iterator(ColumnIterator** iterator) {
if (is_empty()) {
*iterator = new EmptyFileColumnIterator();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h
b/be/src/olap/rowset/segment_v2/column_reader.h
index d72d802f977..d61393e820c 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -41,6 +41,7 @@
#include "olap/rowset/segment_v2/page_handle.h" // for PageHandle
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage
+#include "olap/rowset/segment_v2/stream_reader.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "util/once.h"
@@ -78,6 +79,8 @@ class InvertedIndexFileReader;
class PageDecoder;
class RowRanges;
class ZoneMapIndexReader;
+// struct SubcolumnReader;
+// using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>;
struct ColumnReaderOptions {
// whether verify checksum when read page
@@ -112,11 +115,16 @@ struct ColumnIteratorOptions {
// This will cache data shared by all reader
class ColumnReader : public MetadataAdder<ColumnReader> {
public:
+ ColumnReader() = default;
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB&
meta,
uint64_t num_rows, const io::FileReaderSPtr&
file_reader,
std::unique_ptr<ColumnReader>* reader);
+ static Status create(const ColumnReaderOptions& opts, const
SegmentFooterPB& footer,
+ uint32_t column_id, uint64_t num_rows,
+ const io::FileReaderSPtr& file_reader,
+ std::unique_ptr<ColumnReader>* reader);
static Status create_array(const ColumnReaderOptions& opts, const
ColumnMetaPB& meta,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
@@ -129,11 +137,16 @@ public:
static Status create_agg_state(const ColumnReaderOptions& opts, const
ColumnMetaPB& meta,
uint64_t num_rows, const
io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
+ static Status create_variant(const ColumnReaderOptions& opts, const
SegmentFooterPB& footer,
+ uint32_t column_id, uint64_t num_rows,
+ const io::FileReaderSPtr& file_reader,
+ std::unique_ptr<ColumnReader>* reader);
enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING,
ALL_DICT_ENCODING };
- virtual ~ColumnReader();
+ ~ColumnReader() override;
// create a new column iterator. Client should delete returned iterator
+ virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn&
col);
Status new_iterator(ColumnIterator** iterator);
Status new_array_iterator(ColumnIterator** iterator);
Status new_struct_iterator(ColumnIterator** iterator);
@@ -283,6 +296,28 @@ private:
DorisCallOnce<Status> _set_dict_encoding_type_once;
};
+class VariantColumnReader : public ColumnReader {
+public:
+ VariantColumnReader() = default;
+
+ Status init(const ColumnReaderOptions& opts, const SegmentFooterPB&
footer, uint32_t column_id,
+ uint64_t num_rows, io::FileReaderSPtr file_reader);
+ Status new_iterator(ColumnIterator** iterator, const TabletColumn& col)
override;
+
+ const SubcolumnColumnReaders::Node* get_reader_by_path(
+ const vectorized::PathInData& relative_path) const;
+
+ ~VariantColumnReader() override = default;
+
+private:
+ std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
+ std::unique_ptr<ColumnReader> _sparse_column_reader;
+    // Some sparse columns are recorded in stats; use StringRef to reduce
memory usage.
+    // Notice: make sure the ref is not released before the ColumnReader is
destructed.
+    // Used to decide whether to read from the sparse column.
+ std::unordered_set<StringRef> _sparse_column_set_in_stats;
+};
+
// Base iterator to read one column data
class ColumnIterator {
public:
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index e3cd3b17144..895589d1cd3 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -1168,7 +1168,11 @@ VariantColumnWriter::VariantColumnWriter(const
ColumnWriterOptions& opts,
const TabletColumn* column,
std::unique_ptr<Field> field)
: ColumnWriter(std::move(field), opts.meta->is_nullable()) {
_impl = std::make_unique<VariantColumnWriterImpl>(opts, column);
-};
+}
+
+Status VariantColumnWriter::init() {
+ return _impl->init();
+}
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
_next_rowid += num_rows;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h
b/be/src/olap/rowset/segment_v2/column_writer.h
index b664332ea8e..53d7c5d0234 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -73,6 +73,8 @@ struct ColumnWriterOptions {
io::FileWriter* file_writer = nullptr;
CompressionTypePB compression_type = UNKNOWN_COMPRESSION;
RowsetWriterContext* rowset_ctx = nullptr;
+ // For collect segment statistics for compaction
+ std::vector<RowsetReaderSharedPtr> input_rs_readers;
std::string to_string() const {
std::stringstream ss;
ss << std::boolalpha << "meta=" << meta->DebugString()
@@ -480,7 +482,7 @@ public:
~VariantColumnWriter() override = default;
- Status init() override { return Status::OK(); }
+ Status init() override;
Status append_data(const uint8_t** ptr, size_t num_rows) override;
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index c85e4b429ad..2b8e58d47f1 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -23,6 +23,7 @@
#include "io/io_common.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_map.h"
#include "vec/columns/column_object.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
@@ -30,14 +31,12 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/json/path_in_data.h"
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
-Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
- vectorized::PathInData path,
+Status HierarchicalDataReader::create(ColumnIterator** reader,
vectorized::PathInData path,
const SubcolumnColumnReaders::Node* node,
- const SubcolumnColumnReaders::Node* root,
- ReadType read_type) {
+ const SubcolumnColumnReaders::Node*
root, ReadType read_type,
+ std::unique_ptr<ColumnIterator>&&
sparse_reader) {
    // Non-leaf nodes need to be merged with the root
auto* stream_iter = new HierarchicalDataReader(path);
std::vector<const SubcolumnColumnReaders::Node*> leaves;
@@ -54,14 +53,21 @@ Status
HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
// Eg. {"a" : "b" : {"c" : 1}}, access the `a.b` path and merge with root
path so that
// we could make sure the data could be fully merged, since some column
may not be extracted but remains in root
// like {"a" : "b" : {"e" : 1.1}} in jsonb format
- if (read_type == ReadType::MERGE_SPARSE) {
+ if (read_type == ReadType::MERGE_ROOT) {
ColumnIterator* it;
RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
stream_iter->set_root(std::make_unique<SubstreamIterator>(
root->data.file_column_type->create_column(),
std::unique_ptr<ColumnIterator>(it),
root->data.file_column_type));
}
- reader->reset(stream_iter);
+ // need read from sparse column
+ if (sparse_reader) {
+ vectorized::MutableColumnPtr sparse_column =
+ vectorized::ColumnObject::create_sparse_column_fn();
+ stream_iter->_sparse_column_reader =
std::make_unique<SubstreamIterator>(
+ std::move(sparse_column), std::move(sparse_reader), nullptr);
+ };
+ *reader = stream_iter;
return Status::OK();
}
@@ -104,7 +110,7 @@ Status HierarchicalDataReader::next_batch(size_t* n,
vectorized::MutableColumnPt
CHECK(reader.inited);
RETURN_IF_ERROR(reader.iterator->next_batch(n, reader.column,
has_null));
VLOG_DEBUG << fmt::format("{} next_batch {} rows, type={}",
path.get_path(), *n,
- type->get_name());
+ type ? type->get_name() : "null");
reader.rows_read += *n;
return Status::OK();
},
@@ -119,7 +125,7 @@ Status HierarchicalDataReader::read_by_rowids(const
rowid_t* rowids, const size_
CHECK(reader.inited);
RETURN_IF_ERROR(reader.iterator->read_by_rowids(rowids, count,
reader.column));
VLOG_DEBUG << fmt::format("{} read_by_rowids {} rows,
type={}", path.get_path(),
- count, type->get_name());
+ count, type ? type->get_name() :
"null");
reader.rows_read += count;
return Status::OK();
},
@@ -150,95 +156,295 @@ ordinal_t HierarchicalDataReader::get_current_ordinal()
const {
return (*_substream_reader.begin())->data.iterator->get_current_ordinal();
}
-Status ExtractReader::init(const ColumnIteratorOptions& opts) {
- if (!_root_reader->inited) {
- RETURN_IF_ERROR(_root_reader->iterator->init(opts));
- _root_reader->inited = true;
+Status HierarchicalDataReader::_process_sub_columns(
+ vectorized::ColumnObject& container_variant,
+ const vectorized::PathsWithColumnAndType& non_nested_subcolumns) {
+ for (const auto& entry : non_nested_subcolumns) {
+ DCHECK(!entry.path.has_nested_part());
+ bool add = container_variant.add_sub_column(entry.path,
entry.column->assume_mutable(),
+ entry.type);
+ if (!add) {
+ return Status::InternalError("Duplicated {}, type {}",
entry.path.get_path(),
+ entry.type->get_name());
+ }
}
return Status::OK();
}
-Status ExtractReader::seek_to_first() {
- LOG(FATAL) << "Not implemented";
- __builtin_unreachable();
+Status HierarchicalDataReader::_process_nested_columns(
+ vectorized::ColumnObject& container_variant,
+ const std::map<vectorized::PathInData,
vectorized::PathsWithColumnAndType>&
+ nested_subcolumns) {
+ using namespace vectorized;
+    // Iterate nested subcolumns and flatten them, the entry contains the
nested subcolumns of the same nested parent
+    // first we pick the first subcolumn as base array and use its offset
info. Then we flatten all nested subcolumns
+    // into a new object column and wrap it with array column using the first
element offsets. The wrapped array column
+    // will have the type ColumnObject::NESTED_TYPE, which is
Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
+ for (const auto& entry : nested_subcolumns) {
+ MutableColumnPtr nested_object = ColumnObject::create(true, false);
+ const auto* base_array =
+
check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column));
+ MutableColumnPtr offset =
base_array->get_offsets_ptr()->assume_mutable();
+ auto* nested_object_ptr =
assert_cast<ColumnObject*>(nested_object.get());
+ // flatten nested arrays
+ for (const auto& subcolumn : entry.second) {
+ const auto& column = subcolumn.column;
+ const auto& type = subcolumn.type;
+ if (!remove_nullable(column)->is_column_array()) {
+ return Status::InvalidArgument(
+ "Meet none array column when flatten nested array,
path {}, type {}",
+ subcolumn.path.get_path(), subcolumn.type->get_name());
+ }
+ const auto* target_array =
+
check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get());
+#ifndef NDEBUG
+ if (!base_array->has_equal_offsets(*target_array)) {
+ return Status::InvalidArgument(
+ "Meet none equal offsets array when flatten nested
array, path {}, "
+ "type {}",
+ subcolumn.path.get_path(), subcolumn.type->get_name());
+ }
+#endif
+ MutableColumnPtr flattend_column =
target_array->get_data_ptr()->assume_mutable();
+ DataTypePtr flattend_type =
+
check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
+ ->get_nested_type();
+ // add sub path without parent prefix
+ nested_object_ptr->add_sub_column(
+
subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()),
+ std::move(flattend_column), std::move(flattend_type));
+ }
+ nested_object =
make_nullable(nested_object->get_ptr())->assume_mutable();
+ auto array =
+ make_nullable(ColumnArray::create(std::move(nested_object),
std::move(offset)));
+ PathInDataBuilder builder;
+ // add parent prefix
+ builder.append(entry.first.get_parts(), false);
+ PathInData parent_path = builder.build();
+ // unset nested parts
+ parent_path.unset_nested();
+ DCHECK(!parent_path.has_nested_part());
+ container_variant.add_sub_column(parent_path, array->assume_mutable(),
+ ColumnObject::NESTED_TYPE);
+ }
+ return Status::OK();
}
-Status ExtractReader::seek_to_ordinal(ordinal_t ord) {
- CHECK(_root_reader->inited);
- return _root_reader->iterator->seek_to_ordinal(ord);
-}
+Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr&
container,
+ size_t nrows) {
+ using namespace vectorized;
+ // build variant as container
+ container = ColumnObject::create(true, false);
+ auto& container_variant = assert_cast<ColumnObject&>(*container);
-Status ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t
nrows) {
- DCHECK(_root_reader);
- DCHECK(_root_reader->inited);
- vectorized::ColumnNullable* nullable_column = nullptr;
- if (dst->is_nullable()) {
- nullable_column = assert_cast<vectorized::ColumnNullable*>(dst.get());
+ // add root first
+ if (_path.get_parts().empty() && _root_reader) {
+ auto& root_var =
+ _root_reader->column->is_nullable()
+ ? assert_cast<vectorized::ColumnObject&>(
+
assert_cast<vectorized::ColumnNullable&>(*_root_reader->column)
+ .get_nested_column())
+ :
assert_cast<vectorized::ColumnObject&>(*_root_reader->column);
+ auto column = root_var.get_root();
+ auto type = root_var.get_root_type();
+ container_variant.add_sub_column({}, std::move(column), type);
}
- auto& variant =
- nullable_column == nullptr
- ? assert_cast<vectorized::ColumnObject&>(*dst)
- :
assert_cast<vectorized::ColumnObject&>(nullable_column->get_nested_column());
- const auto& root =
- _root_reader->column->is_nullable()
- ? assert_cast<vectorized::ColumnObject&>(
-
assert_cast<vectorized::ColumnNullable&>(*_root_reader->column)
- .get_nested_column())
- : assert_cast<const
vectorized::ColumnObject&>(*_root_reader->column);
- // extract root value with path, we can't modify the original root column
- // since some other column may depend on it.
- vectorized::MutableColumnPtr extracted_column;
- RETURN_IF_ERROR(root.extract_root( // trim the root name, eg. v.a.b -> a.b
- _col.path_info_ptr()->copy_pop_front(), extracted_column));
-
- if (_target_type_hint != nullptr) {
- variant.create_root(_target_type_hint,
_target_type_hint->create_column());
+ // parent path -> subcolumns
+ std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
+ PathsWithColumnAndType non_nested_subcolumns;
+ RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
+ MutableColumnPtr column = node.data.column->get_ptr();
+ PathInData relative_path =
node.path.copy_pop_nfront(_path.get_parts().size());
+
+ if (node.path.has_nested_part()) {
+
CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()),
+ getTypeName(TypeIndex::Array));
+ PathInData parent_path =
+
node.path.get_nested_prefix_path().copy_pop_nfront(_path.get_parts().size());
+ nested_subcolumns[parent_path].emplace_back(relative_path,
column->get_ptr(),
+ node.data.type);
+ } else {
+ non_nested_subcolumns.emplace_back(relative_path,
column->get_ptr(), node.data.type);
+ }
+ return Status::OK();
+ }));
+
+ RETURN_IF_ERROR(_process_sub_columns(container_variant,
non_nested_subcolumns));
+
+ RETURN_IF_ERROR(_process_nested_columns(container_variant,
nested_subcolumns));
+
+ RETURN_IF_ERROR(_process_sparse_column(container_variant, nrows));
+ return Status::OK();
+}
+
+// Return sub-path by specified prefix.
+// For example, for prefix a.b:
+// a.b.c.d -> c.d, a.b.c -> c
+static std::string_view get_sub_path(const std::string_view& path, const
std::string_view& prefix) {
+ return path.substr(prefix.size() + 1);
+}
+
+Status
HierarchicalDataReader::_process_sparse_column(vectorized::ColumnObject&
container_variant,
+ size_t nrows) {
+ using namespace vectorized;
+ if (!_sparse_column_reader) {
+
container_variant.get_sparse_column()->assume_mutable()->insert_many_defaults(nrows);
+ return Status::OK();
}
- if (variant.empty() || variant.is_null_root()) {
- variant.create_root(root.get_root_type(), std::move(extracted_column));
+ // process sparse column
+ if (_path.get_parts().empty()) {
+ // directly use sparse column if access root
+
container_variant.set_sparse_column(_sparse_column_reader->column->get_ptr());
} else {
- vectorized::ColumnPtr cast_column;
- const auto& expected_type = variant.get_root_type();
- RETURN_IF_ERROR(vectorized::schema_util::cast_column(
- {extracted_column->get_ptr(),
- vectorized::make_nullable(
-
std::make_shared<vectorized::ColumnObject::MostCommonType>()),
- ""},
- expected_type, &cast_column));
- variant.get_root()->insert_range_from(*cast_column, 0, nrows);
- // variant.set_num_rows(variant.get_root()->size());
+ const auto& offsets =
+ assert_cast<const
ColumnMap&>(*_sparse_column_reader->column).get_offsets();
+ /// Check if there is no data in shared data in current range.
+ if (offsets.back() == offsets[-1]) {
+
container_variant.get_sparse_column()->assume_mutable()->insert_many_defaults(nrows);
+ } else {
+ // Read for variant sparse column
+ // Example path: a.b
+ // data: a.b.c : int|123
+ // a.b.d : string|"456"
+ // a.e.d : string|"789"
+ // then the extracted sparse column will be:
+ // c : int|123
+ // d : string|"456"
+ const auto& sparse_data_map =
+ assert_cast<const
ColumnMap&>(*_sparse_column_reader->column);
+ const auto& src_sparse_data_offsets =
sparse_data_map.get_offsets();
+ const auto& src_sparse_data_paths =
+ assert_cast<const
ColumnString&>(sparse_data_map.get_keys());
+ const auto& src_sparse_data_values =
+ assert_cast<const
ColumnString&>(sparse_data_map.get_values());
+
+ auto& sparse_data_offsets =
+ assert_cast<ColumnMap&>(
+
*container_variant.get_sparse_column()->assume_mutable())
+ .get_offsets();
+ auto [sparse_data_paths, sparse_data_values] =
+ container_variant.get_sparse_data_paths_and_values();
+ StringRef prefix_ref(_path.get_path());
+ std::string_view path_prefix(prefix_ref.data, prefix_ref.size);
+ for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) {
+ size_t start = src_sparse_data_offsets[ssize_t(i) - 1];
+ size_t end = src_sparse_data_offsets[ssize_t(i)];
+ size_t lower_bound_index =
+
vectorized::ColumnObject::find_path_lower_bound_in_sparse_data(
+ prefix_ref, src_sparse_data_paths, start, end);
+ for (; lower_bound_index != end; ++lower_bound_index) {
+ auto path_ref =
src_sparse_data_paths.get_data_at(lower_bound_index);
+ std::string_view path(path_ref.data, path_ref.size);
+ if (!path.starts_with(path_prefix)) {
+ break;
+ }
+ // Don't include path that is equal to the prefix.
+ if (path.size() != path_prefix.size()) {
+ auto sub_path = get_sub_path(path, path_prefix);
+ sparse_data_paths->insert_data(sub_path.data(),
sub_path.size());
+
sparse_data_values->insert_from(src_sparse_data_values, lower_bound_index);
+ }
+ }
+ sparse_data_offsets.push_back(sparse_data_paths->size());
+ }
+ }
}
- if (dst->is_nullable()) {
- // fill nullmap
- vectorized::ColumnUInt8& dst_null_map =
-
assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column();
- vectorized::ColumnUInt8& src_null_map =
-
assert_cast<vectorized::ColumnNullable&>(*variant.get_root()).get_null_map_column();
- dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size());
+ return Status::OK();
+}
+
+Status HierarchicalDataReader::_init_null_map_and_clear_columns(
+ vectorized::MutableColumnPtr& container, vectorized::MutableColumnPtr&
dst, size_t nrows) {
+ using namespace vectorized;
+ // clear data in nodes
+ RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
+ node.data.column->clear();
+ return Status::OK();
+ }));
+ container->clear();
+ _sparse_column_reader->column->clear();
+ if (_root_reader) {
+ if (_root_reader->column->is_nullable()) {
+ // fill nullmap
+ DCHECK(dst->is_nullable());
+ ColumnUInt8& dst_null_map =
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
+ ColumnUInt8& src_null_map =
+
assert_cast<ColumnNullable&>(*_root_reader->column).get_null_map_column();
+ dst_null_map.insert_range_from(src_null_map, 0,
src_null_map.size());
+ // clear nullmap and inner data
+ src_null_map.clear();
+ assert_cast<ColumnObject&>(
+
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
+ .clear_column_data();
+ } else {
+ auto& root_column =
assert_cast<ColumnObject&>(*_root_reader->column);
+ root_column.clear_column_data();
+ }
+ } else {
+ if (dst->is_nullable()) {
+            // No nullable info exists in hierarchical data, fill nullmap with
all non-null
+ ColumnUInt8& dst_null_map =
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
+ auto fake_nullable_column = ColumnUInt8::create(nrows, 0);
+ dst_null_map.insert_range_from(*fake_nullable_column, 0, nrows);
+ }
}
- _root_reader->column->clear();
-#ifndef NDEBUG
- variant.check_consistency();
-#endif
return Status::OK();
}
-Status ExtractReader::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
- RETURN_IF_ERROR(_root_reader->iterator->next_batch(n,
_root_reader->column));
- RETURN_IF_ERROR(extract_to(dst, *n));
+Status SparseColumnExtractReader::init(const ColumnIteratorOptions& opts) {
+ return _sparse_column_reader->init(opts);
+}
+
+Status SparseColumnExtractReader::seek_to_first() {
+ return _sparse_column_reader->seek_to_first();
+}
+
+Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) {
+ return _sparse_column_reader->seek_to_ordinal(ord);
+}
+
+void
SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst)
{
+ vectorized::ColumnObject& var =
+ dst->is_nullable()
+ ? assert_cast<vectorized::ColumnObject&>(
+
assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column())
+ : assert_cast<vectorized::ColumnObject&>(*dst);
+ DCHECK(!var.is_null_root());
+ vectorized::ColumnObject::fill_path_olumn_from_sparse_data(
+ *var.get_subcolumn({}) /*root*/, StringRef {_path.data(),
_path.size()},
+ _sparse_column->get_ptr(), 0, _sparse_column->size());
+ _sparse_column->clear();
+}
+
+Status SparseColumnExtractReader::next_batch(size_t* n,
vectorized::MutableColumnPtr& dst,
+ bool* has_null) {
+ _sparse_column->clear();
+ RETURN_IF_ERROR(_sparse_column_reader->next_batch(n, _sparse_column,
has_null));
+ const auto& offsets = assert_cast<const
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+ // Check if we don't have any paths in shared data in current range.
+ if (offsets.back() == offsets[-1]) {
+ dst->insert_many_defaults(*n);
+ } else {
+ _fill_path_column(dst);
+ }
return Status::OK();
}
-Status ExtractReader::read_by_rowids(const rowid_t* rowids, const size_t count,
- vectorized::MutableColumnPtr& dst) {
- RETURN_IF_ERROR(_root_reader->iterator->read_by_rowids(rowids, count,
_root_reader->column));
- RETURN_IF_ERROR(extract_to(dst, count));
+Status SparseColumnExtractReader::read_by_rowids(const rowid_t* rowids, const
size_t count,
+ vectorized::MutableColumnPtr&
dst) {
+ _sparse_column->clear();
+ RETURN_IF_ERROR(_sparse_column_reader->read_by_rowids(rowids, count,
_sparse_column));
+ const auto& offsets = assert_cast<const
vectorized::ColumnMap&>(*_sparse_column).get_offsets();
+ // Check if we don't have any paths in shared data in current range.
+ if (offsets.back() == offsets[-1]) {
+ dst->insert_many_defaults(count);
+ } else {
+ _fill_path_column(dst);
+ }
return Status::OK();
}
-ordinal_t ExtractReader::get_current_ordinal() const {
- return _root_reader->iterator->get_current_ordinal();
+ordinal_t SparseColumnExtractReader::get_current_ordinal() const {
+ return _sparse_column_reader->get_current_ordinal();
}
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index 6c8ced89cd2..5d58f666f62 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -18,8 +18,11 @@
#pragma once
#include <memory>
+#include <string_view>
#include <unordered_map>
+#include <utility>
+#include "common/status.h"
#include "io/io_common.h"
#include "olap/field.h"
#include "olap/iterators.h"
@@ -49,13 +52,14 @@ namespace doris::segment_v2 {
class HierarchicalDataReader : public ColumnIterator {
public:
// Currently two types of read, merge sparse columns with root columns, or
read directly
- enum class ReadType { MERGE_SPARSE, READ_DIRECT };
+ enum class ReadType { MERGE_ROOT, READ_DIRECT };
HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {}
- static Status create(std::unique_ptr<ColumnIterator>* reader,
vectorized::PathInData path,
+ static Status create(ColumnIterator** reader, vectorized::PathInData path,
const SubcolumnColumnReaders::Node* target_node,
- const SubcolumnColumnReaders::Node* root, ReadType
read_type);
+ const SubcolumnColumnReaders::Node* root, ReadType
read_type,
+ std::unique_ptr<ColumnIterator>&& sparse_reader);
Status init(const ColumnIteratorOptions& opts) override;
@@ -77,6 +81,7 @@ public:
private:
SubstreamReaderTree _substream_reader;
std::unique_ptr<SubstreamIterator> _root_reader;
+ std::unique_ptr<SubstreamIterator> _sparse_column_reader;
size_t _rows_read = 0;
vectorized::PathInData _path;
@@ -87,6 +92,29 @@ private:
}
return Status::OK();
}
+
+ Status _process_sub_columns(vectorized::ColumnObject& container_variant,
+ const vectorized::PathsWithColumnAndType&
non_nested_subcolumns);
+
+ Status _process_nested_columns(
+ vectorized::ColumnObject& container_variant,
+ const std::map<vectorized::PathInData,
vectorized::PathsWithColumnAndType>&
+ nested_subcolumns);
+
+ Status _process_sparse_column(vectorized::ColumnObject& container_variant,
size_t nrows);
+
+ // 1. add root column
+ // 2. collect path for subcolumns and nested subcolumns
+ // 3. init container with subcolumns
+ // 4. init container with nested subcolumns
+ // 5. init container with sparse column
+ Status _init_container(vectorized::MutableColumnPtr& container, size_t
nrows);
+
+ // clear all subcolumns' column data for the next batch read
+ // set null map for nullable column
+ Status _init_null_map_and_clear_columns(vectorized::MutableColumnPtr&
container,
+ vectorized::MutableColumnPtr& dst,
size_t nrows);
+
// process read
template <typename ReadFunction>
Status process_read(ReadFunction&& read_func,
vectorized::MutableColumnPtr& dst, size_t nrows) {
@@ -112,162 +140,36 @@ private:
return Status::OK();
}));
- // build variant as container
- auto container = ColumnObject::create(true, false);
- auto& container_variant = assert_cast<ColumnObject&>(*container);
-
- // add root first
- if (_path.get_parts().empty() && _root_reader) {
- auto& root_var =
- _root_reader->column->is_nullable()
- ? assert_cast<vectorized::ColumnObject&>(
- assert_cast<vectorized::ColumnNullable&>(
- *_root_reader->column)
- .get_nested_column())
- :
assert_cast<vectorized::ColumnObject&>(*_root_reader->column);
- auto column = root_var.get_root();
- auto type = root_var.get_root_type();
- container_variant.add_sub_column({}, std::move(column), type);
+ // read sparse column
+ if (_sparse_column_reader) {
+ RETURN_IF_ERROR(read_func(*_sparse_column_reader, {}, nullptr));
}
- // parent path -> subcolumns
- std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
- PathsWithColumnAndType non_nested_subcolumns;
- RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
- MutableColumnPtr column = node.data.column->get_ptr();
- PathInData relative_path =
node.path.copy_pop_nfront(_path.get_parts().size());
-
- if (node.path.has_nested_part()) {
-
CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()),
- getTypeName(TypeIndex::Array));
- PathInData parent_path =
node.path.get_nested_prefix_path().copy_pop_nfront(
- _path.get_parts().size());
- nested_subcolumns[parent_path].emplace_back(relative_path,
column->get_ptr(),
- node.data.type);
- } else {
- non_nested_subcolumns.emplace_back(relative_path,
column->get_ptr(),
- node.data.type);
- }
- return Status::OK();
- }));
- for (auto& entry : non_nested_subcolumns) {
- DCHECK(!entry.path.has_nested_part());
- bool add = container_variant.add_sub_column(entry.path,
entry.column->assume_mutable(),
- entry.type);
- if (!add) {
- return Status::InternalError("Duplicated {}, type {}",
entry.path.get_path(),
- entry.type->get_name());
- }
- }
- // Iterate nested subcolumns and flatten them, the entry contains the
nested subcolumns of the same nested parent
- // first we pick the first subcolumn as base array and using it's
offset info. Then we flatten all nested subcolumns
- // into a new object column and wrap it with array column using the
first element offsets.The wrapped array column
- // will type the type of ColumnObject::NESTED_TYPE, whih is
Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
- for (auto& entry : nested_subcolumns) {
- MutableColumnPtr nested_object = ColumnObject::create(true, false);
- const auto* base_array =
-
check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column));
- MutableColumnPtr offset =
base_array->get_offsets_ptr()->assume_mutable();
- auto* nested_object_ptr =
assert_cast<ColumnObject*>(nested_object.get());
- // flatten nested arrays
- for (const auto& subcolumn : entry.second) {
- const auto& column = subcolumn.column;
- const auto& type = subcolumn.type;
- if (!remove_nullable(column)->is_column_array()) {
- return Status::InvalidArgument(
- "Meet none array column when flatten nested array,
path {}, type {}",
- subcolumn.path.get_path(),
subcolumn.type->get_name());
- }
- const auto* target_array =
-
check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get());
-#ifndef NDEBUG
- if (!base_array->has_equal_offsets(*target_array)) {
- return Status::InvalidArgument(
- "Meet none equal offsets array when flatten nested
array, path {}, "
- "type {}",
- subcolumn.path.get_path(),
subcolumn.type->get_name());
- }
-#endif
- MutableColumnPtr flattend_column =
target_array->get_data_ptr()->assume_mutable();
- DataTypePtr flattend_type =
-
check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
- ->get_nested_type();
- // add sub path without parent prefix
- nested_object_ptr->add_sub_column(
-
subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()),
- std::move(flattend_column), std::move(flattend_type));
- }
- nested_object =
make_nullable(nested_object->get_ptr())->assume_mutable();
- auto array =
-
make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset)));
- PathInDataBuilder builder;
- // add parent prefix
- builder.append(entry.first.get_parts(), false);
- PathInData parent_path = builder.build();
- // unset nested parts
- parent_path.unset_nested();
- DCHECK(!parent_path.has_nested_part());
- container_variant.add_sub_column(parent_path,
array->assume_mutable(),
- ColumnObject::NESTED_TYPE);
- }
+ MutableColumnPtr container;
+ RETURN_IF_ERROR(_init_container(container, nrows));
+ auto& container_variant = assert_cast<ColumnObject&>(*container);
- // TODO select v:b -> v.b / v.b.c but v.d maybe in v
- // copy container variant to dst variant, todo avoid copy
variant.insert_range_from(container_variant, 0, nrows);
- // variant.set_num_rows(nrows);
_rows_read += nrows;
variant.finalize();
#ifndef NDEBUG
variant.check_consistency();
#endif
- // clear data in nodes
- RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
- node.data.column->clear();
- return Status::OK();
- }));
- container->clear();
- if (_root_reader) {
- if (_root_reader->column->is_nullable()) {
- // fill nullmap
- DCHECK(dst->is_nullable());
- ColumnUInt8& dst_null_map =
-
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
- ColumnUInt8& src_null_map =
-
assert_cast<ColumnNullable&>(*_root_reader->column).get_null_map_column();
- dst_null_map.insert_range_from(src_null_map, 0,
src_null_map.size());
- // clear nullmap and inner data
- src_null_map.clear();
- assert_cast<ColumnObject&>(
-
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
- .clear_column_data();
- } else {
- ColumnObject& root_column =
assert_cast<ColumnObject&>(*_root_reader->column);
- root_column.clear_column_data();
- }
- } else {
- if (dst->is_nullable()) {
- // No nullable info exist in hirearchical data, fill nullmap
with all none null
- ColumnUInt8& dst_null_map =
-
assert_cast<ColumnNullable&>(*dst).get_null_map_column();
- auto fake_nullable_column = ColumnUInt8::create(nrows, 0);
- dst_null_map.insert_range_from(*fake_nullable_column, 0,
nrows);
- }
- }
+ RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst,
nrows));
return Status::OK();
}
};
-// Extract from root column of variant, since root column of variant
-// encodes sparse columns that are not materialized
-class ExtractReader : public ColumnIterator {
+// Extract path from sparse column
+class SparseColumnExtractReader : public ColumnIterator {
public:
- ExtractReader(const TabletColumn& col,
std::unique_ptr<SubstreamIterator>&& root_reader,
- vectorized::DataTypePtr target_type_hint)
- : _col(col),
- _root_reader(std::move(root_reader)),
- _target_type_hint(target_type_hint) {}
+ SparseColumnExtractReader(std::string path,
+ std::unique_ptr<ColumnIterator>&&
sparse_column_reader)
+ : _path(std::move(path)),
_sparse_column_reader(std::move(sparse_column_reader)) {
+ _sparse_column = vectorized::ColumnObject::create_sparse_column_fn();
+ }
Status init(const ColumnIteratorOptions& opts) override;
@@ -283,12 +185,11 @@ public:
ordinal_t get_current_ordinal() const override;
private:
- Status extract_to(vectorized::MutableColumnPtr& dst, size_t nrows);
-
- TabletColumn _col;
+ void _fill_path_column(vectorized::MutableColumnPtr& dst);
+ vectorized::MutableColumnPtr _sparse_column;
+ std::string _path;
// may shared among different column iterators
- std::unique_ptr<SubstreamIterator> _root_reader;
- vectorized::DataTypePtr _target_type_hint;
+ std::unique_ptr<ColumnIterator> _sparse_column_reader;
};
} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index a50ada112f9..441e839e6ef 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -43,7 +43,6 @@
#include "olap/rowset/rowset_reader_context.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/empty_segment_iterator.h"
-#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
#include "olap/rowset/segment_v2/page_io.h"
@@ -51,6 +50,7 @@
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "olap/rowset/segment_v2/segment_writer.h" // k_segment_magic_length
#include "olap/rowset/segment_v2/stream_reader.h"
+#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
#include "olap/schema.h"
#include "olap/short_key_index.h"
#include "olap/tablet_schema.h"
@@ -201,9 +201,24 @@ Status Segment::_open() {
// 0.01 comes from PrimaryKeyIndexBuilder::init
_meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8;
+ uint32_t ordinal = 0;
+ for (const auto& column_meta : _footer_pb->columns()) {
+ // unique_id < 0 means this column is an extracted column from a variant
+ if (static_cast<int>(column_meta.unique_id()) >= 0) {
+ _column_id_to_footer_ordinal[column_meta.unique_id()] = ordinal++;
+ }
+ }
return Status::OK();
}
+const ColumnMetaPB* Segment::get_column_meta(int32_t unique_id) const {
+ auto it = _column_id_to_footer_ordinal.find(unique_id);
+ if (it == _column_id_to_footer_ordinal.end()) {
+ return nullptr;
+ }
+ return &_footer_pb->columns(it->second);
+}
+
Status Segment::_open_inverted_index() {
_inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>(
_fs,
@@ -233,7 +248,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const
StorageReadOptions& read_o
if (col.is_extracted_column()) {
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() :
col.parent_unique_id();
- const auto* node =
_sub_column_tree[unique_id].find_exact(relative_path);
+ const auto* node =
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+ ->get_reader_by_path(relative_path);
reader = node != nullptr ? node->data.reader.get() : nullptr;
} else {
reader = _column_readers.contains(col.unique_id())
@@ -558,20 +574,17 @@ vectorized::DataTypePtr Segment::get_data_type_of(const
ColumnIdentifier& identi
auto relative_path = identifier.path->copy_pop_front();
int32_t unique_id =
identifier.unique_id > 0 ? identifier.unique_id :
identifier.parent_unique_id;
- const auto* node = _sub_column_tree.contains(unique_id)
- ?
_sub_column_tree.at(unique_id).find_leaf(relative_path)
+ const auto* node = _column_readers.contains(unique_id)
+ ?
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+
->get_reader_by_path(relative_path)
: nullptr;
- const auto* sparse_node =
- _sparse_column_tree.contains(unique_id)
- ?
_sparse_column_tree.at(unique_id).find_exact(relative_path)
- : nullptr;
if (node) {
- if (read_flat_leaves || (node->children.empty() && sparse_node ==
nullptr)) {
+ if (read_flat_leaves || (node->children.empty())) {
return node->data.file_column_type;
}
}
// missing in storage, treat it using input data type
- if (read_flat_leaves && !node && !sparse_node) {
+ if (read_flat_leaves && !node) {
return nullptr;
}
// it contains children or column missing in storage, so treat it as
variant
@@ -592,28 +605,11 @@ Status Segment::_create_column_readers_once() {
}
Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
- std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal;
- std::unordered_map<vectorized::PathInData, uint32_t,
vectorized::PathInData::Hash>
- column_path_to_footer_ordinal;
- for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
- const auto& column_pb = footer.columns(ordinal);
- // column path for accessing subcolumns of variant
- if (column_pb.has_column_path_info()) {
- vectorized::PathInData path;
- path.from_protobuf(column_pb.column_path_info());
- column_path_to_footer_ordinal.emplace(path, ordinal);
- }
- // unique_id is unsigned, -1 meaning no unique id(e.g. an extracted
column from variant)
- if (static_cast<int>(column_pb.unique_id()) >= 0) {
- // unique id
- column_id_to_footer_ordinal.emplace(column_pb.unique_id(),
ordinal);
- }
- }
// init by unique_id
for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns();
++ordinal) {
const auto& column = _tablet_schema->column(ordinal);
- auto iter = column_id_to_footer_ordinal.find(column.unique_id());
- if (iter == column_id_to_footer_ordinal.end()) {
+ auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
+ if (iter == _column_id_to_footer_ordinal.end()) {
continue;
}
@@ -622,40 +618,40 @@ Status Segment::_create_column_readers(const
SegmentFooterPB& footer) {
.be_exec_version = _be_exec_version,
};
std::unique_ptr<ColumnReader> reader;
- RETURN_IF_ERROR(ColumnReader::create(opts,
footer.columns(iter->second), footer.num_rows(),
+ RETURN_IF_ERROR(ColumnReader::create(opts, footer, iter->second,
footer.num_rows(),
_file_reader, &reader));
_column_readers.emplace(column.unique_id(), std::move(reader));
}
- for (const auto& [path, ordinal] : column_path_to_footer_ordinal) {
- const ColumnMetaPB& column_pb = footer.columns(ordinal);
- ColumnReaderOptions opts {
- .kept_in_memory = _tablet_schema->is_in_memory(),
- .be_exec_version = _be_exec_version,
- };
- std::unique_ptr<ColumnReader> reader;
- RETURN_IF_ERROR(
- ColumnReader::create(opts, column_pb, footer.num_rows(),
_file_reader, &reader));
- int32_t unique_id = column_pb.unique_id();
- auto relative_path = path.copy_pop_front();
- if (_sub_column_tree[unique_id].get_root() == nullptr) {
- _sub_column_tree[unique_id].create_root(SubcolumnReader {nullptr,
nullptr});
- }
- if (relative_path.empty()) {
- // root column
-
_sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader
{
- std::move(reader),
-
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
- } else {
- // check the root is already a leaf node
- //
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty());
- _sub_column_tree[unique_id].add(
- relative_path,
- SubcolumnReader {
- std::move(reader),
-
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
- }
- }
+ // for (const auto& [path, ordinal] : column_path_to_footer_ordinal) {
+ // const ColumnMetaPB& column_pb = footer.columns(ordinal);
+ // ColumnReaderOptions opts {
+ // .kept_in_memory = _tablet_schema->is_in_memory(),
+ // .be_exec_version = _be_exec_version,
+ // };
+ // std::unique_ptr<ColumnReader> reader;
+ // RETURN_IF_ERROR(
+ // ColumnReader::create(opts, column_pb, footer.num_rows(),
_file_reader, &reader));
+ // int32_t unique_id = column_pb.unique_id();
+ // auto relative_path = path.copy_pop_front();
+ // if (_sub_column_tree[unique_id].get_root() == nullptr) {
+ // _sub_column_tree[unique_id].create_root(SubcolumnReader
{nullptr, nullptr});
+ // }
+ // if (relative_path.empty()) {
+ // // root column
+ //
_sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader
{
+ // std::move(reader),
+ //
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+ // } else {
+ // // check the root is already a leaf node
+ // //
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty());
+ // _sub_column_tree[unique_id].add(
+ // relative_path,
+ // SubcolumnReader {
+ // std::move(reader),
+ //
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
+ // }
+ // }
// compability reason use tablet schema
// init by column path
@@ -716,8 +712,8 @@ Status Segment::_create_column_readers(const
SegmentFooterPB& footer) {
return Status::OK();
}
-static Status new_default_iterator(const TabletColumn& tablet_column,
- std::unique_ptr<ColumnIterator>* iter) {
+Status Segment::new_default_iterator(const TabletColumn& tablet_column,
+ std::unique_ptr<ColumnIterator>* iter) {
if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
return Status::InternalError(
"invalid nonexistent column without default value.
column_uid={}, column_name={}, "
@@ -736,147 +732,48 @@ static Status new_default_iterator(const TabletColumn&
tablet_column,
return Status::OK();
}
-Status Segment::_new_iterator_with_variant_root(const TabletColumn&
tablet_column,
-
std::unique_ptr<ColumnIterator>* iter,
- const
SubcolumnColumnReaders::Node* root,
- vectorized::DataTypePtr
target_type_hint) {
- ColumnIterator* it;
- RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
- auto* stream_iter = new ExtractReader(
- tablet_column,
-
std::make_unique<SubstreamIterator>(root->data.file_column_type->create_column(),
-
std::unique_ptr<ColumnIterator>(it),
- root->data.file_column_type),
- target_type_hint);
- iter->reset(stream_iter);
- return Status::OK();
-}
-
-Status Segment::new_column_iterator_with_path(const TabletColumn&
tablet_column,
- std::unique_ptr<ColumnIterator>*
iter,
- const StorageReadOptions* opt) {
- // root column use unique id, leaf column use parent_unique_id
- int32_t unique_id = tablet_column.unique_id() > 0 ?
tablet_column.unique_id()
- :
tablet_column.parent_unique_id();
- if (!_sub_column_tree.contains(unique_id)) {
- // No such variant column in this segment, get a default one
- RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
- return Status::OK();
- }
- auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
- const auto* root = _sub_column_tree[unique_id].get_root();
- const auto* node = tablet_column.has_path_info()
- ?
_sub_column_tree[unique_id].find_exact(relative_path)
- : nullptr;
- const auto* sparse_node =
- tablet_column.has_path_info() &&
_sparse_column_tree.contains(unique_id)
- ? _sparse_column_tree[unique_id].find_exact(relative_path)
- : nullptr;
- // // Currently only compaction and checksum need to read flat leaves
- // // They both use tablet_schema_with_merged_max_schema_version as read
schema
- // auto type_to_read_flat_leaves = [](ReaderType type) {
- // return type == ReaderType::READER_BASE_COMPACTION ||
- // type == ReaderType::READER_CUMULATIVE_COMPACTION ||
- // type == ReaderType::READER_COLD_DATA_COMPACTION ||
- // type == ReaderType::READER_SEGMENT_COMPACTION ||
- // type == ReaderType::READER_FULL_COMPACTION || type ==
ReaderType::READER_CHECKSUM;
- // };
-
- // // find the sibling of the nested column to fill the target nested
column
- // auto new_default_iter_with_same_nested = [&](const TabletColumn&
tablet_column,
- //
std::unique_ptr<ColumnIterator>* iter) {
- // // We find node that represents the same Nested type as path.
- // const auto* parent =
_sub_column_tree[unique_id].find_best_match(relative_path);
- // VLOG_DEBUG << "find with path " <<
tablet_column.path_info_ptr()->get_path() << " parent "
- // << (parent ? parent->path.get_path() : "nullptr") << ",
type "
- // << ", parent is nested " << (parent ?
parent->is_nested() : false) << ", "
- // <<
TabletColumn::get_string_by_field_type(tablet_column.type());
- // // find it's common parent with nested part
- // // why not use parent->path->has_nested_part? because parent may
not be a leaf node
- // // none leaf node may not contain path info
- // // Example:
- // // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" :
"a@b"}}]}}
- // // nested node path : payload.commits(NESTED)
- // // tablet_column path_info : payload.commits.issue.id(SCALAR)
- // // parent path node : payload.commits.issue(TUPLE)
- // // leaf path_info : payload.commits.issue.email(SCALAR)
- // if (parent && SubcolumnColumnReaders::find_parent(
- // parent, [](const auto& node) { return
node.is_nested(); })) {
- // /// Find any leaf of Nested subcolumn.
- // const auto* leaf = SubcolumnColumnReaders::find_leaf(
- // parent, [](const auto& node) { return
node.path.has_nested_part(); });
- // assert(leaf);
- // std::unique_ptr<ColumnIterator> sibling_iter;
- // ColumnIterator* sibling_iter_ptr;
- //
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
- // sibling_iter.reset(sibling_iter_ptr);
- // *iter =
std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
- //
leaf->data.file_column_type);
- // } else {
- // *iter = std::make_unique<DefaultNestedColumnIterator>(nullptr,
nullptr);
- // }
- // return Status::OK();
- // };
-
- // if (opt != nullptr &&
type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
- // // compaction need to read flat leaves nodes data to prevent from
amplification
- // const auto* node = tablet_column.has_path_info()
- // ?
_sub_column_tree[unique_id].find_leaf(relative_path)
- // : nullptr;
- // if (!node) {
- // // sparse_columns have this path, read from root
- // if (sparse_node != nullptr && sparse_node->is_leaf_node()) {
- // RETURN_IF_ERROR(_new_iterator_with_variant_root(
- // tablet_column, iter, root,
sparse_node->data.file_column_type));
- // } else {
- // if (tablet_column.is_nested_subcolumn()) {
- // // using the sibling of the nested column to fill the
target nested column
- //
RETURN_IF_ERROR(new_default_iter_with_same_nested(tablet_column, iter));
- // } else {
- // RETURN_IF_ERROR(new_default_iterator(tablet_column,
iter));
- // }
- // }
- // return Status::OK();
- // }
- // ColumnIterator* it;
- // RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
- // iter->reset(it);
- // return Status::OK();
- // }
-
- if (node != nullptr) {
- if (node->is_leaf_node() && sparse_node == nullptr) {
- // Node contains column without any child sub columns and no
corresponding sparse columns
- // Direct read extracted columns
- const auto* node =
_sub_column_tree[unique_id].find_leaf(relative_path);
- ColumnIterator* it;
- RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
- iter->reset(it);
- } else {
- // Node contains column with children columns or has correspoding
sparse columns
- // Create reader with hirachical data.
- // If sparse column exists or read the full path of variant read
in MERGE_SPARSE, otherwise READ_DIRECT
- HierarchicalDataReader::ReadType read_type =
- (relative_path == root->path) || sparse_node != nullptr
- ? HierarchicalDataReader::ReadType::MERGE_SPARSE
- : HierarchicalDataReader::ReadType::READ_DIRECT;
- RETURN_IF_ERROR(
- HierarchicalDataReader::create(iter, relative_path, node,
root, read_type));
- }
- } else {
- // No such node, read from either sparse column or default column
- if (sparse_node != nullptr) {
- // sparse columns have this path, read from root
- RETURN_IF_ERROR(_new_iterator_with_variant_root(tablet_column,
iter, root,
-
sparse_node->data.file_column_type));
- } else {
- // No such variant column in this segment, get a default one
- RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
- }
- }
-
- return Status::OK();
-}
+// Status Segment::new_column_iterator_with_path(const TabletColumn&
tablet_column,
+//
std::unique_ptr<ColumnIterator>* iter,
+// const StorageReadOptions*
opt) {
+// // root column use unique id, leaf column use parent_unique_id
+// int32_t unique_id = tablet_column.unique_id() > 0 ?
tablet_column.unique_id()
+// :
tablet_column.parent_unique_id();
+// if (!_sub_column_tree.contains(unique_id)) {
+// // No such variant column in this segment, get a default one
+// RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
+// return Status::OK();
+// }
+// auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
+// const auto* root = _sub_column_tree[unique_id].get_root();
+// const auto* node = tablet_column.has_path_info()
+// ?
_sub_column_tree[unique_id].find_exact(relative_path)
+// : nullptr;
+//
+// if (node != nullptr) {
+// if (node->is_leaf_node()) {
+// // Node contains column without any child sub columns and no
corresponding sparse columns
+// // Direct read extracted columns
+// const auto* node =
_sub_column_tree[unique_id].find_leaf(relative_path);
+// ColumnIterator* it;
+// RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
+// iter->reset(it);
+// } else {
+// // Node contains column with children columns or has
correspoding sparse columns
+// // Create reader with hirachical data.
+// // If sparse column exists or read the full path of variant
read in MERGE_ROOT, otherwise READ_DIRECT
+// HierarchicalDataReader::ReadType read_type =
+// (relative_path == root->path) ?
HierarchicalDataReader::ReadType::MERGE_ROOT
+// :
HierarchicalDataReader::ReadType::READ_DIRECT;
+// RETURN_IF_ERROR(
+// HierarchicalDataReader::create(iter, relative_path,
node, root, read_type));
+// }
+// } else {
+// // No such node, read from sparse column
+// // TODO test if in VariantStatisticsPB.sparse_column_non_null_size,
otherwise generate a default iterator
+// }
+//
+// return Status::OK();
+// }
// Not use cid anymore, for example original table schema is colA int, then
user do following actions
// 1.add column b
@@ -894,46 +791,43 @@ Status Segment::new_column_iterator(const TabletColumn&
tablet_column,
RETURN_IF_ERROR(_create_column_readers_once());
// init column iterator by path info
- if (tablet_column.has_path_info() || tablet_column.is_variant_type()) {
- return new_column_iterator_with_path(tablet_column, iter, opt);
- }
+ // if (tablet_column.has_path_info() || tablet_column.is_variant_type()) {
+ // return new_column_iterator_with_path(tablet_column, iter, opt);
+ // }
+
+ // For compatibility reasons unique_id may be less than 0 for a variant extracted
column
+ int32_t unique_id = tablet_column.unique_id() > 0 ?
tablet_column.unique_id()
+ :
tablet_column.parent_unique_id();
// init default iterator
- if (!_column_readers.contains(tablet_column.unique_id())) {
+ if (!_column_readers.contains(unique_id)) {
RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
return Status::OK();
}
// init iterator by unique id
ColumnIterator* it;
-
RETURN_IF_ERROR(_column_readers.at(tablet_column.unique_id())->new_iterator(&it));
+ RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it,
tablet_column));
iter->reset(it);
if (config::enable_column_type_check && !tablet_column.is_agg_state_type()
&&
- tablet_column.type() !=
_column_readers.at(tablet_column.unique_id())->get_meta_type()) {
+ tablet_column.type() !=
_column_readers.at(unique_id)->get_meta_type()) {
LOG(WARNING) << "different type between schema and column reader,"
<< " column schema name: " << tablet_column.name()
<< " column schema type: " << int(tablet_column.type())
<< " column reader meta type: "
- <<
int(_column_readers.at(tablet_column.unique_id())->get_meta_type());
+ << int(_column_readers.at(unique_id)->get_meta_type());
return Status::InternalError("different type between schema and column
reader");
}
return Status::OK();
}
-Status Segment::new_column_iterator(int32_t unique_id,
std::unique_ptr<ColumnIterator>* iter) {
- RETURN_IF_ERROR(_create_column_readers_once());
- ColumnIterator* it;
- RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it));
- iter->reset(it);
- return Status::OK();
-}
-
ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {
// init column iterator by path info
if (col.has_path_info() || col.is_variant_type()) {
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() :
col.parent_unique_id();
const auto* node = col.has_path_info()
- ?
_sub_column_tree[unique_id].find_exact(relative_path)
+ ?
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
+
->get_reader_by_path(relative_path)
: nullptr;
if (node != nullptr) {
return node->data.reader.get();
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index bc5ab1e1fdc..877f74ae1c3 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -37,14 +37,12 @@
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
#include "olap/rowset/segment_v2/page_handle.h"
-#include "olap/rowset/segment_v2/stream_reader.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "util/once.h"
#include "util/slice.h"
#include "vec/columns/column.h"
-#include "vec/columns/subcolumn_tree.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/json/path_in_data.h"
@@ -107,11 +105,9 @@ public:
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt);
- Status new_column_iterator_with_path(const TabletColumn& tablet_column,
- std::unique_ptr<ColumnIterator>* iter,
- const StorageReadOptions* opt);
-
- Status new_column_iterator(int32_t unique_id,
std::unique_ptr<ColumnIterator>* iter);
+ // Status new_column_iterator_with_path(const TabletColumn& tablet_column,
+ // std::unique_ptr<ColumnIterator>*
iter,
+ // const StorageReadOptions* opt);
Status new_bitmap_index_iterator(const TabletColumn& tablet_column,
std::unique_ptr<BitmapIndexIterator>*
iter);
@@ -120,7 +116,8 @@ public:
const TabletIndex* index_meta,
const StorageReadOptions& read_options,
std::unique_ptr<InvertedIndexIterator>*
iter);
-
+ static Status new_default_iterator(const TabletColumn& tablet_column,
+ std::unique_ptr<ColumnIterator>* iter);
const ShortKeyIndexDecoder* get_short_key_index() const {
DCHECK(_load_index_once.has_called() &&
_load_index_once.stored_result().ok());
return _sk_index_decoder.get();
@@ -211,6 +208,8 @@ public:
const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; }
+ const ColumnMetaPB* get_column_meta(int32_t unique_id) const;
+
private:
DISALLOW_COPY_AND_ASSIGN(Segment);
Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema,
@@ -226,11 +225,6 @@ private:
Status _load_pk_bloom_filter();
ColumnReader* _get_column_reader(const TabletColumn& col);
- // Get Iterator which will read variant root column and extract with paths
and types info
- Status _new_iterator_with_variant_root(const TabletColumn& tablet_column,
- std::unique_ptr<ColumnIterator>*
iter,
- const SubcolumnColumnReaders::Node*
root,
- vectorized::DataTypePtr
target_type_hint);
Status _write_error_file(size_t file_size, size_t offset, size_t
bytes_read, char* data,
io::IOContext& io_ctx);
@@ -269,15 +263,6 @@ private:
// map column unique id ---> it's inner data type
std::map<int32_t, std::shared_ptr<const vectorized::IDataType>>
_file_column_types;
- // Each node in the tree represents the sub column reader and type
- // for variants.
- // map column unique id --> it's sub column readers
- std::map<int32_t, SubcolumnColumnReaders> _sub_column_tree;
-
- // each sprase column's path and types info
- // map column unique id --> it's sparse sub column readers
- std::map<int32_t, SubcolumnColumnReaders> _sparse_column_tree;
-
// used to guarantee that short key index will be loaded at most once in a
thread-safe way
DorisCallOnce<Status> _load_index_once;
// used to guarantee that primary key bloom filter will be loaded at most
once in a thread-safe way
@@ -303,6 +288,8 @@ private:
int _be_exec_version = BeExecVersionManager::get_newest_version();
OlapReaderStatistics* _pk_index_load_stats = nullptr;
+ // unique_id -> idx in footer.columns()
+ std::unordered_map<int32_t, uint32_t> _column_id_to_footer_ordinal;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 1bfcfbb999b..b76acf68978 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -285,6 +285,7 @@ Status SegmentWriter::_create_column_writer(uint32_t cid,
const TabletColumn& co
opts.file_writer = _file_writer;
opts.compression_type = _opts.compression_type;
opts.footer = &_footer;
+ opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer,
&writer));
diff --git a/be/src/olap/rowset/segment_v2/stream_reader.h
b/be/src/olap/rowset/segment_v2/stream_reader.h
index 9aac3c0f232..5b71e00101f 100644
--- a/be/src/olap/rowset/segment_v2/stream_reader.h
+++ b/be/src/olap/rowset/segment_v2/stream_reader.h
@@ -19,9 +19,14 @@
#include <memory>
-#include "olap/rowset/segment_v2/column_reader.h"
+// #include "olap/rowset/segment_v2/column_reader.h"
+#include "vec/columns/column.h"
+#include "vec/columns/subcolumn_tree.h"
+#include "vec/data_types/data_type.h"
namespace doris::segment_v2 {
+class ColumnIterator;
+class ColumnReader;
// This file Defined ColumnIterator and ColumnReader for reading variant
subcolumns. The types from read schema and from storage are
// different, so we need to wrap the ColumnIterator from execution phase and
storage column reading phase.And we also
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 958df5780bd..761cbec8c49 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -16,9 +16,14 @@
// under the License.
#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
+#include <gen_cpp/segment_v2.pb.h>
+
#include "common/status.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/column_writer.h"
+#include "olap/segment_loader.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
@@ -31,11 +36,90 @@ namespace doris::segment_v2 {
VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions&
opts,
const TabletColumn* column) {
_opts = opts;
- _column = vectorized::ColumnObject::create(true, false);
- if (column->is_nullable()) {
+ _tablet_column = column;
+}
+
+Status VariantColumnWriterImpl::init() {
+ // caculate stats info
+ std::set<std::string> dynamic_paths;
+ RETURN_IF_ERROR(_get_subcolumn_paths_from_stats(dynamic_paths));
+ if (dynamic_paths.empty()) {
+ _column = vectorized::ColumnObject::create(true, false);
+ } else {
+ vectorized::ColumnObject::Subcolumns dynamic_subcolumns;
+ for (const auto& path : dynamic_paths) {
+ dynamic_subcolumns.add(vectorized::PathInData(path),
+ vectorized::ColumnObject::Subcolumn {0,
true});
+ }
+ _column =
vectorized::ColumnObject::create(std::move(dynamic_subcolumns), true);
+ }
+ if (_tablet_column->is_nullable()) {
_null_column = vectorized::ColumnUInt8::create(0);
}
- _tablet_column = column;
+ return Status::OK();
+}
+
+Status
VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::string>&
paths) {
+ std::unordered_map<std::string, size_t>
path_to_total_number_of_non_null_values;
+
+ // Merge and collect all stats info from all input rowsets/segments
+ for (RowsetReaderSharedPtr reader : _opts.input_rs_readers) {
+ SegmentCacheHandle segment_cache;
+ RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
+ std::static_pointer_cast<BetaRowset>(reader->rowset()),
&segment_cache));
+ for (const auto& segment : segment_cache.get_segments()) {
+ const auto* column_meta_pb =
segment->get_column_meta(_tablet_column->unique_id());
+ if (!column_meta_pb) {
+ continue;
+ }
+ if (!column_meta_pb->has_variant_statistics()) {
+ continue;
+ }
+ const VariantStatisticsPB& source_statistics =
column_meta_pb->variant_statistics();
+ for (const auto& [path, size] :
source_statistics.subcolumn_non_null_size()) {
+ auto it = path_to_total_number_of_non_null_values.find(path);
+ if (it == path_to_total_number_of_non_null_values.end()) {
+ it = path_to_total_number_of_non_null_values.emplace(path,
0).first;
+ }
+ it->second += size;
+ }
+ for (const auto& [path, size] :
source_statistics.sparse_column_non_null_size()) {
+ auto it = path_to_total_number_of_non_null_values.find(path);
+ if (it == path_to_total_number_of_non_null_values.end()) {
+ it = path_to_total_number_of_non_null_values.emplace(path,
0).first;
+ }
+ it->second += size;
+ }
+ }
+ }
+ // Check if the number of all dynamic paths exceeds the limit.
+ if (path_to_total_number_of_non_null_values.size() >
vectorized::ColumnObject::MAX_SUBCOLUMNS) {
+ // Sort paths by total number of non null values.
+ std::vector<std::pair<size_t, std::string_view>> paths_with_sizes;
+
paths_with_sizes.reserve(path_to_total_number_of_non_null_values.size());
+ for (const auto& [path, size] :
path_to_total_number_of_non_null_values) {
+ paths_with_sizes.emplace_back(size, path);
+ }
+ std::sort(paths_with_sizes.begin(), paths_with_sizes.end(),
std::greater());
+
+ // Fill dynamic_paths with first max_dynamic_paths paths in sorted
list.
+ for (const auto& [size, path] : paths_with_sizes) {
+ if (paths.size() < vectorized::ColumnObject::MAX_SUBCOLUMNS) {
+ paths.emplace(path);
+ }
+ // // todo : Add all remaining paths into shared data statistics
until we reach its max size;
+ // else if (new_statistics.sparse_data_paths_statistics.size() <
Statistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+ // new_statistics.sparse_data_paths_statistics.emplace(path,
size);
+ // }
+ }
+ } else {
+ // Use all dynamic paths from all source columns.
+ for (const auto& [path, _] : path_to_total_number_of_non_null_values) {
+ paths.emplace(path);
+ }
+ }
+
+ return Status::OK();
}
Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject*
ptr,
@@ -92,7 +176,6 @@ Status
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
.parent_unique_id =
_tablet_column->unique_id(),
.path_info = full_path});
};
-
_statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size());
// convert sub column data from engine format to storage layer format
for (const auto& entry :
vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
@@ -121,7 +204,8 @@ Status
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
_subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);
// get stastics
-
_statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size());
+ _statistics._subcolumns_non_null_size.emplace(entry->path.get_path(),
+
entry->data.get_non_null_value_size());
}
return Status::OK();
}
@@ -163,7 +247,7 @@ Status VariantColumnWriterImpl::_process_sparse_column(
it != _statistics._sparse_column_non_null_size.end()) {
++it->second;
} else if (_statistics._sparse_column_non_null_size.size() <
- VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) {
+ VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
_statistics._sparse_column_non_null_size.emplace(path, 1);
}
}
@@ -173,7 +257,22 @@ Status VariantColumnWriterImpl::_process_sparse_column(
}
void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
- // TODO
+ for (const auto& [path, value] : _sparse_column_non_null_size) {
+ stats->mutable_subcolumn_non_null_size()->emplace(path.to_string(),
value);
+ }
+ for (const auto& [path, value] : _sparse_column_non_null_size) {
+
stats->mutable_sparse_column_non_null_size()->emplace(path.to_string(), value);
+ }
+}
+
+void VariantStatistics::from_pb(const VariantStatisticsPB& stats) {
+ // make sure the ref of path, todo not use ref
+ for (const auto& [path, value] : stats.subcolumn_non_null_size()) {
+ _subcolumns_non_null_size[StringRef(path.data(), path.size())] = value;
+ }
+ for (const auto& [path, value] : stats.sparse_column_non_null_size()) {
+ _sparse_column_non_null_size[StringRef(path.data(), path.size())] =
value;
+ }
}
Status VariantColumnWriterImpl::finalize() {
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 87f67e7b1ef..66c5269e7ce 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -36,18 +36,20 @@ class ColumnWriter;
class ScalarColumnWriter;
struct VariantStatistics {
- constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000;
- std::vector<size_t> _subcolumns_non_null_size;
+ // If reached the size of this, we should stop writing statistics for
sparse data
+ constexpr static size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000;
+ std::map<StringRef, size_t> _subcolumns_non_null_size;
std::map<StringRef, size_t> _sparse_column_non_null_size;
void to_pb(VariantStatisticsPB* stats) const;
+ void from_pb(const VariantStatisticsPB& stats);
};
class VariantColumnWriterImpl {
public:
VariantColumnWriterImpl(const ColumnWriterOptions& opts, const
TabletColumn* column);
Status finalize();
-
+ Status init();
bool is_finalized() const;
Status append_data(const uint8_t** ptr, size_t num_rows);
@@ -65,6 +67,9 @@ public:
private:
void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const
TabletColumn& column);
+ // subcolumn path from variant stats info to distinguish from sparse column
+ Status _get_subcolumn_paths_from_stats(std::set<std::string>& paths);
+
Status _create_column_writer(uint32_t cid, const TabletColumn& column,
const TabletColumn& parent_column,
const TabletSchemaSPtr& tablet_schema);
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 089dac218fe..6abc2f5c16e 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -280,6 +280,7 @@ Status
VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
opts.file_writer = _file_writer;
opts.compression_type = _opts.compression_type;
opts.footer = &_footer;
+ opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer,
&writer));
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 9d6e260724b..11130e628e9 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -356,6 +356,7 @@ ColumnObject::Subcolumn::Subcolumn(MutableColumnPtr&&
data_, DataTypePtr type, b
: least_common_type(type), is_nullable(is_nullable_),
is_root(is_root_) {
data.push_back(std::move(data_));
data_types.push_back(type);
+ data_serdes.push_back(type->get_serde());
}
ColumnObject::Subcolumn::Subcolumn(size_t size_, bool is_nullable_, bool
is_root_)
@@ -398,6 +399,7 @@ void
ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) {
data.push_back(type->create_column());
least_common_type = LeastCommonType {type};
data_types.push_back(type);
+ data_serdes.push_back(type->get_serde());
}
void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) {
@@ -800,6 +802,9 @@ ColumnObject::ColumnObject(bool is_nullable_, bool
create_root_)
}
}
+ColumnObject::ColumnObject(MutableColumnPtr&& sparse_column)
+ : is_nullable(true),
serialized_sparse_column(std::move(sparse_column)) {}
+
ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type,
MutableColumnPtr&& column)
: is_nullable(is_nullable_) {
add_sub_column({}, std::move(column), type);
@@ -957,6 +962,27 @@ void ColumnObject::insert_default() {
++num_rows;
}
+bool ColumnObject::Subcolumn::is_null_at(size_t n) const {
+ if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
+ return true;
+ }
+ size_t ind = n;
+ if (ind < num_of_defaults_in_prefix) {
+ return true;
+ }
+
+ ind -= num_of_defaults_in_prefix;
+ for (const auto& part : data) {
+ if (ind < part->size()) {
+ return assert_cast<const ColumnNullable&>(*part).is_null_at(ind);
+ }
+ ind -= part->size();
+ }
+
+ throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting
field is out of range",
+ n);
+}
+
void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
res = Null();
@@ -1023,8 +1049,9 @@ void
ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std:
is_null = false;
// insert key
key->insert_data(path.data(), path.size());
+ const auto& part_type_serde = data_serdes[i];
// insert value
- data_types[i]->get_serde()->write_one_cell_to_binary(*part,
value, row);
+ part_type_serde->write_one_cell_to_binary(*part, value, row);
}
return;
}
@@ -1088,7 +1115,7 @@ const char* parse_binary_from_sparse_column(TypeIndex
type, const char* data, Fi
const size_t size = *reinterpret_cast<const size_t*>(data);
data += sizeof(size_t);
res = Array(size);
- vectorized::Array& array = res.get<Array>();
+ auto& array = res.get<Array>();
info_res.num_dimensions++;
for (size_t i = 0; i < size; ++i) {
const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++);
@@ -1097,7 +1124,7 @@ const char* parse_binary_from_sparse_column(TypeIndex
type, const char* data, Fi
continue;
}
Field nested_field;
- const TypeIndex nested_type =
+ const auto nested_type =
assert_cast<const TypeIndex>(*reinterpret_cast<const
uint8_t*>(data++));
data = parse_binary_from_sparse_column(nested_type, data,
nested_field, info_res);
array.emplace_back(std::move(nested_field));
@@ -1113,7 +1140,7 @@ const char* parse_binary_from_sparse_column(TypeIndex
type, const char* data, Fi
}
std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const
ColumnString* value,
-
size_t row) const {
+
size_t row) {
const auto& data_ref = value->get_data_at(row);
const char* data = data_ref.data;
DCHECK(data_ref.size > 0);
@@ -1132,7 +1159,7 @@ std::pair<Field, FieldInfo>
ColumnObject::deserialize_from_sparse_column(const C
}
DCHECK(data_ref.size > 1);
- const TypeIndex type = assert_cast<const
TypeIndex>(*reinterpret_cast<const uint8_t*>(data++));
+ const auto type = assert_cast<const TypeIndex>(*reinterpret_cast<const
uint8_t*>(data++));
info_res.scalar_type_id = type;
Field res;
const char* end = parse_binary_from_sparse_column(type, data, res,
info_res);
@@ -1171,7 +1198,7 @@ void ColumnObject::get(size_t n, Field& res) const {
// Iterator over [path, binary value]
for (size_t i = offset; i != end; ++i) {
const StringRef path_data = path->get_data_at(i);
- const auto& data = deserialize_from_sparse_column(value, i);
+ const auto& data = ColumnObject::deserialize_from_sparse_column(value,
i);
object.try_emplace(std::string(path_data.data, path_data.size),
data.first);
}
@@ -1360,7 +1387,8 @@ void
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
const PathInData column_path(src_sparse_path);
if (auto* subcolumn = get_subcolumn(column_path); subcolumn !=
nullptr) {
// Deserialize binary value into subcolumn from src serialized
sparse column data.
- const auto& data =
src.deserialize_from_sparse_column(src_sparse_column_values, i);
+ const auto& data =
+
ColumnObject::deserialize_from_sparse_column(src_sparse_column_values, i);
subcolumn->insert(data.first, data.second);
} else {
// Before inserting this path into sparse column check if we
need to
@@ -1386,7 +1414,7 @@ void
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
}
}
- // Insert remaining dynamic paths from
src_dynamic_paths_for_shared_data.
+ // Insert remaining dynamic paths from
src_dynamic_paths_for_sparse_data.
while (sorted_src_subcolumn_for_sparse_column_idx <
sorted_src_subcolumn_for_sparse_column_size) {
auto& [src_path, src_subcolumn] =
sorted_src_subcolumn_for_sparse_column
@@ -1452,6 +1480,37 @@ const ColumnObject::Subcolumn*
ColumnObject::get_subcolumn(const PathInData& key
return &node->data;
}
+size_t ColumnObject::Subcolumn::serialize_text_json(size_t n, BufferWritable&
output) const {
+ if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
+ output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(),
+ DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size());
+ return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size();
+ }
+
+ size_t ind = n;
+ if (ind < num_of_defaults_in_prefix) {
+ output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(),
+ DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size());
+ return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size();
+ }
+
+ ind -= num_of_defaults_in_prefix;
+ DataTypeSerDe::FormatOptions opt;
+ for (size_t i = 0; i < data.size(); ++i) {
+ const auto& part = data[i];
+ const auto& part_type_serde = data_serdes[i];
+
+ if (ind < part->size()) {
+ return part_type_serde->serialize_one_cell_to_json(*part, ind,
output, opt);
+ }
+
+ ind -= part->size();
+ }
+
+ throw doris::Exception(ErrorCode::OUT_OF_BOUND,
+ "Index ({}) for serializing JSON is out of range",
n);
+}
+
const ColumnObject::Subcolumn* ColumnObject::get_subcolumn_with_cache(const
PathInData& key,
size_t
key_index) const {
// Optimization by caching the order of fields (which is almost always the
same)
@@ -1717,89 +1776,203 @@ void get_json_by_column_tree(rapidjson::Value& root,
rapidjson::Document::Alloca
}
Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string*
output) const {
- if (!is_finalized()) {
- const_cast<ColumnObject*>(this)->finalize();
- }
- rapidjson::StringBuffer buf;
- if (is_scalar_variant()) {
+ // if (!is_finalized()) {
+ // const_cast<ColumnObject*>(this)->finalize();
+ // }
+ if (is_scalar_variant() && is_finalized()) {
auto type = get_root_type();
*output = type->to_string(*get_root(), row);
return Status::OK();
}
- RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr));
- // TODO avoid copy
- *output = std::string(buf.GetString(), buf.GetSize());
+ // TODO preallocate memory
+ auto tmp_col = ColumnString::create();
+ VectorBufferWriter write_buffer(*tmp_col.get());
+ RETURN_IF_ERROR(serialize_one_row_to_json_format(row, write_buffer,
nullptr));
+ write_buffer.commit();
+ auto str_ref = tmp_col->get_data_at(0);
+ *output = std::string(str_ref.data, str_ref.size);
return Status::OK();
}
Status ColumnObject::serialize_one_row_to_string(int64_t row, BufferWritable&
output) const {
- if (!is_finalized()) {
- const_cast<ColumnObject*>(this)->finalize();
- }
- if (is_scalar_variant()) {
+ // if (!is_finalized()) {
+ // const_cast<ColumnObject*>(this)->finalize();
+ // }
+ if (is_scalar_variant() && is_finalized()) {
auto type = get_root_type();
type->to_string(*get_root(), row, output);
return Status::OK();
}
- rapidjson::StringBuffer buf;
- RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr));
- output.write(buf.GetString(), buf.GetLength());
+ RETURN_IF_ERROR(serialize_one_row_to_json_format(row, output, nullptr));
return Status::OK();
}
-Status ColumnObject::serialize_one_row_to_json_format(int64_t row,
rapidjson::StringBuffer* output,
- bool* is_null) const {
- CHECK(is_finalized());
- if (subcolumns.empty()) {
- if (is_null != nullptr) {
- *is_null = true;
- } else {
- rapidjson::Value root(rapidjson::kNullType);
- rapidjson::Writer<rapidjson::StringBuffer> writer(*output);
- if (!root.Accept(writer)) {
- return Status::InternalError("Failed to serialize json value");
+/// Struct that represents elements of the JSON path.
+/// "a.b.c" -> ["a", "b", "c"]
+struct PathElements {
+ explicit PathElements(const String& path) {
+ const char* start = path.data();
+ const char* end = start + path.size();
+ const char* pos = start;
+ const char* last_dot_pos = pos - 1;
+ for (pos = start; pos != end; ++pos) {
+ if (*pos == '.') {
+ elements.emplace_back(last_dot_pos + 1, size_t(pos -
last_dot_pos - 1));
+ last_dot_pos = pos;
}
}
- return Status::OK();
+
+ elements.emplace_back(last_dot_pos + 1, size_t(pos - last_dot_pos -
1));
}
- CHECK(size() > row);
- rapidjson::StringBuffer buffer;
- rapidjson::Value root(rapidjson::kNullType);
- if (doc_structure == nullptr) {
- doc_structure = std::make_shared<rapidjson::Document>();
- rapidjson::Document::AllocatorType& allocator =
doc_structure->GetAllocator();
- get_json_by_column_tree(*doc_structure, allocator,
subcolumns.get_root());
+
+ size_t size() const { return elements.size(); }
+
+ std::vector<std::string_view> elements;
+};
+
+/// Struct that represents a prefix of a JSON path. Used during output of the
JSON object.
+struct Prefix {
+ /// Shrink current prefix to the common prefix of current prefix and
specified path.
+ /// For example, if current prefix is a.b.c.d and path is a.b.e, then
shrink the prefix to a.b.
+ void shrink_to_common_prefix(const PathElements& path_elements) {
+ /// Don't include last element in path_elements in the prefix.
+ size_t i = 0;
+ while (i != elements.size() && i != (path_elements.elements.size() -
1) &&
+ elements[i].first == path_elements.elements[i])
+ ++i;
+ elements.resize(i);
}
- if (!doc_structure->IsNull()) {
- root.CopyFrom(*doc_structure, doc_structure->GetAllocator());
+
+ /// Check is_first flag in current object.
+ bool is_first_in_current_object() const {
+ if (elements.empty()) return root_is_first_flag;
+ return elements.back().second;
}
- Arena mem_pool;
-#ifndef NDEBUG
- VLOG_DEBUG << "dump structure " <<
JsonFunctions::print_json_value(*doc_structure);
-#endif
- for (const auto& subcolumn : subcolumns) {
- RETURN_IF_ERROR(find_and_set_leave_value(
- subcolumn->data.get_finalized_column_ptr(), subcolumn->path,
- subcolumn->data.get_least_common_type_serde(),
- subcolumn->data.get_least_common_type(),
- subcolumn->data.least_common_type.get_base_type_id(), root,
- doc_structure->GetAllocator(), mem_pool, row));
- if (subcolumn->path.empty() && !root.IsObject()) {
- // root was modified, only handle root node
- break;
- }
+
+ /// Set flag is_first = false in current object.
+ void set_not_first_in_current_object() {
+ if (elements.empty())
+ root_is_first_flag = false;
+ else
+ elements.back().second = false;
}
- compact_null_values(root, doc_structure->GetAllocator());
- if (root.IsNull() && is_null != nullptr) {
- // Fast path
- *is_null = true;
- } else {
- output->Clear();
- rapidjson::Writer<rapidjson::StringBuffer> writer(*output);
- if (!root.Accept(writer)) {
- return Status::InternalError("Failed to serialize json value");
+
+ size_t size() const { return elements.size(); }
+
+ /// Elements of the prefix: (path element, is_first flag in this prefix).
+ /// is_first flag indicates if we already serialized some key in the
object with such prefix.
+ std::vector<std::pair<std::string_view, bool>> elements;
+ bool root_is_first_flag = true;
+};
+
+Status ColumnObject::serialize_one_row_to_json_format(int64_t row_num,
BufferWritable& output,
+ bool* is_null) const {
+ const auto& column_map = assert_cast<const
ColumnMap&>(*serialized_sparse_column);
+ const auto& sparse_data_offsets = column_map.get_offsets();
+ const auto [sparse_data_paths, sparse_data_values] =
get_sparse_data_paths_and_values();
+ size_t sparse_data_offset =
sparse_data_offsets[static_cast<ssize_t>(row_num) - 1];
+ size_t sparse_data_end =
sparse_data_offsets[static_cast<ssize_t>(row_num)];
+
+ // We need to convert the set of paths in this row to a JSON object.
+ // To do it, we first collect all the paths from current row, then we sort
them
+ // and construct the resulting JSON object by iterating over sorted list
of paths.
+ // For example:
+ // b.c, a.b, a.a, b.e, g, h.u.t -> a.a, a.b, b.c, b.e, g, h.u.t -> {"a" :
{"a" : ..., "b" : ...}, "b" : {"c" : ..., "e" : ...}, "g" : ..., "h" : {"u" :
{"t" : ...}}}.
+ std::vector<String> sorted_paths;
+ std::map<std::string, Subcolumn> subcolumn_path_map;
+ sorted_paths.reserve(get_subcolumns().size() + (sparse_data_end -
sparse_data_offset));
+ for (const auto& subcolumn : get_subcolumns()) {
+ /// We consider null value and absence of the path in a row as
equivalent cases, because we cannot actually distinguish them.
+ /// So, we don't output null values at all.
+ if (!subcolumn->data.is_null_at(row_num)) {
+ sorted_paths.emplace_back(subcolumn->path.get_path());
+ }
+ subcolumn_path_map.emplace(subcolumn->path.get_path(),
subcolumn->data);
+ }
+ for (size_t i = sparse_data_offset; i != sparse_data_end; ++i) {
+ auto path = sparse_data_paths->get_data_at(i).to_string();
+ sorted_paths.emplace_back(path);
+ }
+
+ std::sort(sorted_paths.begin(), sorted_paths.end());
+
+ writeChar('{', output);
+ size_t index_in_sparse_data_values = sparse_data_offset;
+ // current_prefix represents the path of the object we are currently
serializing keys in.
+ Prefix current_prefix;
+ for (const auto& path : sorted_paths) {
+ PathElements path_elements(path);
+ // Change prefix to common prefix between current prefix and current
path.
+ // If prefix changed (it can only decrease), close all finished
objects.
+ // For example:
+ // Current prefix: a.b.c.d
+ // Current path: a.b.e.f
+ // It means now we have : {..., "a" : {"b" : {"c" : {"d" : ...
+ // Common prefix will be a.b, so it means we should close objects
a.b.c.d and a.b.c: {..., "a" : {"b" : {"c" : {"d" : ...}}
+ // and continue serializing keys in object a.b
+ size_t prev_prefix_size = current_prefix.size();
+ current_prefix.shrink_to_common_prefix(path_elements);
+ size_t prefix_size = current_prefix.size();
+ if (prefix_size != prev_prefix_size) {
+ size_t objects_to_close = prev_prefix_size - prefix_size;
+ for (size_t i = 0; i != objects_to_close; ++i) {
+ writeChar('}', output);
+ }
+ }
+
+ // Now we are inside object that has common prefix with current path.
+ // We should go inside all objects in current path.
+ // From the example above we should open object a.b.e:
+ // {..., "a" : {"b" : {"c" : {"d" : ...}}, "e" : {
+ if (prefix_size + 1 < path_elements.size()) {
+ for (size_t i = prefix_size; i != path_elements.size() - 1; ++i) {
+ /// Write comma before the key if it's not the first key in
this prefix.
+ if (!current_prefix.is_first_in_current_object()) {
+ writeChar(',', output);
+ } else {
+ current_prefix.set_not_first_in_current_object();
+ }
+
+ writeJSONString(path_elements.elements[i], output);
+ writeCString(":{", output);
+
+ // Update current prefix.
+
current_prefix.elements.emplace_back(path_elements.elements[i], true);
+ }
+ }
+
+ // Write comma before the key if it's not the first key in this prefix.
+ if (!current_prefix.is_first_in_current_object()) {
+ writeChar(',', output);
+ } else {
+ current_prefix.set_not_first_in_current_object();
+ }
+
+ writeJSONString(path_elements.elements.back(), output);
+ writeCString(":", output);
+
+ // Serialize value of current path.
+ if (auto subcolumn_it = subcolumn_path_map.find(path);
+ subcolumn_it != subcolumn_path_map.end()) {
+ subcolumn_it->second.serialize_text_json(row_num, output);
+ } else {
+ // To serialize value stored in shared data we should first
deserialize it from binary format.
+ Subcolumn tmp_subcolumn(0, true);
+ const auto& data = ColumnObject::deserialize_from_sparse_column(
+ sparse_data_values, index_in_sparse_data_values++);
+ tmp_subcolumn.insert(data.first, data.second);
+ tmp_subcolumn.serialize_text_json(0, output);
}
}
+
+ // Close all remaining open objects.
+ for (size_t i = 0; i != current_prefix.elements.size(); ++i) {
+ writeChar('}', output);
+ }
+ writeChar('}', output);
+#ifndef NDEBUG
+ // check if it is a valid json
+#endif
return Status::OK();
}
@@ -2126,6 +2299,8 @@ const DataTypePtr ColumnObject::NESTED_TYPE =
std::make_shared<vectorized::DataT
std::make_shared<vectorized::DataTypeArray>(std::make_shared<vectorized::DataTypeNullable>(
std::make_shared<vectorized::DataTypeObject>())));
+const size_t ColumnObject::MAX_SUBCOLUMNS = 5;
+
DataTypePtr ColumnObject::get_root_type() const {
return subcolumns.get_root()->data.get_least_common_type();
}
@@ -2312,4 +2487,80 @@ bool ColumnObject::try_insert_default_from_nested(const
Subcolumns::NodePtr& ent
return true;
}
+size_t ColumnObject::find_path_lower_bound_in_sparse_data(StringRef path,
+ const ColumnString&
sparse_data_paths,
+ size_t start, size_t
end) {
+ // Simple random access iterator over values in ColumnString in specified
range.
+ class Iterator {
+ public:
+ using difference_type = size_t;
+ using value_type = StringRef;
+ using iterator_category = std::random_access_iterator_tag;
+ using pointer = StringRef*;
+ using reference = StringRef&;
+
+ Iterator() = delete;
+ Iterator(const ColumnString* data_, size_t index_) : data(data_),
index(index_) {}
+ Iterator(const Iterator& rhs) = default;
+ Iterator& operator=(const Iterator& rhs) = default;
+ inline Iterator& operator+=(difference_type rhs) {
+ index += rhs;
+ return *this;
+ }
+ inline StringRef operator*() const { return data->get_data_at(index); }
+
+ inline Iterator& operator++() {
+ ++index;
+ return *this;
+ }
+ inline Iterator& operator--() {
+ --index;
+ return *this;
+ }
+ inline difference_type operator-(const Iterator& rhs) const { return
index - rhs.index; }
+
+ const ColumnString* data;
+ size_t index;
+ };
+
+ Iterator start_it(&sparse_data_paths, start);
+ Iterator end_it(&sparse_data_paths, end);
+ auto it = std::lower_bound(start_it, end_it, path);
+ return it.index;
+}
+
+void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn,
StringRef path,
+ const ColumnPtr&
sparse_data_column,
+ size_t start, size_t end) {
+ const auto& sparse_data_map = assert_cast<const
ColumnMap&>(*sparse_data_column);
+ const auto& sparse_data_offsets = sparse_data_map.get_offsets();
+ size_t first_offset = sparse_data_offsets[static_cast<ssize_t>(start) - 1];
+ size_t last_offset = sparse_data_offsets[static_cast<ssize_t>(end) - 1];
+ // Check if we have at least one row with data.
+ if (first_offset == last_offset) {
+ subcolumn.insert_many_defaults(end - start);
+ return;
+ }
+
+ const auto& sparse_data_paths = assert_cast<const
ColumnString&>(sparse_data_map.get_keys());
+ const auto& sparse_data_values = assert_cast<const
ColumnString&>(sparse_data_map.get_values());
+ for (size_t i = start; i != end; ++i) {
+ size_t paths_start = sparse_data_offsets[static_cast<ssize_t>(i) - 1];
+ size_t paths_end = sparse_data_offsets[static_cast<ssize_t>(i)];
+ auto lower_bound_path_index =
ColumnObject::find_path_lower_bound_in_sparse_data(
+ path, sparse_data_paths, paths_start, paths_end);
+ if (lower_bound_path_index != paths_end &&
+ sparse_data_paths.get_data_at(lower_bound_path_index) == path) {
+ // auto value_data =
sparse_data_values.get_data_at(lower_bound_path_index);
+ // ReadBufferFromMemory buf(value_data.data, value_data.size);
+ // dynamic_serialization->deserializeBinary(path_column, buf,
getFormatSettings());
+ const auto& data =
ColumnObject::deserialize_from_sparse_column(&sparse_data_values,
+
lower_bound_path_index);
+ subcolumn.insert(data.first, data.second);
+ } else {
+ subcolumn.insert_default();
+ }
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index 72cc783caf8..b63c0c5c0d8 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -98,7 +98,7 @@ public:
constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
// Nullable(Array(Nullable(Object)))
const static DataTypePtr NESTED_TYPE;
- const size_t MAX_SUBCOLUMNS = 3;
+ const static size_t MAX_SUBCOLUMNS;
// Finalize mode for subcolumns, write mode will estimate which subcolumns
are sparse columns(too many null values inside column),
// merge and encode them into a shared column in root column. Only affects
in flush block to segments.
// Otherwise read mode should be as default mode.
@@ -128,6 +128,8 @@ public:
size_t get_non_null_value_size() const;
+ size_t serialize_text_json(size_t n, BufferWritable& output) const;
+
const DataTypeSerDeSPtr& get_least_common_type_serde() const {
return least_common_type.get_serde();
}
@@ -136,6 +138,8 @@ public:
void get(size_t n, Field& res) const;
+ bool is_null_at(size_t n) const;
+
/// Inserts a field, which scalars can be arbitrary, but number of
/// dimensions should be consistent with current common type.
/// throws InvalidArgument when meet conflict types
@@ -233,6 +237,7 @@ public:
/// and it's the supertype for all type of column from 0 to i-1.
std::vector<WrappedPtr> data;
std::vector<DataTypePtr> data_types;
+ std::vector<DataTypeSerDeSPtr> data_serdes;
/// Until we insert any non-default field we don't know further
/// least common type and we count number of defaults in prefix,
/// which will be converted to the default type of final common type.
@@ -248,7 +253,6 @@ private:
const bool is_nullable;
Subcolumns subcolumns;
size_t num_rows;
-
// The rapidjson document format of Subcolumns tree structure
// the leaves is null.In order to display whole document, copy
// this structure and fill with Subcolumns sub items
@@ -268,6 +272,8 @@ public:
explicit ColumnObject(bool is_nullable_, bool create_root = true);
+ explicit ColumnObject(MutableColumnPtr&& sparse_column);
+
explicit ColumnObject(bool is_nullable_, DataTypePtr type,
MutableColumnPtr&& column);
// create without root, num_rows = size
@@ -292,7 +298,7 @@ public:
Status serialize_one_row_to_string(int64_t row, BufferWritable& output)
const;
// serialize one row to json format
- Status serialize_one_row_to_json_format(int64_t row,
rapidjson::StringBuffer* output,
+ Status serialize_one_row_to_json_format(int64_t row, BufferWritable&
output,
bool* is_null) const;
// Fill the `serialized_sparse_column`
@@ -360,11 +366,19 @@ public:
Subcolumns& get_subcolumns() { return subcolumns; }
- ColumnPtr get_sparse_column() {
+ ColumnPtr get_sparse_column() const {
return serialized_sparse_column->convert_to_full_column_if_const();
}
// use sparse_subcolumns_schema to record sparse column's path info and
type
+ static MutableColumnPtr create_sparse_column_fn() {
+ return
vectorized::ColumnMap::create(vectorized::ColumnString::create(),
+
vectorized::ColumnString::create(),
+
vectorized::ColumnArray::ColumnOffsets::create());
+ }
+
+ void set_sparse_column(ColumnPtr column) { serialized_sparse_column =
column; }
+
Status finalize(FinalizeMode mode);
/// Finalizes all subcolumns.
@@ -571,10 +585,18 @@ public:
const auto& value = assert_cast<const
ColumnString&>(column_map.get_values());
return {&key, &value};
}
+ // Insert all the data from sparse data with specified path to sub column.
+ static void fill_path_olumn_from_sparse_data(Subcolumn& subcolumn,
StringRef path,
+ const ColumnPtr&
sparse_data_column, size_t start,
+ size_t end);
+
+ static size_t find_path_lower_bound_in_sparse_data(StringRef path,
+ const ColumnString&
sparse_data_paths,
+ size_t start, size_t
end);
// Deserialize the i-th row of the column from the sparse column.
- std::pair<Field, FieldInfo> deserialize_from_sparse_column(const
ColumnString* value,
- size_t row)
const;
+ static std::pair<Field, FieldInfo> deserialize_from_sparse_column(const
ColumnString* value,
+ size_t
row);
private:
// May throw exception
diff --git a/be/src/vec/common/schema_util.cpp
b/be/src/vec/common/schema_util.cpp
index 42f9240646f..77b3299c5b5 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -608,7 +608,7 @@ bool has_schema_index_diff(const TabletSchema* new_schema,
const TabletSchema* o
TabletColumn create_sparse_column(int32_t parent_unique_id) {
TColumn tcolumn;
- tcolumn.column_name = ".sparse";
+ tcolumn.column_name = SPARSE_COLUMN_PATH;
tcolumn.col_unique_id = parent_unique_id;
tcolumn.column_type = TColumnType {};
tcolumn.column_type.type = TPrimitiveType::MAP;
@@ -618,7 +618,9 @@ TabletColumn create_sparse_column(int32_t parent_unique_id)
{
tcolumn.column_type.type = TPrimitiveType::STRING;
tcolumn.children_column.push_back(child_tcolumn);
tcolumn.children_column.push_back(child_tcolumn);
- return TabletColumn {tcolumn};
+ auto res = TabletColumn {tcolumn};
+ res.set_path_info(PathInData {SPARSE_COLUMN_PATH});
+ return res;
}
} // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 0507c9e2fe6..fee6e778325 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -49,6 +49,7 @@ struct ColumnWithTypeAndName;
} // namespace vectorized
} // namespace doris
+const std::string SPARSE_COLUMN_PATH = "__DORIS_VARIANT_SPARSE__";
namespace doris::vectorized::schema_util {
/// Returns number of dimensions in Array type. 0 if type is not array.
size_t get_number_of_dimensions(const IDataType& type);
diff --git a/be/src/vec/common/string_buffer.hpp
b/be/src/vec/common/string_buffer.hpp
index 8dca6f057a2..d297d465985 100644
--- a/be/src/vec/common/string_buffer.hpp
+++ b/be/src/vec/common/string_buffer.hpp
@@ -85,6 +85,92 @@ private:
const char* _data;
};
+inline void writeChar(char x, BufferWritable& buf) {
+ buf.write(x);
+}
+
+/** Writes a C-string without creating a temporary object. If the string is a
literal, then `strlen` is executed at the compilation stage.
+ * Use when the string is a literal.
+ */
+#define writeCString(s, buf) (buf).write((s), strlen(s))
+
+inline void writeJSONString(const char* begin, const char* end,
BufferWritable& buf) {
+ writeChar('"', buf);
+ for (const char* it = begin; it != end; ++it) {
+ switch (*it) {
+ case '\b':
+ writeChar('\\', buf);
+ writeChar('b', buf);
+ break;
+ case '\f':
+ writeChar('\\', buf);
+ writeChar('f', buf);
+ break;
+ case '\n':
+ writeChar('\\', buf);
+ writeChar('n', buf);
+ break;
+ case '\r':
+ writeChar('\\', buf);
+ writeChar('r', buf);
+ break;
+ case '\t':
+ writeChar('\\', buf);
+ writeChar('t', buf);
+ break;
+ case '\\':
+ writeChar('\\', buf);
+ writeChar('\\', buf);
+ break;
+ case '/':
+ writeChar('/', buf);
+ break;
+ case '"':
+ writeChar('\\', buf);
+ writeChar('"', buf);
+ break;
+ default:
+ UInt8 c = *it;
+ if (c <= 0x1F) {
+ /// Escaping of ASCII control characters.
+
+ UInt8 higher_half = c >> 4;
+ UInt8 lower_half = c & 0xF;
+
+ writeCString("\\u00", buf);
+ writeChar('0' + higher_half, buf);
+
+ if (lower_half <= 9) {
+ writeChar('0' + lower_half, buf);
+ } else {
+ writeChar('A' + lower_half - 10, buf);
+ }
+ } else if (end - it >= 3 && it[0] == '\xE2' && it[1] == '\x80' &&
+ (it[2] == '\xA8' || it[2] == '\xA9')) {
+ /// This is for compatibility with JavaScript, because
unescaped line separators are prohibited in string literals,
+ /// and these code points are alternative line separators.
+
+ if (it[2] == '\xA8') {
+ writeCString("\\u2028", buf);
+ }
+ if (it[2] == '\xA9') {
+ writeCString("\\u2029", buf);
+ }
+
+ /// Byte sequence is 3 bytes long. We have additional two
bytes to skip.
+ it += 2;
+ } else {
+ writeChar(*it, buf);
+ }
+ }
+ }
+ writeChar('"', buf);
+}
+
+inline void writeJSONString(std::string_view s, BufferWritable& buf) {
+ writeJSONString(s.data(), s.data() + s.size(), buf);
+}
+
using VectorBufferReader = BufferReadable;
using BufferReader = BufferReadable;
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp
b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index f6719437285..6c902b60589 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -58,15 +58,15 @@ Status DataTypeObjectSerDe::_write_column_to_mysql(const
IColumn& column,
root->get_finalized_column(), row_buffer, row_idx, col_const,
options));
} else {
// Serialize hierarchy types to json format
- rapidjson::StringBuffer buffer;
+ std::string buffer;
bool is_null = false;
- if (!variant.serialize_one_row_to_json_format(row_idx, &buffer,
&is_null)) {
+ if (!variant.serialize_one_row_to_string(row_idx, &buffer)) {
return Status::InternalError("Invalid json format");
}
if (is_null) {
row_buffer.push_null();
} else {
- row_buffer.push_string(buffer.GetString(), buffer.GetLength());
+ row_buffer.push_string(buffer.data(), buffer.size());
}
}
return Status::OK();
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 37a4f0a70ee..dee4a81d3bb 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -161,7 +161,7 @@ message ColumnPathInfo {
message VariantStatisticsPB {
// in the order of subcolumns in variant
- repeated uint32 subcolumn_non_null_size = 1;
+ map<string, uint32> subcolumn_non_null_size = 1;
map<string, uint32> sparse_column_non_null_size = 2;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]