This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b50ac3ef4a5f5b03df944bdbdb3ce7b69d0e25c4 Author: Sun Chenyang <[email protected]> AuthorDate: Tue Feb 6 20:08:53 2024 +0800 [fix](Variant Type) Add sparse columns meta to fix compaction (#28673) Co-authored-by: eldenmoon <[email protected]> --- be/src/olap/compaction.cpp | 2 + be/src/olap/rowset/segment_creator.cpp | 81 +------- .../rowset/segment_v2/hierarchical_data_reader.cpp | 4 + .../rowset/segment_v2/hierarchical_data_reader.h | 9 +- be/src/olap/rowset/segment_v2/segment.cpp | 103 +++++++---- be/src/olap/rowset/segment_v2/segment.h | 9 + be/src/olap/rowset/segment_v2/segment_writer.cpp | 4 + .../rowset/segment_v2/vertical_segment_writer.cpp | 4 + be/src/olap/tablet_schema.cpp | 29 +++ be/src/olap/tablet_schema.h | 15 ++ be/src/vec/columns/column_object.h | 6 +- be/src/vec/columns/subcolumn_tree.h | 3 +- be/src/vec/common/schema_util.cpp | 203 +++++++++++++++++++-- be/src/vec/common/schema_util.h | 19 +- be/src/vec/data_types/data_type_factory.cpp | 4 +- gensrc/proto/olap_file.proto | 2 + gensrc/proto/segment_v2.proto | 2 + .../data/variant_p0/compaction_sparse_column.out | 97 ++++++++++ regression-test/data/variant_p0/load.out | 2 +- .../variant_p0/test_compaction_extract_root.out | 19 ++ .../suites/variant_github_events_p0/load.groovy | 11 ++ .../variant_p0/compaction_sparse_column.groovy | 174 ++++++++++++++++++ regression-test/suites/variant_p0/load.groovy | 2 +- .../variant_p0/test_compaction_extract_root.groovy | 153 ++++++++++++++++ .../suites/variant_p0/variant_with_rowstore.groovy | 6 +- 25 files changed, 820 insertions(+), 143 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index bc84bbcb84e..99969cc5b72 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -326,6 +326,8 @@ Status Compaction::do_compaction_impl(int64_t permits) { } build_basic_info(); + VLOG_DEBUG << "dump tablet schema: " << _cur_tablet_schema->dump_structure(); + LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; bool vertical_compaction = should_vertical_compaction(); diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 66825a9fbbd..c16d87e3daa 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -102,6 +102,7 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } } } else { + // find positions of variant columns for (int i = 0; i < _context->original_tablet_schema->columns().size(); ++i) { if (_context->original_tablet_schema->columns()[i].is_variant_type()) { variant_column_pos.push_back(i); @@ -118,87 +119,11 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns( block, variant_column_pos, ctx)); - // Dynamic Block consists of two parts, dynamic part of columns and static part of columns - // static extracted - // | --------- | ----------- | - // The static ones are original _tablet_schame columns flush_schema = std::make_shared<TabletSchema>(); flush_schema->copy_from(*_context->original_tablet_schema); - vectorized::Block flush_block(std::move(block)); - // If column already exist in original tablet schema, then we pick common type - // and cast column to common type, and modify tablet column to common type, - // otherwise it's a new column - auto append_column = [&](const TabletColumn& parent_variant, auto& column_entry_from_object) { - const std::string& column_name = - parent_variant.name_lower_case() + "." + column_entry_from_object->path.get_path(); - const vectorized::DataTypePtr& final_data_type_from_object = - column_entry_from_object->data.get_least_common_type(); - vectorized::PathInDataBuilder full_path_builder; - auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false) - .append(column_entry_from_object->path.get_parts(), false) - .build(); - TabletColumn tablet_column = vectorized::schema_util::get_column_by_type( - final_data_type_from_object, column_name, - vectorized::schema_util::ExtraInfo {.unique_id = -1, - .parent_unique_id = parent_variant.unique_id(), - .path_info = full_path}); - flush_schema->append_column(std::move(tablet_column)); - - flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(), - final_data_type_from_object, column_name}); - }; - - // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema - // those columns are extracted columns, leave none extracted columns remain in original variant column, which is - // JSONB format at present. - // 2. Collect columns that need to be added or modified when data type changes or new columns encountered - for (size_t i = 0; i < variant_column_pos.size(); ++i) { - size_t variant_pos = variant_column_pos[i]; - auto column_ref = flush_block.get_by_position(variant_pos).column; - bool is_nullable = column_ref->is_nullable(); - const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>( - remove_nullable(column_ref)->assume_mutable_ref()); - const TabletColumn& parent_column = - _context->original_tablet_schema->columns()[variant_pos]; - CHECK(object_column.is_finalized()); - std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root; - for (auto& entry : object_column.get_subcolumns()) { - if (entry->path.empty()) { - // root - root = entry; - continue; - } - append_column(parent_column, entry); - } - // Create new variant column and set root column - auto obj = vectorized::ColumnObject::create(true, false); - // '{}' indicates a root path - static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column( - {}, root->data.get_finalized_column_ptr()->assume_mutable(), - root->data.get_least_common_type()); - - // // set for rowstore - if (_context->original_tablet_schema->store_row_column()) { - static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column( - object_column.get_rowstore_column()); - } - - vectorized::ColumnPtr result = obj->get_ptr(); - if (is_nullable) { - const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref) - .get_null_map_column_ptr(); - result = vectorized::ColumnNullable::create(result, null_map); - } - flush_block.get_by_position(variant_pos).column = result; - vectorized::PathInDataBuilder full_root_path_builder; - auto full_root_path = - full_root_path_builder.append(parent_column.name_lower_case(), false).build(); - flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path); - VLOG_DEBUG << "set root_path : " << full_root_path.get_path(); - } - - vectorized::schema_util::inherit_tablet_index(flush_schema); + vectorized::schema_util::rebuild_schema_and_block( + _context->original_tablet_schema, variant_column_pos, flush_block, flush_schema); { // Update rowset schema, tablet's tablet schema will be updated when build Rowset diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index 09764321223..1deae3a57dd 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -183,6 +183,10 @@ Status ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t nrows vectorized::MutableColumnPtr extracted_column; RETURN_IF_ERROR(root.extract_root( // trim the root name, eg. v.a.b -> a.b _col.path_info().copy_pop_front(), extracted_column)); + + if (_target_type_hint != nullptr) { + variant.create_root(_target_type_hint, _target_type_hint->create_column()); + } if (variant.empty() || variant.is_null_root()) { variant.create_root(root.get_root_type(), std::move(extracted_column)); } else { diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index 99f926f44cf..a3ac277586c 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -31,6 +31,7 @@ #include "vec/columns/column_object.h" #include "vec/columns/subcolumn_tree.h" #include "vec/common/assert_cast.h" +#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_object.h" #include "vec/data_types/data_type_string.h" #include "vec/json/path_in_data.h" @@ -210,8 +211,11 @@ private: // encodes sparse columns that are not materialized class ExtractReader : public ColumnIterator { public: - ExtractReader(const TabletColumn& col, std::unique_ptr<StreamReader>&& root_reader) - : _col(col), _root_reader(std::move(root_reader)) {} + ExtractReader(const TabletColumn& col, std::unique_ptr<StreamReader>&& root_reader, + vectorized::DataTypePtr target_type_hint) + : _col(col), + _root_reader(std::move(root_reader)), + _target_type_hint(target_type_hint) {} Status init(const ColumnIteratorOptions& opts) override; @@ -232,6 +236,7 @@ private: TabletColumn _col; // may shared among different column iterators std::unique_ptr<StreamReader> _root_reader; + vectorized::DataTypePtr _target_type_hint; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 1ae3f7acc7f..9a2a987b910 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -340,8 +340,9 @@ vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInData path, b // Path has higher priority if (!path.empty()) { auto node = _sub_column_tree.find_leaf(path); + auto sparse_node = _sparse_column_tree.find_exact(path); if (node) { - if (ignore_children || node->children.empty()) { + if (ignore_children || (node->children.empty() && sparse_node == nullptr)) { return node->data.file_column_type; } } @@ -395,17 +396,33 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { if (iter == column_path_to_footer_ordinal.end()) { continue; } + const ColumnMetaPB& column_pb = footer.columns(iter->second); ColumnReaderOptions opts; opts.kept_in_memory = _tablet_schema->is_in_memory(); std::unique_ptr<ColumnReader> reader; - RETURN_IF_ERROR(ColumnReader::create(opts, footer.columns(iter->second), footer.num_rows(), - _file_reader, &reader)); + RETURN_IF_ERROR( + ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader)); _sub_column_tree.add( iter->first, - SubcolumnReader {std::move(reader), - vectorized::DataTypeFactory::instance().create_data_type( - footer.columns(iter->second))}); + SubcolumnReader { + std::move(reader), + vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + // init sparse columns paths and type info + for (uint32_t ordinal = 0; ordinal < column_pb.sparse_columns().size(); ++ordinal) { + auto& spase_column_pb = column_pb.sparse_columns(ordinal); + if (spase_column_pb.has_column_path_info()) { + vectorized::PathInData path; + path.from_protobuf(spase_column_pb.column_path_info()); + // Read from root column, so reader is nullptr + _sparse_column_tree.add( + path, + SubcolumnReader {nullptr, + vectorized::DataTypeFactory::instance().create_data_type( + spase_column_pb)}); + } + } } + return Status::OK(); } @@ -426,6 +443,22 @@ static Status new_default_iterator(const TabletColumn& tablet_column, return Status::OK(); } +Status Segment::_new_iterator_with_variant_root(const TabletColumn& tablet_column, + std::unique_ptr<ColumnIterator>* iter, + const SubcolumnColumnReaders::Node* root, + vectorized::DataTypePtr target_type_hint) { + ColumnIterator* it; + RETURN_IF_ERROR(root->data.reader->new_iterator(&it)); + auto stream_iter = new ExtractReader( + tablet_column, + std::make_unique<StreamReader>(root->data.file_column_type->create_column(), + std::unique_ptr<ColumnIterator>(it), + root->data.file_column_type), + target_type_hint); + iter->reset(stream_iter); + return Status::OK(); +} + Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, std::unique_ptr<ColumnIterator>* iter, const StorageReadOptions* opt) { @@ -438,6 +471,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, } auto root = _sub_column_tree.find_leaf(root_path); auto node = _sub_column_tree.find_exact(tablet_column.path_info()); + auto sparse_node = _sparse_column_tree.find_exact(tablet_column.path_info()); if (opt != nullptr && opt->io_ctx.reader_type == ReaderType::READER_ALTER_TABLE) { CHECK(tablet_column.is_variant_type()); if (node == nullptr) { @@ -456,9 +490,15 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, if (opt == nullptr || opt->io_ctx.reader_type != ReaderType::READER_QUERY) { // Could be compaction ..etc and read flat leaves nodes data - auto node = _sub_column_tree.find_leaf(tablet_column.path_info()); + const auto* node = _sub_column_tree.find_leaf(tablet_column.path_info()); if (!node) { - RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); + // sparse_columns have this path, read from root + if (sparse_node != nullptr && sparse_node->is_leaf_node()) { + RETURN_IF_ERROR(_new_iterator_with_variant_root( + tablet_column, iter, root, sparse_node->data.file_column_type)); + } else { + RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); + } return Status::OK(); } ColumnIterator* it; @@ -467,37 +507,32 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, return Status::OK(); } - // Init iterators with extra path info. - // TODO If this segment does not contain any data correspond to the relatate path, - // then we could optimize to generate a default iterator - // This file doest not contain this column, so only read from sparse column - // to avoid read amplification - if (node != nullptr && node->is_scalar() && node->children.empty()) { - // Direct read extracted columns - const auto* node = _sub_column_tree.find_leaf(tablet_column.path_info()); - ColumnIterator* it; - RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); - iter->reset(it); - } else if (node != nullptr && !node->children.empty()) { - // Create reader with hirachical data - RETURN_IF_ERROR( - HierarchicalDataReader::create(iter, tablet_column.path_info(), node, root)); + if (node != nullptr) { + if (node->is_leaf_node() && sparse_node == nullptr) { + // Node contains column without any child sub columns and no corresponding sparse columns + // Direct read extracted columns + const auto* node = _sub_column_tree.find_leaf(tablet_column.path_info()); + ColumnIterator* it; + RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); + iter->reset(it); + } else { + // Node contains column with children columns or has correspoding sparse columns + // Create reader with hirachical data + RETURN_IF_ERROR( + HierarchicalDataReader::create(iter, tablet_column.path_info(), node, root)); + } } else { - // If file only exist column `v.a` and `v` but target path is `v.b`, read only read and parse root column - if (root == nullptr) { + // No such node, read from either sparse column or default column + if (sparse_node != nullptr) { + // sparse columns have this path, read from root + RETURN_IF_ERROR(_new_iterator_with_variant_root(tablet_column, iter, root, + sparse_node->data.file_column_type)); + } else { // No such variant column in this segment, get a default one RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - return Status::OK(); } - ColumnIterator* it; - RETURN_IF_ERROR(root->data.reader->new_iterator(&it)); - auto stream_iter = new ExtractReader( - tablet_column, - std::make_unique<StreamReader>(root->data.file_column_type->create_column(), - std::unique_ptr<ColumnIterator>(it), - root->data.file_column_type)); - iter->reset(stream_iter); } + return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 47ccab71be6..dc0f18ff02d 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -194,6 +194,12 @@ private: Status _load_pk_bloom_filter(); ColumnReader* _get_column_reader(const TabletColumn& col); + // Get Iterator which will read variant root column and extract with paths and types info + Status _new_iterator_with_variant_root(const TabletColumn& tablet_column, + std::unique_ptr<ColumnIterator>* iter, + const SubcolumnColumnReaders::Node* root, + vectorized::DataTypePtr target_type_hint); + Status _load_index_impl(); private: @@ -223,6 +229,9 @@ private: // for variants. SubcolumnColumnReaders _sub_column_tree; + // each sprase column's path and types info + SubcolumnColumnReaders _sparse_column_tree; + // used to guarantee that short key index will be loaded at most once in a thread-safe way DorisCallOnce<Status> _load_index_once; // used to guarantee that primary key bloom filter will be loaded at most once in a thread-safe way diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 5434c23ef8e..7613642ef99 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -149,6 +149,10 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id, init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i), tablet_schema); } + // add sparse column to footer + for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { + init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i), tablet_schema); + } } Status SegmentWriter::init() { diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 0aa8cea81ad..cc208d3903e 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -121,6 +121,10 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); } + // add sparse column to footer + for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { + _init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i)); + } } Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index bd54af19603..058299cb865 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -552,6 +552,12 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _column_path.from_protobuf(column.column_path_info()); _parent_col_unique_id = column.column_path_info().parrent_column_unique_id(); } + for (auto& column_pb : column.sparse_columns()) { + TabletColumn column; + column.init_from_pb(column_pb); + _sparse_cols.emplace_back(std::move(column)); + _num_sparse_columns++; + } } TabletColumn TabletColumn::create_materialized_variant_column(const std::string& root, @@ -612,6 +618,10 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const { // CHECK_GT(_parent_col_unique_id, 0); _column_path.to_protobuf(column->mutable_column_path_info(), _parent_col_unique_id); } + for (auto& col : _sparse_cols) { + ColumnPB* sparse_column = column->add_sparse_columns(); + col.to_schema_pb(sparse_column); + } } void TabletColumn::add_sub_column(TabletColumn& sub_column) { @@ -812,6 +822,11 @@ void TabletSchema::append_column(TabletColumn column, ColumnType col_type) { _num_columns++; } +void TabletColumn::append_sparse_column(TabletColumn column) { + _sparse_cols.push_back(std::move(column)); + _num_sparse_columns++; +} + void TabletSchema::append_index(TabletIndex index) { _indexes.push_back(std::move(index)); } @@ -1134,6 +1149,10 @@ const std::vector<TabletColumn>& TabletSchema::columns() const { return _cols; } +const std::vector<TabletColumn>& TabletColumn::sparse_columns() const { + return _sparse_cols; +} + std::vector<TabletColumn>& TabletSchema::mutable_columns() { return _cols; } @@ -1143,10 +1162,20 @@ const TabletColumn& TabletSchema::column(size_t ordinal) const { return _cols[ordinal]; } +const TabletColumn& TabletColumn::sparse_column_at(size_t ordinal) const { + DCHECK(ordinal < _sparse_cols.size()) + << "ordinal:" << ordinal << ", _num_columns:" << _sparse_cols.size(); + return _sparse_cols[ordinal]; +} + const TabletColumn& TabletSchema::column_by_uid(int32_t col_unique_id) const { return _cols.at(_field_id_to_index.at(col_unique_id)); } +TabletColumn& TabletSchema::mutable_column_by_uid(int32_t col_unique_id) { + return _cols.at(_field_id_to_index.at(col_unique_id)); +} + void TabletSchema::update_indexes_from_thrift(const std::vector<doris::TOlapTableIndex>& tindexes) { std::vector<TabletIndex> indexes; for (auto& tindex : tindexes) { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index a515095814f..5e735b22a93 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -149,6 +149,11 @@ public: void set_parent_unique_id(int32_t col_unique_id) { _parent_col_unique_id = col_unique_id; } std::shared_ptr<const vectorized::IDataType> get_vec_type() const; + void append_sparse_column(TabletColumn column); + const TabletColumn& sparse_column_at(size_t oridinal) const; + const std::vector<TabletColumn>& sparse_columns() const; + size_t num_sparse_columns() const { return _num_sparse_columns; } + private: int32_t _unique_id = -1; std::string _col_name; @@ -183,6 +188,14 @@ private: bool _result_is_nullable = false; vectorized::PathInData _column_path; + + // Record information about columns merged into a sparse column within a variant + // `{"id": 100, "name" : "jack", "point" : 3.9}` + // If the information mentioned above is inserted into the variant column, + // 'id' and 'name' are correctly extracted, while 'point' is merged into the sparse column due to its sparsity. + // The path_info and type of 'point' will be recorded using the TabletColumn. + std::vector<TabletColumn> _sparse_cols; + size_t _num_sparse_columns = 0; }; bool operator==(const TabletColumn& a, const TabletColumn& b); @@ -262,6 +275,7 @@ public: const TabletColumn& column(const std::string& field_name) const; Status have_column(const std::string& field_name) const; const TabletColumn& column_by_uid(int32_t col_unique_id) const; + TabletColumn& mutable_column_by_uid(int32_t col_unique_id); const std::vector<TabletColumn>& columns() const; std::vector<TabletColumn>& mutable_columns(); size_t num_columns() const { return _num_columns; } @@ -392,6 +406,7 @@ private: SortType _sort_type = SortType::LEXICAL; size_t _sort_col_num = 0; std::vector<TabletColumn> _cols; + std::vector<TabletIndex> _indexes; std::unordered_map<std::string, int32_t> _field_name_to_index; std::unordered_map<int32_t, int32_t> _field_id_to_index; diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 407419ff5c4..7e50e34bc9d 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -34,6 +34,7 @@ #include <vector> #include "common/status.h" +#include "olap/tablet_schema.h" #include "vec/columns/column.h" #include "vec/columns/subcolumn_tree.h" #include "vec/common/cow.h" @@ -317,6 +318,8 @@ public: const Subcolumns& get_subcolumns() const { return subcolumns; } + const Subcolumns& get_sparse_subcolumns() const { return sparse_columns; } + Subcolumns& get_subcolumns() { return subcolumns; } PathsInData getKeys() const; @@ -338,7 +341,8 @@ public: void remove_subcolumns(const std::unordered_set<std::string>& keys); - void finalize(bool ignore_sparse); + // use sparse_subcolumns_schema to record sparse column's path info and type + void finalize(bool ignore_sparser); /// Finalizes all subcolumns. void finalize() override; diff --git a/be/src/vec/columns/subcolumn_tree.h b/be/src/vec/columns/subcolumn_tree.h index 53391bf797f..6b5dd996364 100644 --- a/be/src/vec/columns/subcolumn_tree.h +++ b/be/src/vec/columns/subcolumn_tree.h @@ -55,7 +55,8 @@ public: bool is_nested() const { return kind == NESTED; } bool is_scalar() const { return kind == SCALAR; } - bool is_scalar_without_children() const { return kind == SCALAR && children.empty(); } + + bool is_leaf_node() const { return kind == SCALAR && children.empty(); } // Only modify data and kind void modify(std::shared_ptr<Node>&& other) { diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index b943f2e637b..e532662f6af 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -173,6 +173,7 @@ Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, Co // We convert column string to jsonb type just add a string jsonb field to dst column instead of parse // each line in original string column. ctx->set_string_as_jsonb_string(true); + ctx->set_jsonb_string_as_string(true); tmp_block.insert({nullptr, type, arg.name}); RETURN_IF_ERROR( function->execute(ctx.get(), tmp_block, {0}, result_column, arg.column->size())); @@ -252,20 +253,10 @@ TabletColumn get_least_type_column(const TabletColumn& original, const DataTypeP return result_column; } -void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, - TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id) { - // Types of subcolumns by path from all tuples. - std::unordered_map<PathInData, DataTypes, PathInData::Hash> subcolumns_types; - for (const TabletSchemaSPtr& schema : schemas) { - for (const TabletColumn& col : schema->columns()) { - // Get subcolumns of this variant - if (!col.path_info().empty() && col.parent_unique_id() > 0 && - col.parent_unique_id() == variant_col_unique_id) { - subcolumns_types[col.path_info()].push_back( - DataTypeFactory::instance().create_data_type(col, col.is_nullable())); - } - } - } +void update_least_schema_internal( + const std::unordered_map<PathInData, DataTypes, PathInData::Hash>& subcolumns_types, + TabletSchemaSPtr& common_schema, bool update_sparse_column, int32_t variant_col_unique_id, + std::unordered_set<PathInData, PathInData::Hash>* path_set = nullptr) { PathsInData tuple_paths; DataTypes tuple_types; // Get the least common type for all paths. @@ -297,7 +288,6 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, } CHECK_EQ(tuple_paths.size(), tuple_types.size()); - std::string variant_col_name = common_schema->column_by_uid(variant_col_unique_id).name(); // Append all common type columns of this variant for (int i = 0; i < tuple_paths.size(); ++i) { TabletColumn common_column; @@ -306,9 +296,76 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, ExtraInfo {.unique_id = -1, .parent_unique_id = variant_col_unique_id, .path_info = tuple_paths[i]}); - // set ColumnType::VARIANT to occupy _field_path_to_index - common_schema->append_column(common_column, TabletSchema::ColumnType::VARIANT); + if (update_sparse_column) { + common_schema->mutable_column_by_uid(variant_col_unique_id) + .append_sparse_column(common_column); + } else { + common_schema->append_column(common_column); + } + if (path_set != nullptr) { + path_set->insert(tuple_paths[i]); + } + } +} + +void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, + TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id, + std::unordered_set<PathInData, PathInData::Hash>* path_set) { + // Types of subcolumns by path from all tuples. + std::unordered_map<PathInData, DataTypes, PathInData::Hash> subcolumns_types; + for (const TabletSchemaSPtr& schema : schemas) { + for (const TabletColumn& col : schema->columns()) { + // Get subcolumns of this variant + if (!col.path_info().empty() && col.parent_unique_id() > 0 && + col.parent_unique_id() == variant_col_unique_id) { + subcolumns_types[col.path_info()].push_back( + DataTypeFactory::instance().create_data_type(col, col.is_nullable())); + } + } + } + for (const TabletSchemaSPtr& schema : schemas) { + if (schema->field_index(variant_col_unique_id) == -1) { + // maybe dropped + continue; + } + for (const TabletColumn& col : + schema->mutable_column_by_uid(variant_col_unique_id).sparse_columns()) { + // Get subcolumns of this variant + if (!col.path_info().empty() && col.parent_unique_id() > 0 && + col.parent_unique_id() == variant_col_unique_id && + // this column have been found in origin columns + subcolumns_types.find(col.path_info()) != subcolumns_types.end()) { + subcolumns_types[col.path_info()].push_back( + DataTypeFactory::instance().create_data_type(col, col.is_nullable())); + } + } + } + update_least_schema_internal(subcolumns_types, common_schema, false, variant_col_unique_id, + path_set); +} + +void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas, + TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id, + const std::unordered_set<PathInData, PathInData::Hash>& path_set) { + // Types of subcolumns by path from all tuples. + std::unordered_map<PathInData, DataTypes, PathInData::Hash> subcolumns_types; + for (const TabletSchemaSPtr& schema : schemas) { + if (schema->field_index(variant_col_unique_id) == -1) { + // maybe dropped + continue; + } + for (const TabletColumn& col : + schema->mutable_column_by_uid(variant_col_unique_id).sparse_columns()) { + // Get subcolumns of this variant + if (!col.path_info().empty() && col.parent_unique_id() > 0 && + col.parent_unique_id() == variant_col_unique_id && + path_set.find(col.path_info()) == path_set.end()) { + subcolumns_types[col.path_info()].push_back( + DataTypeFactory::instance().create_data_type(col, col.is_nullable())); + } + } } + update_least_schema_internal(subcolumns_types, common_schema, true, variant_col_unique_id); } void inherit_tablet_index(TabletSchemaSPtr& schema) { @@ -381,8 +438,24 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, build_schema_without_extracted_columns(base_schema); } + // schema consists of two parts, static part of columns, variant columns (include extracted columns and sparse columns) + // static extracted sparse + // | --------- | ----------- | ------------| + // If a sparse column in one schema's is found in another schema's extracted columns + // move it out of the sparse column and merge it into the extracted column. + // static extracted sparse + // | --------- | ----------- ------------ ------- ---------| ------------| + // schema 1: k (int) v:a (float) v:c (string) v:b (int) + // schema 2: k (int) v:a (float) v:b (bigint) v:d (string) + // schema 3: k (int) v:a (double) v:b (smallint) + // result : k (int) v:a (double) v:b (bigint) v:c (string) v:d (string) for (int32_t unique_id : variant_column_unique_id) { - update_least_common_schema(schemas, output_schema, unique_id); + std::unordered_set<PathInData, PathInData::Hash> path_set; + // 1. cast extracted column to common type + // path set is used to record the paths of those sparse columns that have been merged into the extracted columns, eg: v:b + update_least_common_schema(schemas, output_schema, unique_id, &path_set); + // 2. cast sparse column to common type, exclude the columns from the path set + update_least_sparse_column(schemas, output_schema, unique_id, path_set); } inherit_tablet_index(output_schema); @@ -498,6 +571,8 @@ void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos, ? assert_cast<ColumnObject&>( assert_cast<ColumnNullable&>(column_ref).get_nested_column()) : assert_cast<ColumnObject&>(column_ref); + // Record information about columns merged into a sparse column within a variant + std::vector<TabletColumn> sparse_subcolumns_schema; column.finalize(ignore_sparse); } } @@ -517,6 +592,98 @@ void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& vari } } +void _append_column(const TabletColumn& parent_variant, + const ColumnObject::Subcolumns::NodePtr& subcolumn, TabletSchemaSPtr& to_append, + bool is_sparse) { + // If column already exist in original tablet schema, then we pick common type + // and cast column to common type, and modify tablet column to common type, + // otherwise it's a new column + const std::string& column_name = + parent_variant.name_lower_case() + "." + subcolumn->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + subcolumn->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false) + .append(subcolumn->path.get_parts(), false) + .build(); + TabletColumn tablet_column = vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo {.unique_id = -1, + .parent_unique_id = parent_variant.unique_id(), + .path_info = full_path}); + + if (!is_sparse) { + to_append->append_column(std::move(tablet_column)); + } else { + to_append->mutable_column_by_uid(parent_variant.unique_id()) + .append_sparse_column(std::move(tablet_column)); + } +} + +void rebuild_schema_and_block(const TabletSchemaSPtr& original, + const std::vector<int>& variant_positions, Block& flush_block, + TabletSchemaSPtr& flush_schema) { + // rebuild schema and block with variant extracted columns + + // 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema + // those columns are extracted columns, leave none extracted columns remain in original variant column, which is + // JSONB format at present. + // 2. Collect columns that need to be added or modified when data type changes or new columns encountered + for (size_t variant_pos : variant_positions) { + auto column_ref = flush_block.get_by_position(variant_pos).column; + bool is_nullable = column_ref->is_nullable(); + const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>( + remove_nullable(column_ref)->assume_mutable_ref()); + const TabletColumn& parent_column = original->columns()[variant_pos]; + CHECK(object_column.is_finalized()); + std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root; + // common extracted columns + for (const auto& entry : object_column.get_subcolumns()) { + if (entry->path.empty()) { + // root + root = entry; + continue; + } + _append_column(parent_column, entry, flush_schema, false); + const std::string& column_name = + parent_column.name_lower_case() + "." + entry->path.get_path(); + flush_block.insert({entry->data.get_finalized_column_ptr()->get_ptr(), + entry->data.get_least_common_type(), column_name}); + } + + // add sparse columns to flush_schema + for (const auto& entry : object_column.get_sparse_subcolumns()) { + _append_column(parent_column, entry, flush_schema, true); + } + + // Create new variant column and set root column + auto obj = vectorized::ColumnObject::create(true, false); + // '{}' indicates a root path + static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column( + {}, root->data.get_finalized_column_ptr()->assume_mutable(), + root->data.get_least_common_type()); + // // set for rowstore + if (original->store_row_column()) { + static_cast<vectorized::ColumnObject*>(obj.get())->set_rowstore_column( + object_column.get_rowstore_column()); + } + vectorized::ColumnPtr result = obj->get_ptr(); + if (is_nullable) { + const auto& null_map = assert_cast<const vectorized::ColumnNullable&>(*column_ref) + .get_null_map_column_ptr(); + result = vectorized::ColumnNullable::create(result, null_map); + } + flush_block.get_by_position(variant_pos).column = result; + vectorized::PathInDataBuilder full_root_path_builder; + auto full_root_path = + full_root_path_builder.append(parent_column.name_lower_case(), false).build(); + flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path); + VLOG_DEBUG << "set root_path : " << full_root_path.get_path(); + } + + vectorized::schema_util::inherit_tablet_index(flush_schema); +} + // --------------------------- Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst) { auto type_string = std::make_shared<DataTypeString>(); diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index c626b875c85..6948b548cc0 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -86,10 +86,10 @@ struct ParseContext { // record an extract json column, used for encoding row store bool record_raw_json_column = false; }; -// thread steps to parse and encode variant columns into flatterned columns +// three steps to parse and encode variant columns into flatterned columns // 1. parse variant from raw json string // 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns -// 2. encode sparse sub columns +// 3. encode sparse sub columns Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos, const ParseContext& ctx); Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos, @@ -109,11 +109,24 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, // Get least common types for extracted columns which has Path info, // with a speicified variant column's unique id void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas, - TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id); + TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id, + std::unordered_set<PathInData, PathInData::Hash>* path_set); + +void update_least_sparse_column(const std::vector<TabletSchemaSPtr>& schemas, + TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id, + const std::unordered_set<PathInData, PathInData::Hash>& path_set); // inherit index info from it's parent column void inherit_tablet_index(TabletSchemaSPtr& schema); +// Rebuild schema from original schema by extend dynamic columns generated from ColumnObject. +// Block consists of two parts, dynamic part of columns and static part of columns. +// static extracted +// | --------- | ----------- | +// The static ones are original tablet_schame columns +void rebuild_schema_and_block(const TabletSchemaSPtr& original, const std::vector<int>& variant_pos, + Block& flush_block, TabletSchemaSPtr& flush_schema); + // Extract json data from source with path Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst); diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index d349fffe99a..59d2113008f 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -585,8 +585,8 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) { DataTypePtr DataTypeFactory::create_data_type(const segment_v2::ColumnMetaPB& pcolumn) { DataTypePtr nested = nullptr; if (pcolumn.type() == static_cast<int>(FieldType::OLAP_FIELD_TYPE_ARRAY)) { - // Item subcolumn and length subcolumn - DCHECK_GE(pcolumn.children_columns().size(), 2) << pcolumn.DebugString(); + // Item subcolumn and length subcolumn, for sparse columns only subcolumn + DCHECK_GE(pcolumn.children_columns().size(), 1) << pcolumn.DebugString(); nested = std::make_shared<DataTypeArray>(create_data_type(pcolumn.children_columns(0))); } else if (pcolumn.type() == static_cast<int>(FieldType::OLAP_FIELD_TYPE_MAP)) { DCHECK_GE(pcolumn.children_columns().size(), 2) << pcolumn.DebugString(); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 2bf8470719c..66e9f084992 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -208,6 +208,8 @@ message ColumnPB { optional bool result_is_nullable = 19; // persist info for PathInData that represents path in document, e.g. JSON. optional segment_v2.ColumnPathInfo column_path_info = 20; + // sparse column within a variant column + repeated ColumnPB sparse_columns = 21; } enum IndexType { diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index 4e4ea560d5c..ad0002697dc 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -191,6 +191,8 @@ message ColumnMetaPB { optional bytes default_value = 14; // ColumnMessage.default_value ? optional int32 precision = 15; // ColumnMessage.precision optional int32 frac = 16; // ColumnMessag + + repeated ColumnMetaPB sparse_columns = 17; // sparse column within a variant column } message PrimaryKeyIndexMetaPB { diff --git a/regression-test/data/variant_p0/compaction_sparse_column.out b/regression-test/data/variant_p0/compaction_sparse_column.out new file mode 100644 index 00000000000..3b2ee1dbb97 --- /dev/null +++ b/regression-test/data/variant_p0/compaction_sparse_column.out @@ -0,0 +1,97 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_b_bfcompact -- +12291 + +-- !select_xxxx_bfcompact -- +12291 + +-- !select_point_bfcompact -- +3 + +-- !select_1_bfcompact -- +4096 + +-- !select_2_bfcompact -- +4096 + +-- !select_3_bfcompact -- +4096 + +-- !select_4_bfcompact -- +1 + +-- !select_5_bfcompact -- +1 + +-- !select_6_bfcompact -- +1 + +-- !select_1_1_bfcompact -- +1 + +-- !select_2_1_bfcompact -- +1 + +-- !select_3_1_bfcompact -- +1 + +-- !select_4_1_bfcompact -- +4096 + +-- !select_5_1_bfcompact -- +4096 + +-- !select_6_1_bfcompact -- +4096 + +-- !select_all_bfcompact -- +3 {"a":1234,"point":1,"xxxx":"ddddd"} + +-- !select_b -- +12291 + +-- !select_xxxx -- +12291 + +-- !select_point -- +3 + +-- !select_1 -- +4096 + +-- !select_2 -- +4096 + +-- !select_3 -- +4096 + +-- !select_4 -- +1 + +-- !select_5 -- +1 + +-- !select_6 -- +1 + +-- !select_1_1 -- +1 + +-- !select_2_1 -- +1 + +-- !select_3_1 -- +1 + +-- !select_4_1 -- +4096 + +-- !select_5_1 -- +4096 + +-- !select_6_1 -- +4096 + +-- !select_all -- +3 {"a":1234,"xxxx":"ddddd","point":1} + diff --git a/regression-test/data/variant_p0/load.out b/regression-test/data/variant_p0/load.out index dd77b508b0b..6798ca3868d 100644 --- a/regression-test/data/variant_p0/load.out +++ b/regression-test/data/variant_p0/load.out @@ -97,7 +97,7 @@ \N 123456789101112 -- !sql_14 -- -\N 123456 {"A":123456} +123456 {"A":123456} -- !sql_18 -- \N 123 {"A":123} \N diff --git a/regression-test/data/variant_p0/test_compaction_extract_root.out b/regression-test/data/variant_p0/test_compaction_extract_root.out new file mode 100644 index 00000000000..c8a0cf24017 --- /dev/null +++ b/regression-test/data/variant_p0/test_compaction_extract_root.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_b_1 -- +12291 + +-- !select_b_2 -- +12288 + +-- !select_1_bfcompact -- +{"code":2,"state":"open"} + +-- !select_b_3 -- +12288 + +-- !select_b_4 -- +12288 + +-- !select_1 -- +\N + diff --git a/regression-test/suites/variant_github_events_p0/load.groovy b/regression-test/suites/variant_github_events_p0/load.groovy index 889c4c6b07e..1e8fe8ced1c 100644 --- a/regression-test/suites/variant_github_events_p0/load.groovy +++ b/regression-test/suites/variant_github_events_p0/load.groovy @@ -16,6 +16,16 @@ // under the License. suite("regression_test_variant_github_events_p0", "variant_type"){ + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } sql "set enable_memtable_on_sink_node = true" def load_json_data = {table_name, file_name -> // load the json data @@ -58,6 +68,7 @@ suite("regression_test_variant_github_events_p0", "variant_type"){ DISTRIBUTED BY HASH(k) BUCKETS 4 properties("replication_num" = "1", "disable_auto_compaction" = "false"); """ + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") // 2015 load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""") load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""") diff --git a/regression-test/suites/variant_p0/compaction_sparse_column.groovy b/regression-test/suites/variant_p0/compaction_sparse_column.groovy new file mode 100644 index 00000000000..6a26c043e22 --- /dev/null +++ b/regression-test/suites/variant_p0/compaction_sparse_column.groovy @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_compaction_sparse_column", "nonConcurrent") { + def tableName = "test_compaction" + + try { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + boolean disableAutoCompaction = true + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + + def set_be_config = { key, value -> + (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + sql """insert into ${tableName} select 0, '{"a": 11245, "b" : 42000}' as json_str + union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "aaaaa"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 1, '{"a": 11245, "b" : 42001}' as json_str + union all select 1, '{"a": 1123}' as json_str union all select 1, '{"a" : 1234, "xxxx" : "bbbbb"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 2, '{"a": 11245, "b" : 42002}' as json_str + union all select 2, '{"a": 1123}' as json_str union all select 2, '{"a" : 1234, "xxxx" : "ccccc"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 3, '{"a" : 1234, "point" : 1, "xxxx" : "ddddd"}' as json_str + union all select 3, '{"a": 1123}' as json_str union all select 3, '{"a": 11245, "b" : 42003}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 4, '{"a" : 1234, "xxxx" : "eeeee", "point" : 5}' as json_str + union all select 4, '{"a": 1123}' as json_str union all select 4, '{"a": 11245, "b" : 42004}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 5, '{"a" : 1234, "xxxx" : "fffff", "point" : 42000}' as json_str + union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + qt_select_b_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName};""" + qt_select_xxxx_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName};""" + qt_select_point_bfcompact """ SELECT count(cast(v['point'] as bigint)) FROM ${tableName};""" + qt_select_1_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'aaaaa';""" + qt_select_2_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'bbbbb';""" + qt_select_3_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'ccccc';""" + qt_select_4_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'eeeee';""" + qt_select_5_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'ddddd';""" + qt_select_6_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'fffff';""" + qt_select_1_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42000;""" + qt_select_2_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42001;""" + qt_select_3_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42002;""" + qt_select_4_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42003;""" + qt_select_5_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42004;""" + qt_select_6_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42005;""" + qt_select_all_bfcompact """SELECT * from ${tableName} where (cast(v['point'] as int) = 1);""" + + //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus + String[][] tablets = sql """ show tablets from ${tableName}; """ + + // trigger compactions for all tablets in ${tableName} + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + backend_id = tablet[2] + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + if (compactJson.status.toLowerCase() == "fail") { + assertEquals(disableAutoCompaction, false) + logger.info("Compaction was done automatically!") + } + if (disableAutoCompaction) { + assertEquals("success", compactJson.status.toLowerCase()) + } + } + + // wait for all compactions done + for (String[] tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet[0] + backend_id = tablet[2] + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + int rowCount = 0 + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + def compactionStatusUrlIndex = 18 + (code, out, err) = curl("GET", tablet[compactionStatusUrlIndex]) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + rowCount += Integer.parseInt(rowset.split(" ")[1]) + } + } + assert (rowCount <= 8) + qt_select_b """ SELECT count(cast(v['b'] as int)) FROM ${tableName};""" + qt_select_xxxx """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName};""" + qt_select_point """ SELECT count(cast(v['point'] as bigint)) FROM ${tableName};""" + qt_select_1 """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'aaaaa';""" + qt_select_2 """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'bbbbb';""" + qt_select_3 """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'ccccc';""" + qt_select_4 """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'eeeee';""" + qt_select_5 """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'ddddd';""" + qt_select_6 """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName} where cast(v['xxxx'] as string) = 'fffff';""" + qt_select_1_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42000;""" + qt_select_2_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42001;""" + qt_select_3_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42002;""" + qt_select_4_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42003;""" + qt_select_5_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42004;""" + qt_select_6_1 """ SELECT count(cast(v['b'] as int)) FROM ${tableName} where cast(v['b'] as int) = 42005;""" + qt_select_all """SELECT * from ${tableName} where (cast(v['point'] as int) = 1);""" + } finally { + // try_sql("DROP TABLE IF EXISTS ${tableName}") + } +} diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy index 85e8bfe865c..3e40bcc47a1 100644 --- a/regression-test/suites/variant_p0/load.groovy +++ b/regression-test/suites/variant_p0/load.groovy @@ -158,7 +158,7 @@ suite("regression_test_variant", "nonConcurrent"){ qt_sql_12 "select v['A'], v from ${table_name} where cast(v['A'] as bigint) > 1 order by k" // ----%%---- qt_sql_13 "select v['a'], v['A'] from simple_select_variant where 1=1 and cast(v['a'] as json) is null and cast(v['A'] as bigint) >= 1 order by k;" - qt_sql_14 """select v['a'], v['A'], v from simple_select_variant where cast(v['A'] as bigint) > 0 and cast(v['A'] as bigint) = 123456 limit 1;""" + qt_sql_14 """select v['A'], v from simple_select_variant where cast(v['A'] as bigint) > 0 and cast(v['A'] as bigint) = 123456 limit 1;""" sql """insert into simple_select_variant values (12, '{"oamama": 1.1}')""" qt_sql_18 "select cast(v['a'] as text), v['A'], v, v['oamama'] from simple_select_variant where cast(v['oamama'] as double) is null order by k;" diff --git a/regression-test/suites/variant_p0/test_compaction_extract_root.groovy b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy new file mode 100644 index 00000000000..f3d6da37557 --- /dev/null +++ b/regression-test/suites/variant_p0/test_compaction_extract_root.groovy @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_compaction_extract_root", "nonConcurrent") { + def tableName = "test_t" + + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + boolean disableAutoCompaction = true + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + } + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + set_be_config.call("enable_vertical_segment_writer", "true") + sql """insert into ${tableName} select 0, '{"a": 11245, "b" : {"state" : "open", "code" : 2}}' as json_str + union all select 8, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "aaaaa"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 1, '{"a": 11245, "b" : {"state" : "colse", "code" : 2}}' as json_str + union all select 1, '{"a": 1123}' as json_str union all select 1, '{"a" : 1234, "xxxx" : "bbbbb"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 2, '{"a": 11245, "b" : {"state" : "flat", "code" : 3}}' as json_str + union all select 2, '{"a": 1123}' as json_str union all select 2, '{"a" : 1234, "xxxx" : "ccccc"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 3, '{"a" : 1234, "xxxx" : 4, "point" : 5}' as json_str + union all select 3, '{"a": 1123}' as json_str union all select 3, '{"a": 11245, "b" : 42003}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 4, '{"a" : 1234, "xxxx" : "eeeee", "point" : 5}' as json_str + union all select 4, '{"a": 1123}' as json_str union all select 4, '{"a": 11245, "b" : 42004}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + + sql """insert into ${tableName} select 5, '{"a" : 1234, "xxxx" : "fffff", "point" : 42000}' as json_str + union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;""" + + // // fix cast to string tobe {} + qt_select_b_1 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};""" + qt_select_b_2 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};""" + + qt_select_1_bfcompact """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;""" + + //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus + String[][] tablets = sql """ show tablets from ${tableName}; """ + + // trigger compactions for all tablets in ${tableName} + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + backend_id = tablet[2] + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + if (compactJson.status.toLowerCase() == "fail") { + assertEquals(disableAutoCompaction, false) + logger.info("Compaction was done automatically!") + } + if (disableAutoCompaction) { + assertEquals("success", compactJson.status.toLowerCase()) + } + } + + // wait for all compactions done + for (String[] tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet[0] + backend_id = tablet[2] + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + int rowCount = 0 + for (String[] tablet in tablets) { + String tablet_id = tablet[0] + def compactionStatusUrlIndex = 18 + (code, out, err) = curl("GET", tablet[compactionStatusUrlIndex]) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List<String>) tabletJson.rowsets) { + rowCount += Integer.parseInt(rowset.split(" ")[1]) + } + } + assert (rowCount <= 8) + // fix cast to string tobe {} + qt_select_b_3 """ SELECT count(cast(v['b'] as string)) FROM ${tableName};""" + qt_select_b_4 """ SELECT count(cast(v['b'] as int)) FROM ${tableName};""" + + qt_select_1 """select v['b'] from test_t where k = 0 and cast(v['a'] as int) = 11245;""" +} diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy index 9b9b9ebe5b9..58c245ee831 100644 --- a/regression-test/suites/variant_p0/variant_with_rowstore.groovy +++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy @@ -27,7 +27,7 @@ suite("regression_test_variant_rowstore", "variant_type"){ logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) } - def table_name = "var_rs" + def table_name = "var_rowstore" sql "DROP TABLE IF EXISTS ${table_name}" set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") @@ -41,9 +41,11 @@ suite("regression_test_variant_rowstore", "variant_type"){ properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true"); """ sql "set experimental_enable_nereids_planner = false" + sql "sync" sql """insert into ${table_name} values (-3, '{"a" : 1, "b" : 1.5, "c" : [1, 2, 3]}')""" sql """insert into ${table_name} select -2, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : "null", "e" : 7.111}}' as json_str union all select -1, '{"a": 1123}' as json_str union all select *, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + sql "sync" qt_sql "select * from ${table_name} order by k limit 10" @@ -59,6 +61,6 @@ suite("regression_test_variant_rowstore", "variant_type"){ DISTRIBUTED BY HASH(k) BUCKETS 1 properties("replication_num" = "1", "disable_auto_compaction" = "false", "store_row_column" = "true"); """ - sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rs""" + sql """insert into ${table_name} select k, cast(v as string), cast(v as string) from var_rowstore""" qt_sql "select * from ${table_name} order by k limit 10" } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
