This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push:
new c807382b3d8 [improve](variant) Add max count variant sparse (#47541)
c807382b3d8 is described below
commit c807382b3d891381f92fa5cdaa76ea0e9b9d68f2
Author: Sun Chenyang <[email protected]>
AuthorDate: Mon Feb 17 11:08:50 2025 +0800
[improve](variant) Add max count variant sparse (#47541)
---
be/src/common/config.cpp | 1 -
be/src/common/config.h | 2 -
be/src/olap/compaction.cpp | 6 +-
be/src/olap/rowset/segment_v2/column_reader.cpp | 16 +-
.../rowset/segment_v2/hierarchical_data_reader.cpp | 12 +-
.../rowset/segment_v2/hierarchical_data_reader.h | 5 +-
be/src/olap/rowset/segment_v2/segment.cpp | 39 ++-
be/src/olap/rowset/segment_v2/segment.h | 17 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 8 +-
be/src/olap/rowset/segment_v2/segment_iterator.h | 9 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 1 +
.../segment_v2/variant_column_writer_impl.cpp | 15 +-
.../rowset/segment_v2/variant_column_writer_impl.h | 1 +
.../rowset/segment_v2/vertical_segment_writer.cpp | 1 +
be/src/olap/tablet_meta.cpp | 3 +
be/src/olap/tablet_schema.cpp | 11 +-
be/src/olap/tablet_schema.h | 10 +-
be/src/runtime/types.cpp | 11 +-
be/src/runtime/types.h | 13 +-
be/src/vec/columns/column_object.cpp | 56 ++---
be/src/vec/columns/column_object.h | 15 +-
be/src/vec/common/schema_util.cpp | 15 +-
be/src/vec/core/block.h | 4 +-
be/src/vec/data_types/data_type_factory.cpp | 13 +-
be/src/vec/data_types/data_type_object.cpp | 25 +-
be/src/vec/data_types/data_type_object.h | 13 +-
be/src/vec/data_types/get_least_supertype.cpp | 4 -
.../data_types/serde/data_type_object_serde.cpp | 22 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 3 +-
.../vec/functions/array/function_array_utils.cpp | 4 +-
be/src/vec/functions/function_cast.h | 9 +-
be/src/vec/functions/function_variant_element.cpp | 98 +++++++-
be/test/vec/columns/column_object_test.cpp | 12 +-
.../java/org/apache/doris/catalog/ScalarType.java | 36 ++-
.../apache/doris/alter/SchemaChangeHandler.java | 7 +
.../java/org/apache/doris/analysis/CastExpr.java | 3 +
.../apache/doris/analysis/FunctionCallExpr.java | 3 +
.../org/apache/doris/analysis/MVColumnItem.java | 6 +
.../main/java/org/apache/doris/catalog/Column.java | 7 +
.../main/java/org/apache/doris/catalog/Env.java | 6 +
.../java/org/apache/doris/catalog/OlapTable.java | 16 ++
.../org/apache/doris/catalog/TableProperty.java | 10 +
.../apache/doris/common/util/PropertyAnalyzer.java | 22 ++
.../apache/doris/datasource/InternalCatalog.java | 15 ++
.../rules/rewrite/VariantSubPathPruning.java | 2 +-
.../expressions/functions/ComputeSignature.java | 1 +
.../functions/ComputeSignatureHelper.java | 29 +++
.../functions/generator/ExplodeVariantArray.java | 2 +-
.../expressions/functions/scalar/ElementAt.java | 4 +-
.../org/apache/doris/nereids/types/DataType.java | 5 +-
.../apache/doris/nereids/types/VariantType.java | 20 +-
.../java/org/apache/doris/qe/SessionVariable.java | 23 ++
.../doris/statistics/util/StatisticsUtil.java | 3 +-
.../dialect/trino/TrinoLogicalPlanBuilder.java | 2 +
gensrc/proto/data.proto | 1 +
gensrc/proto/olap_file.proto | 1 +
gensrc/proto/segment_v2.proto | 1 +
gensrc/proto/types.proto | 3 +
gensrc/thrift/Types.thrift | 6 +-
regression-test/data/variant_p0/delete_update.out | Bin 921 -> 921 bytes
.../data/variant_p0/test_sub_path_pruning.out | Bin 5835 -> 5871 bytes
regression-test/data/variant_p0/update/load.out | Bin 0 -> 251 bytes
regression-test/data/variant_p0/update/query.out | Bin 0 -> 3183 bytes
.../compaction/test_compaction_extract_root.out | Bin 243 -> 266 bytes
.../suites/variant_p0/delete_update.groovy | 12 +-
.../suites/variant_p0/element_function.groovy | 3 +-
regression-test/suites/variant_p0/load.groovy | 1 +
.../suites/variant_p0/select_partition.groovy | 1 +
.../suites/variant_p0/test_sub_path_pruning.groovy | 12 +-
.../suites/variant_p0/update/load.groovy | 123 ++++++++++
.../suites/variant_p0/update/query.groovy | 262 +++++++++++++++++++++
.../compaction/compaction_sparse_column.groovy | 8 +-
regression-test/suites/variant_p2/load.groovy | 2 +-
73 files changed, 927 insertions(+), 205 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2f17c4f62dd..8ea9c48df27 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1022,7 +1022,6 @@ DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms,
"10000");
// Whether use schema dict in backend side instead of MetaService side(cloud
mode)
DEFINE_mBool(variant_use_cloud_schema_dict, "true");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
-DEFINE_mInt32(variant_max_subcolumns_count, "5");
// block file cache
DEFINE_Bool(enable_file_cache, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3f5860332d1..cc3e61a72c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1223,8 +1223,6 @@ DECLARE_mBool(variant_use_cloud_schema_dict);
// Treat invalid json format str as string, instead of throwing exception if
false
DECLARE_mBool(variant_throw_exeception_on_invalid_json);
-DECLARE_mInt32(variant_max_subcolumns_count);
-
DECLARE_mBool(enable_merge_on_write_correctness_check);
// USED FOR DEBUGING
// core directly if the compaction found there's duplicate key on mow table
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 571ec7f9525..7a9c079ff9d 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -343,7 +343,8 @@ void CompactionMixin::build_basic_info() {
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(),
rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return
rowset->rowset_meta(); });
- _cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+ _cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
+ ->copy_without_variant_extracted_columns();
}
bool CompactionMixin::handle_ordered_data_compaction() {
@@ -1357,7 +1358,8 @@ void CloudCompactionMixin::build_basic_info() {
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(),
rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return
rowset->rowset_meta(); });
- _cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
+ _cur_tablet_schema =
_tablet->tablet_schema_with_merged_max_schema_version(rowset_metas)
+ ->copy_without_variant_extracted_columns();
}
int64_t CloudCompactionMixin::get_compaction_permits() {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index fbf208789e7..48f7c36f163 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -331,7 +331,21 @@ Status VariantColumnReader::init(const
ColumnReaderOptions& opts, const SegmentF
_subcolumn_readers = std::make_unique<SubcolumnColumnReaders>();
const ColumnMetaPB& self_column_pb = footer.columns(column_id);
for (const ColumnMetaPB& column_pb : footer.columns()) {
- if (column_pb.unique_id() != self_column_pb.unique_id()) {
+ // Find all columns belonging to the current variant column
+ // 1. not the variant column
+ if (!column_pb.has_column_path_info()) {
+ continue;
+ }
+
+ // 2. other variant root columns
+ if (column_pb.type() == (int)FieldType::OLAP_FIELD_TYPE_VARIANT &&
+ column_pb.unique_id() != self_column_pb.unique_id()) {
+ continue;
+ }
+
+ // 3. other variant's subcolumns
+ if (column_pb.type() != (int)FieldType::OLAP_FIELD_TYPE_VARIANT &&
+ column_pb.column_path_info().parrent_column_unique_id() !=
self_column_pb.unique_id()) {
continue;
}
DCHECK(column_pb.has_column_path_info());
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index 09f1f78ebdc..6d03a50010b 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -194,7 +194,8 @@ Status HierarchicalDataReader::_process_nested_columns(
// into a new object column and wrap it with array column using the first
element offsets.The wrapped array column
// will type the type of ColumnObject::NESTED_TYPE, whih is
Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
for (const auto& entry : nested_subcolumns) {
- MutableColumnPtr nested_object = ColumnObject::create(true);
+ MutableColumnPtr nested_object =
+ ColumnObject::create(container_variant.max_subcolumns_count());
const auto* base_array =
check_and_get_column<ColumnArray>(*remove_nullable(entry.second[0].column));
MutableColumnPtr offset =
base_array->get_offsets_ptr()->assume_mutable();
@@ -238,13 +239,13 @@ Status HierarchicalDataReader::_process_nested_columns(
parent_path.unset_nested();
DCHECK(!parent_path.has_nested_part());
container_variant.add_sub_column(parent_path, array->assume_mutable(),
- ColumnObject::NESTED_TYPE);
+ container_variant.NESTED_TYPE);
}
return Status::OK();
}
Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr&
container,
- size_t nrows) {
+ size_t nrows, int32_t
max_subcolumns_count) {
using namespace vectorized;
// build variant as container
@@ -262,12 +263,13 @@ Status
HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& con
MutableColumnPtr column = _root_reader->column->get_ptr();
// container_variant.add_sub_column({}, std::move(column),
_root_reader->type);
DCHECK(column->size() == nrows);
- container = ColumnObject::create(_root_reader->type,
std::move(column));
+ container =
+ ColumnObject::create(max_subcolumns_count, _root_reader->type,
std::move(column));
} else {
auto root_type =
vectorized::DataTypeFactory::instance().create_data_type(TypeIndex::Nothing,
false);
auto column = vectorized::ColumnNothing::create(nrows);
- container = ColumnObject::create(root_type, std::move(column));
+ container = ColumnObject::create(max_subcolumns_count, root_type,
std::move(column));
}
auto& container_variant = assert_cast<ColumnObject&>(*container);
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index c50ac26169e..af9a584fbc1 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -109,7 +109,8 @@ private:
// 3. init container with subcolumns
// 4. init container with nested subcolumns
// 5. init container with sparse column
- Status _init_container(vectorized::MutableColumnPtr& container, size_t
nrows);
+ Status _init_container(vectorized::MutableColumnPtr& container, size_t
nrows,
+ int max_subcolumns_count);
// clear all subcolumns's column data for next batch read
// set null map for nullable column
@@ -147,7 +148,7 @@ private:
}
MutableColumnPtr container;
- RETURN_IF_ERROR(_init_container(container, nrows));
+ RETURN_IF_ERROR(_init_container(container, nrows,
variant.max_subcolumns_count()));
auto& container_variant = assert_cast<ColumnObject&>(*container);
variant.insert_range_from(container_variant, 0, nrows);
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index b1d86c2f89f..e05cb4ebd08 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -571,14 +571,13 @@ Status Segment::healthy_status() {
// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage infomation for this column
-vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier&
identifier,
+vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column,
bool read_flat_leaves) const
{
// Path has higher priority
- auto relative_path = identifier.path != nullptr ?
identifier.path->copy_pop_front()
- : vectorized::PathInData();
+ auto path = column.path_info_ptr();
+ auto relative_path = path != nullptr ? path->copy_pop_front() :
vectorized::PathInData();
if (!relative_path.empty()) {
- int32_t unique_id =
- identifier.unique_id > 0 ? identifier.unique_id :
identifier.parent_unique_id;
+ int32_t unique_id = column.unique_id() > 0 ? column.unique_id() :
column.parent_unique_id();
const auto* node = _column_readers.contains(unique_id)
?
((VariantColumnReader*)(_column_readers.at(unique_id).get()))
->get_reader_by_path(relative_path)
@@ -593,9 +592,11 @@ vectorized::DataTypePtr Segment::get_data_type_of(const
ColumnIdentifier& identi
return nullptr;
}
// it contains children or column missing in storage, so treat it as
variant
- return identifier.is_nullable
- ?
vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
- : std::make_shared<vectorized::DataTypeObject>();
+ return column.is_nullable()
+ ?
vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>(
+ column.variant_max_subcolumns_count()))
+ : std::make_shared<vectorized::DataTypeObject>(
+ column.variant_max_subcolumns_count());
}
// TODO support normal column type
return nullptr;
@@ -1041,12 +1042,7 @@ Status Segment::read_key_by_rowid(uint32_t row_id,
std::string* key) {
bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool read_flat_leaves) const {
const auto* col = schema.column(cid);
- auto file_column_type =
- get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(),
- .parent_unique_id =
col->parent_unique_id(),
- .path = col->path(),
- .is_nullable =
col->is_nullable()},
- read_flat_leaves);
+ auto file_column_type = get_data_type_of(col->get_desc(),
read_flat_leaves);
auto expected_type = Schema::get_data_type_ptr(*col);
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
@@ -1076,18 +1072,13 @@ Status Segment::seek_and_read_by_rowid(const
TabletSchema& schema, SlotDescripto
};
std::vector<segment_v2::rowid_t> single_row_loc {row_id};
if (!slot->column_paths().empty()) {
- vectorized::PathInDataPtr path =
std::make_shared<vectorized::PathInData>(
- schema.column_by_uid(slot->col_unique_id()).name_lower_case(),
- slot->column_paths());
- auto storage_type = get_data_type_of(ColumnIdentifier {.unique_id =
slot->col_unique_id(),
- .path = path,
- .is_nullable =
slot->is_nullable()},
- false);
- vectorized::MutableColumnPtr file_storage_column =
storage_type->create_column();
- DCHECK(storage_type != nullptr);
TabletColumn column = TabletColumn::create_materialized_variant_column(
schema.column_by_uid(slot->col_unique_id()).name_lower_case(),
slot->column_paths(),
- slot->col_unique_id());
+ slot->col_unique_id(), slot->type().max_subcolumns_count());
+ auto storage_type = get_data_type_of(column, false);
+ vectorized::MutableColumnPtr file_storage_column =
storage_type->create_column();
+ DCHECK(storage_type != nullptr);
+
if (iterator_hint == nullptr) {
RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint,
&storage_read_opt));
RETURN_IF_ERROR(iterator_hint->init(opt));
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index f5cb6c3c450..5d47b1540b1 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -164,19 +164,12 @@ public:
// another method `get_metadata_size` not include the column reader, only
the segment object itself.
int64_t meta_mem_usage() const { return _meta_mem_usage; }
- // Identify the column by unique id or path info
- struct ColumnIdentifier {
- int32_t unique_id = -1;
- int32_t parent_unique_id = -1;
- vectorized::PathInDataPtr path;
- bool is_nullable = false;
- };
// Get the inner file column's data type
// ignore_chidren set to false will treat field as variant
// when it contains children with field paths.
// nullptr will returned if storage type does not contains such column
- std::shared_ptr<const vectorized::IDataType> get_data_type_of(
- const ColumnIdentifier& identifier, bool read_flat_leaves) const;
+ std::shared_ptr<const vectorized::IDataType> get_data_type_of(const
TabletColumn& column,
+ bool
read_flat_leaves) const;
// Check is schema read type equals storage column type
bool same_with_storage_type(int32_t cid, const Schema& schema, bool
read_flat_leaves) const;
@@ -186,11 +179,7 @@ public:
ReaderType read_type) const {
const Field* col = schema.column(cid);
vectorized::DataTypePtr storage_column_type =
- get_data_type_of(ColumnIdentifier {.unique_id =
col->unique_id(),
- .parent_unique_id =
col->parent_unique_id(),
- .path = col->path(),
- .is_nullable =
col->is_nullable()},
- read_type != ReaderType::READER_QUERY);
+ get_data_type_of(col->get_desc(), read_type !=
ReaderType::READER_QUERY);
if (storage_column_type == nullptr) {
// Default column iterator
return true;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 366c6d3ce21..8ef6f06f896 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -316,13 +316,7 @@ Status SegmentIterator::_init_impl(const
StorageReadOptions& opts) {
const Field* col = _schema->column(i);
if (col) {
auto storage_type = _segment->get_data_type_of(
- Segment::ColumnIdentifier {
- col->unique_id(),
- col->parent_unique_id(),
- col->path(),
- col->is_nullable(),
- },
- _opts.io_ctx.reader_type != ReaderType::READER_QUERY);
+ col->get_desc(), _opts.io_ctx.reader_type !=
ReaderType::READER_QUERY);
if (storage_type == nullptr) {
storage_type =
vectorized::DataTypeFactory::instance().create_data_type(*col);
}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 5b4c8f6d73d..05cdaa29710 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -263,13 +263,8 @@ private:
if (block_cid >= block->columns()) {
continue;
}
- vectorized::DataTypePtr storage_type = _segment->get_data_type_of(
- Segment::ColumnIdentifier {
- .unique_id = _schema->column(cid)->unique_id(),
- .parent_unique_id =
_schema->column(cid)->parent_unique_id(),
- .path = _schema->column(cid)->path(),
- .is_nullable =
_schema->column(cid)->is_nullable()},
- false);
+ vectorized::DataTypePtr storage_type =
+
_segment->get_data_type_of(_schema->column(cid)->get_desc(), false);
if (storage_type &&
!storage_type->equals(*block->get_by_position(block_cid).type)) {
// Do additional cast
vectorized::MutableColumnPtr tmp =
storage_type->create_column();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index fb72a33e075..1a3f03dee30 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -166,6 +166,7 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta,
uint32_t column_id,
meta->set_result_is_nullable(column.get_result_is_nullable());
meta->set_function_name(column.get_aggregation_name());
meta->set_be_exec_version(column.get_be_exec_version());
+
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
}
Status SegmentWriter::init() {
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 442595eb0c0..bb8558ee3d4 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -47,8 +47,9 @@ Status VariantColumnWriterImpl::init() {
// caculate stats info
std::set<std::string> subcolumn_paths;
RETURN_IF_ERROR(_get_subcolumn_paths_from_stats(subcolumn_paths));
-
- auto col = vectorized::ColumnObject::create(true);
+ DCHECK(_tablet_column->variant_max_subcolumns_count() >= 0)
+ << "max subcolumns count is: " <<
_tablet_column->variant_max_subcolumns_count();
+ auto col =
vectorized::ColumnObject::create(_tablet_column->variant_max_subcolumns_count());
for (const auto& str_path : subcolumn_paths) {
DCHECK(col->add_sub_column(vectorized::PathInData(str_path), 0));
}
@@ -100,7 +101,11 @@ Status
VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
}
// Check if the number of all subcolumn paths exceeds the limit.
- if (path_to_total_number_of_non_null_values.size() >
config::variant_max_subcolumns_count) {
+ DCHECK(_tablet_column->variant_max_subcolumns_count() >= 0)
+ << "max subcolumns count is: " <<
_tablet_column->variant_max_subcolumns_count();
+ if (_tablet_column->variant_max_subcolumns_count() &&
+ path_to_total_number_of_non_null_values.size() >
+ _tablet_column->variant_max_subcolumns_count()) {
// Sort paths by total number of non null values.
std::vector<std::pair<size_t, std::string_view>> paths_with_sizes;
paths_with_sizes.reserve(path_to_total_number_of_non_null_values.size());
@@ -111,7 +116,7 @@ Status
VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::st
// Fill subcolumn_paths with first subcolumn paths in sorted list.
// reserve 1 for root column
for (const auto& [size, path] : paths_with_sizes) {
- if (paths.size() < config::variant_max_subcolumns_count) {
+ if (paths.size() < _tablet_column->variant_max_subcolumns_count())
{
VLOG_DEBUG << "pick " << path << " as subcolumn";
paths.emplace(path);
}
@@ -204,7 +209,7 @@ Status
VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt
auto full_path =
full_path_builder.append(_tablet_column->name_lower_case(), false)
.append(entry->path.get_parts(), false)
.build();
- // set unique_id and parent_unique_id, will use unique_id to get
iterator correct
+ // set unique_id and parent_unique_id, will use parent_unique_id to
get iterator correct
return vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {.unique_id =
_tablet_column->unique_id(),
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index d9298d42db7..1f9d5d3191a 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -65,6 +65,7 @@ public:
Status append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows);
private:
+ // not including root column
void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const
TabletColumn& column);
// subcolumn path from variant stats info to distinguish from sparse column
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 7930e154cdb..142520fe91b 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -168,6 +168,7 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB*
meta, uint32_t colum
for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
_init_column_meta(meta->add_sparse_columns(), -1,
column.sparse_column_at(i));
}
+
meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
}
Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const
TabletColumn& column,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 43b0d5d8bd0..2b4a49b0b7a 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -444,6 +444,9 @@ void TabletMeta::init_column_from_tcolumn(uint32_t
unique_id, const TColumn& tco
init_column_from_tcolumn(tcolumn.children_column[i].col_unique_id,
tcolumn.children_column[i], children_column);
}
+ if (tcolumn.column_type.__isset.variant_max_subcolumns_count) {
+
column->set_variant_max_subcolumns_count(tcolumn.column_type.variant_max_subcolumns_count);
+ }
}
Status TabletMeta::create_from_file(const string& file_path) {
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 7b6b5f313c1..4508e1c4145 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -579,11 +579,14 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
_sparse_cols.emplace_back(std::make_shared<TabletColumn>(std::move(column)));
_num_sparse_columns++;
}
+ if (column.has_variant_max_subcolumns_count()) {
+ _variant_max_subcolumns_count = column.variant_max_subcolumns_count();
+ }
}
-TabletColumn TabletColumn::create_materialized_variant_column(const
std::string& root,
- const
std::vector<std::string>& paths,
- int32_t
parent_unique_id) {
+TabletColumn TabletColumn::create_materialized_variant_column(
+ const std::string& root, const std::vector<std::string>& paths,
int32_t parent_unique_id,
+ int32_t variant_max_subcolumns_count) {
TabletColumn subcol;
subcol.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
subcol.set_is_nullable(true);
@@ -592,6 +595,7 @@ TabletColumn
TabletColumn::create_materialized_variant_column(const std::string&
vectorized::PathInData path(root, paths);
subcol.set_path_info(path);
subcol.set_name(path.get_path());
+ subcol.set_variant_max_subcolumns_count(variant_max_subcolumns_count);
return subcol;
}
@@ -656,6 +660,7 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const {
ColumnPB* sparse_column = column->add_sparse_columns();
col->to_schema_pb(sparse_column);
}
+ column->set_variant_max_subcolumns_count(_variant_max_subcolumns_count);
}
void TabletColumn::add_sub_column(TabletColumn& sub_column) {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 3dfe055fbf4..9ff39af14ca 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -107,7 +107,8 @@ public:
// add them into tablet_schema for later column indexing.
static TabletColumn create_materialized_variant_column(const std::string&
root,
const
std::vector<std::string>& paths,
- int32_t
parent_unique_id);
+ int32_t
parent_unique_id,
+ int32_t
max_subcolumns_count);
bool has_default_value() const { return _has_default_value; }
std::string default_value() const { return _default_value; }
size_t length() const { return _length; }
@@ -199,6 +200,11 @@ public:
return Status::OK();
}
+ void set_variant_max_subcolumns_count(int32_t
variant_max_subcolumns_count) {
+ _variant_max_subcolumns_count = variant_max_subcolumns_count;
+ }
+ int32_t variant_max_subcolumns_count() const { return
_variant_max_subcolumns_count; }
+
private:
int32_t _unique_id = -1;
std::string _col_name;
@@ -247,6 +253,7 @@ private:
// Use shared_ptr for reuse and reducing column memory usage
std::vector<TabletColumnPtr> _sparse_cols;
size_t _num_sparse_columns = 0;
+ int32_t _variant_max_subcolumns_count = 0;
};
bool operator==(const TabletColumn& a, const TabletColumn& b);
@@ -582,6 +589,7 @@ private:
// ATTN: For compability reason empty cids means all columns of tablet
schema are encoded to row column
std::vector<int32_t> _row_store_column_unique_ids;
bool _variant_enable_flatten_nested = false;
+
int64_t _vl_field_mem_size {0}; // variable length field
};
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 7b7154fb38a..7fecdc0a77d 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -63,6 +63,10 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>&
types, int* idx)
} else {
len = OLAP_STRING_MAX_LENGTH;
}
+ } else if (type == TYPE_VARIANT) {
+ DCHECK(scalar_type.variant_max_subcolumns_count >= 0)
+ << "count is: " <<
scalar_type.variant_max_subcolumns_count;
+ variant_max_subcolumns_count =
scalar_type.variant_max_subcolumns_count;
}
break;
}
@@ -162,6 +166,8 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type)
const {
DCHECK_NE(scale, -1);
scalar_type.__set_precision(precision);
scalar_type.__set_scale(scale);
+ } else if (type == TYPE_VARIANT) {
+
scalar_type.__set_variant_max_subcolumns_count(variant_max_subcolumns_count);
}
}
}
@@ -206,6 +212,7 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
}
} else if (type == TYPE_VARIANT) {
node->set_type(TTypeNodeType::VARIANT);
+ node->set_variant_max_subcolumns_count(variant_max_subcolumns_count);
}
}
@@ -276,6 +283,7 @@ TypeDescriptor::TypeDescriptor(const
google::protobuf::RepeatedPtrField<PTypeNod
}
case TTypeNodeType::VARIANT: {
type = TYPE_VARIANT;
+ variant_max_subcolumns_count = node.variant_max_subcolumns_count();
break;
}
default:
@@ -337,7 +345,8 @@ std::string TypeDescriptor::debug_string() const {
return ss.str();
}
case TYPE_VARIANT:
- ss << "VARIANT";
+ ss << "VARIANT"
+ << ", max subcolumns count: " << variant_max_subcolumns_count;
return ss.str();
default:
return type_to_string(type);
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 5d2f83c1bd6..16911e7b95a 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -65,16 +65,22 @@ struct TypeDescriptor {
// Whether subtypes of a complex type is nullable
std::vector<bool> contains_nulls;
+ // Only set if type == TYPE_VARIANT
+ int variant_max_subcolumns_count = 0;
+
TypeDescriptor() : type(INVALID_TYPE), len(-1), precision(-1), scale(-1) {}
// explicit TypeDescriptor(PrimitiveType type) :
- TypeDescriptor(PrimitiveType type) : type(type), len(-1), precision(-1),
scale(-1) {
+ TypeDescriptor(PrimitiveType type, int variant_max_subcolumns_count_ = -1)
+ : type(type), len(-1), precision(-1), scale(-1) {
if (type == TYPE_DECIMALV2) {
precision = 27;
scale = 9;
} else if (type == TYPE_DATETIMEV2) {
precision = 18;
scale = 6;
+ } else if (type == TYPE_VARIANT) {
+ variant_max_subcolumns_count = variant_max_subcolumns_count_;
}
}
@@ -181,6 +187,9 @@ struct TypeDescriptor {
if (type == TYPE_DECIMALV2) {
return precision == o.precision && scale == o.scale;
}
+ if (type == TYPE_VARIANT) {
+ return variant_max_subcolumns_count ==
o.variant_max_subcolumns_count;
+ }
return true;
}
@@ -261,6 +270,8 @@ struct TypeDescriptor {
// use to struct type add sub type
void add_sub_type(TypeDescriptor sub_type, std::string field_name, bool
is_nullable = true);
+ int32_t max_subcolumns_count() const { return
variant_max_subcolumns_count; }
+
private:
/// Used to create a possibly nested type from the flattened Thrift
representation.
///
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 535e4332bbe..972aeed36ae 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -51,8 +51,6 @@
#include "vec/aggregate_functions/helpers.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
-#include "vec/columns/column_map.h"
-#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
@@ -61,19 +59,11 @@
#include "vec/common/field_visitors.h"
#include "vec/common/schema_util.h"
#include "vec/common/string_buffer.hpp"
-#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/field.h"
-#include "vec/core/types.h"
#include "vec/data_types/convert_field_to_type.h"
-#include "vec/data_types/data_type.h"
-#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_factory.hpp"
-#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_nothing.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_object.h"
#include "vec/data_types/get_least_supertype.h"
#include "vec/json/path_in_data.h"
@@ -644,7 +634,7 @@ MutableColumnPtr ColumnObject::apply_for_columns(Func&&
func) const {
return finalized_object.apply_for_columns(std::forward<Func>(func));
}
auto new_root = func(get_root())->assume_mutable();
- auto res = ColumnObject::create(get_root_type(), std::move(new_root));
+ auto res = ColumnObject::create(_max_subcolumns_count, get_root_type(),
std::move(new_root));
for (const auto& subcolumn : subcolumns) {
if (subcolumn->data.is_root) {
continue;
@@ -807,28 +797,39 @@
ColumnObject::Subcolumn::LeastCommonType::LeastCommonType(DataTypePtr type_, boo
: base_type->get_type_id();
}
-ColumnObject::ColumnObject(bool is_nullable_) : is_nullable(is_nullable_),
num_rows(0) {
+ColumnObject::ColumnObject(int32_t max_subcolumns_count) // empty variant with a Nothing-typed root; 0 = no subcolumn limit
+ : is_nullable(true), num_rows(0),
_max_subcolumns_count(max_subcolumns_count) {
subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/));
ENABLE_CHECK_CONSISTENCY(this);
}
-ColumnObject::ColumnObject(DataTypePtr root_type, MutableColumnPtr&&
root_column)
- : is_nullable(true), num_rows(root_column->size()) {
+ColumnObject::ColumnObject(int32_t max_subcolumns_count, DataTypePtr root_type,
+ MutableColumnPtr&& root_column) // root_column/root_type become the root subcolumn
+ : is_nullable(true),
+ num_rows(root_column->size()),
+ _max_subcolumns_count(max_subcolumns_count) {
subcolumns.create_root(
Subcolumn(std::move(root_column), root_type, is_nullable, true
/*root*/));
serialized_sparse_column->insert_many_defaults(num_rows);
ENABLE_CHECK_CONSISTENCY(this);
}
-ColumnObject::ColumnObject(Subcolumns&& subcolumns_)
+ColumnObject::ColumnObject(int32_t max_subcolumns_count, Subcolumns&& subcolumns_)
: is_nullable(true),
subcolumns(std::move(subcolumns_)),
- num_rows(subcolumns.empty() ? 0 :
(*subcolumns.begin())->data.size()) {
+ num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size()),
+ _max_subcolumns_count(max_subcolumns_count) {
+ if (max_subcolumns_count && subcolumns.size() > static_cast<size_t>(max_subcolumns_count) + 1) { // check the member: subcolumns_ was moved from above; +1 accounts for the root; 0 = no limit
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "unmatched max subcolumns count, max subcolumns count: {}, but "
+ "subcolumns count: {}",
+ max_subcolumns_count, subcolumns.size());
+ }
serialized_sparse_column->insert_many_defaults(num_rows);
- ENABLE_CHECK_CONSISTENCY(this);
}
-ColumnObject::ColumnObject(size_t size) : is_nullable(true), num_rows(0) {
+ColumnObject::ColumnObject(int32_t max_subcolumns_count, size_t size)
+ : is_nullable(true), num_rows(0),
_max_subcolumns_count(max_subcolumns_count) {
subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/));
insert_many_defaults(size);
ENABLE_CHECK_CONSISTENCY(this);
@@ -883,7 +884,7 @@ size_t ColumnObject::size() const {
MutableColumnPtr ColumnObject::clone_resized(size_t new_size) const {
if (new_size == 0) {
- return ColumnObject::create(is_nullable);
+ return ColumnObject::create(_max_subcolumns_count);
}
return apply_for_columns(
[&](const ColumnPtr column) { return
column->clone_resized(new_size); });
@@ -1299,10 +1300,11 @@ void ColumnObject::set_num_rows(size_t n) {
}
bool ColumnObject::try_add_new_subcolumn(const PathInData& path) {
+ DCHECK(_max_subcolumns_count >= 0) << "max subcolumns count is: " <<
_max_subcolumns_count;
if (subcolumns.get_root() == nullptr || path.empty()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "column object has no root
or path is empty");
}
- if (subcolumns.size() < config::variant_max_subcolumns_count + 1) {
+ if (!_max_subcolumns_count || subcolumns.size() < _max_subcolumns_count +
1) {
return add_sub_column(path, num_rows);
}
@@ -1954,6 +1956,7 @@ Status ColumnObject::finalize(FinalizeMode mode) {
ENABLE_CHECK_CONSISTENCY(this);
return Status::OK();
}
+ DCHECK(_max_subcolumns_count >= 0) << "max subcolumns count is: " <<
_max_subcolumns_count;
Subcolumns new_subcolumns;
if (auto root = subcolumns.get_mutable_root(); root == nullptr) {
@@ -1967,7 +1970,7 @@ Status ColumnObject::finalize(FinalizeMode mode) {
// 2. root column must be exsit in subcolumns
bool need_pick_subcolumn_to_sparse_column =
mode == FinalizeMode::WRITE_MODE &&
- subcolumns.size() > config::variant_max_subcolumns_count + 1;
+ (_max_subcolumns_count && subcolumns.size() >
_max_subcolumns_count + 1);
// finalize all subcolumns
for (auto&& entry : subcolumns) {
@@ -1978,8 +1981,7 @@ Status ColumnObject::finalize(FinalizeMode mode) {
const auto& least_common_type = entry->data.get_least_common_type();
// unnest all nested columns, add them to new_subcolumns
- if (mode == FinalizeMode::WRITE_MODE &&
- least_common_type->equals(*ColumnObject::NESTED_TYPE)) {
+ if (mode == FinalizeMode::WRITE_MODE &&
least_common_type->equals(*NESTED_TYPE)) {
unnest(entry, new_subcolumns);
continue;
}
@@ -2017,8 +2019,7 @@ Status ColumnObject::finalize(FinalizeMode mode) {
[](const auto& a, const auto& b) { return a.second >
b.second; });
// 3. pick config::variant_max_subcolumns_count selected subcolumns
- for (size_t i = 0;
- i < std::min(size_t(config::variant_max_subcolumns_count),
sorted_by_size.size());
+ for (size_t i = 0; i < std::min(size_t(_max_subcolumns_count),
sorted_by_size.size());
++i) {
// if too many null values, then consider it as sparse column
if (double(sorted_by_size[i].second) < double(num_rows) * 0.95) {
@@ -2088,12 +2089,13 @@ ColumnPtr ColumnObject::filter(const Filter& filter,
ssize_t count) const {
return finalized_object.filter(filter, count);
}
if (subcolumns.empty()) {
- auto res = ColumnObject::create(count_bytes_in_filter(filter));
+ auto res = ColumnObject::create(_max_subcolumns_count,
count_bytes_in_filter(filter));
ENABLE_CHECK_CONSISTENCY(res.get());
return res;
}
auto new_root = get_root()->filter(filter, count)->assume_mutable();
- auto new_column = ColumnObject::create(get_root_type(),
std::move(new_root));
+ auto new_column =
+ ColumnObject::create(_max_subcolumns_count, get_root_type(),
std::move(new_root));
for (auto& entry : subcolumns) {
if (entry->data.is_root) {
continue;
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index 367c303299d..3a7429a4fb0 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -46,9 +46,11 @@
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_object.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/io/reader_buffer.h"
#include "vec/json/path_in_data.h"
@@ -268,18 +270,21 @@ private:
WrappedPtr serialized_sparse_column = ColumnMap::create(
ColumnString::create(), ColumnString::create(),
ColumnArray::ColumnOffsets::create());
+ int32_t _max_subcolumns_count = 0;
+
public:
static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
// always create root: data type nothing
- explicit ColumnObject(bool is_nullable_);
+ explicit ColumnObject(int32_t max_subcolumns_count);
// always create root: data type nothing
- explicit ColumnObject(size_t size = 0);
+ explicit ColumnObject(int32_t max_subcolumns_count, size_t size);
- explicit ColumnObject(DataTypePtr root_type, MutableColumnPtr&&
root_column);
+ explicit ColumnObject(int32_t max_subcolumns_count, DataTypePtr root_type,
+ MutableColumnPtr&& root_column);
- explicit ColumnObject(Subcolumns&& subcolumns_);
+ explicit ColumnObject(int32_t max_subcolumns_count, Subcolumns&&
subcolumns_);
~ColumnObject() override = default;
@@ -346,6 +351,8 @@ public:
size_t rows() const { return num_rows; }
+ int32_t max_subcolumns_count() const { return _max_subcolumns_count; }
+
/// Adds a subcolumn from existing IColumn.
bool add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn,
DataTypePtr type);
diff --git a/be/src/vec/common/schema_util.cpp
b/be/src/vec/common/schema_util.cpp
index 440dfcd0b97..3efd1cd628d 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -162,10 +162,16 @@ Status cast_column(const ColumnWithTypeAndName& arg,
const DataTypePtr& type, Co
// nullable to Variant instead of the root of Variant
// correct output: Nullable(Array(int)) ->
Nullable(Variant(Nullable(Array(int))))
// incorrect output: Nullable(Array(int)) -> Nullable(Variant(Array(int)))
- if (WhichDataType(remove_nullable(type)).is_variant_type()) {
- // set variant root column/type to from column/type
- auto variant = ColumnObject::create(true /*always nullable*/);
+ if (auto to_type = remove_nullable(type);
WhichDataType(to_type).is_variant_type()) {
+ if (auto from_type = remove_nullable(arg.type);
+ WhichDataType(from_type).is_variant_type()) {
+ return Status::InternalError("Not support cast: from {} to {}",
arg.type->get_name(),
+ type->get_name());
+ }
CHECK(arg.column->is_nullable());
+ const auto& data_type_object = assert_cast<const
DataTypeObject&>(*to_type);
+ auto variant =
ColumnObject::create(data_type_object.variant_max_subcolumns_count());
+
variant->create_root(arg.type, arg.column->assume_mutable());
ColumnPtr nullable = ColumnNullable::create(
variant->get_ptr(),
@@ -510,7 +516,7 @@ Status _parse_variant_columns(Block& block, const
std::vector<int>& variant_pos,
}
if (scalar_root_column->is_column_string()) {
- variant_column = ColumnObject::create(true);
+ variant_column = ColumnObject::create(var.max_subcolumns_count());
parse_json_to_variant(*variant_column.get(),
assert_cast<const
ColumnString&>(*scalar_root_column), config);
} else {
@@ -613,6 +619,7 @@ TabletColumn create_sparse_column(const TabletColumn&
variant) {
res.set_type(FieldType::OLAP_FIELD_TYPE_MAP);
res.set_aggregation_method(variant.aggregation());
res.set_path_info(PathInData {SPARSE_COLUMN_PATH});
+ res.set_parent_unique_id(variant.unique_id());
TabletColumn child_tcolumn;
child_tcolumn.set_type(FieldType::OLAP_FIELD_TYPE_STRING);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 2242db3f905..04b62497ae8 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -596,7 +596,9 @@ public:
<< " src type: " <<
block.get_by_position(i).type->get_name();
DCHECK(((DataTypeNullable*)_data_types[i].get())
->get_nested_type()
- ->equals(*block.get_by_position(i).type));
+ ->equals(*block.get_by_position(i).type))
+ << " target type: " << _data_types[i]->get_name()
+ << " src type: " <<
block.get_by_position(i).type->get_name();
DCHECK(!block.get_by_position(i).type->is_nullable());
_columns[i]->insert_range_from(*make_nullable(block.get_by_position(i).column)
->convert_to_full_column_if_const(),
diff --git a/be/src/vec/data_types/data_type_factory.cpp
b/be/src/vec/data_types/data_type_factory.cpp
index 369809d77f6..cb0fb452bfe 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -102,6 +102,8 @@ DataTypePtr DataTypeFactory::create_data_type(const
TabletColumn& col_desc, bool
names.push_back(col_desc.get_sub_column(i).name());
}
nested = std::make_shared<DataTypeStruct>(dataTypes, names);
+ } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
+ nested =
std::make_shared<DataTypeObject>(col_desc.variant_max_subcolumns_count());
} else {
nested =
_create_primitive_data_type(col_desc.type(),
col_desc.precision(), col_desc.frac());
@@ -163,7 +165,8 @@ DataTypePtr DataTypeFactory::create_data_type(const
TypeDescriptor& col_desc, bo
nested = std::make_shared<vectorized::DataTypeFloat64>();
break;
case TYPE_VARIANT:
- nested = std::make_shared<vectorized::DataTypeObject>("", true);
+ nested =
+
std::make_shared<vectorized::DataTypeObject>(col_desc.variant_max_subcolumns_count);
break;
case TYPE_STRING:
case TYPE_CHAR:
@@ -305,9 +308,6 @@ DataTypePtr DataTypeFactory::create_data_type(const
TypeIndex& type_index, bool
case TypeIndex::String:
nested = std::make_shared<vectorized::DataTypeString>();
break;
- case TypeIndex::VARIANT:
- nested = std::make_shared<vectorized::DataTypeObject>("", true);
- break;
case TypeIndex::Decimal32:
nested =
std::make_shared<DataTypeDecimal<Decimal32>>(BeConsts::MAX_DECIMAL32_PRECISION,
0);
break;
@@ -408,9 +408,6 @@ DataTypePtr
DataTypeFactory::_create_primitive_data_type(const FieldType& type,
case FieldType::OLAP_FIELD_TYPE_STRING:
result = std::make_shared<vectorized::DataTypeString>();
break;
- case FieldType::OLAP_FIELD_TYPE_VARIANT:
- result = std::make_shared<vectorized::DataTypeObject>("", true);
- break;
case FieldType::OLAP_FIELD_TYPE_JSONB:
result = std::make_shared<vectorized::DataTypeJsonb>();
break;
@@ -489,7 +486,7 @@ DataTypePtr DataTypeFactory::create_data_type(const
PColumnMeta& pcolumn) {
nested = std::make_shared<DataTypeString>();
break;
case PGenericType::VARIANT:
- nested = std::make_shared<DataTypeObject>("", true);
+ nested =
std::make_shared<DataTypeObject>(pcolumn.variant_max_subcolumns_count());
break;
case PGenericType::JSONB:
nested = std::make_shared<DataTypeJsonb>();
diff --git a/be/src/vec/data_types/data_type_object.cpp
b/be/src/vec/data_types/data_type_object.cpp
index 7513a42075b..57b39575c6f 100644
--- a/be/src/vec/data_types/data_type_object.cpp
+++ b/be/src/vec/data_types/data_type_object.cpp
@@ -47,10 +47,17 @@ class IColumn;
namespace doris::vectorized {
#include "common/compile_check_begin.h"
-DataTypeObject::DataTypeObject(const String& schema_format_, bool is_nullable_)
- : schema_format(to_lower(schema_format_)), is_nullable(is_nullable_) {}
+DataTypeObject::DataTypeObject(int32_t max_subcolumns_count)
+ : _max_subcolumns_count(max_subcolumns_count) {}
bool DataTypeObject::equals(const IDataType& rhs) const {
- return typeid_cast<const DataTypeObject*>(&rhs) != nullptr;
+ auto rhs_type = typeid_cast<const DataTypeObject*>(&rhs);
+ if (rhs_type && _max_subcolumns_count != rhs_type->variant_max_subcolumns_count()) { // variants with different limits are different types
+ VLOG_DEBUG << "_max_subcolumns_count is " << _max_subcolumns_count
+ << ", rhs_type->variant_max_subcolumns_count() is "
+ << rhs_type->variant_max_subcolumns_count();
+ return false;
+ }
+ return rhs_type != nullptr; // limits are known to match at this point
}
int64_t DataTypeObject::get_uncompressed_serialized_bytes(const IColumn&
column,
@@ -184,7 +191,6 @@ const char* DataTypeObject::deserialize(const char* buf,
MutableColumnPtr* colum
// serialize num of rows, only take effect when subcolumns empty
if (be_exec_version >= VARIANT_SERDE) {
num_rows = *reinterpret_cast<const uint32_t*>(buf);
- column_object->set_num_rows(num_rows);
buf += sizeof(uint32_t);
}
@@ -198,6 +204,8 @@ const char* DataTypeObject::deserialize(const char* buf,
MutableColumnPtr* colum
column_object->get_subcolumn({})->insert_many_defaults(num_rows);
}
+ column_object->set_num_rows(num_rows);
+
column_object->finalize();
#ifndef NDEBUG
// DCHECK size
@@ -218,4 +226,13 @@ void DataTypeObject::to_string(const IColumn& column,
size_t row_num, BufferWrit
static_cast<void>(variant.serialize_one_row_to_string(cast_set<Int32>(row_num),
ostr));
}
+void DataTypeObject::to_pb_column_meta(PColumnMeta* col_meta) const { // base meta plus the variant subcolumn limit
+ IDataType::to_pb_column_meta(col_meta);
+ col_meta->set_variant_max_subcolumns_count(_max_subcolumns_count); // read back by DataTypeFactory::create_data_type(const PColumnMeta&)
+}
+
+MutableColumnPtr DataTypeObject::create_column() const { // new columns inherit this type's subcolumn limit
+ return ColumnObject::create(_max_subcolumns_count);
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_object.h
b/be/src/vec/data_types/data_type_object.h
index f3336067419..272f086fc83 100644
--- a/be/src/vec/data_types/data_type_object.h
+++ b/be/src/vec/data_types/data_type_object.h
@@ -34,7 +34,6 @@
#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "serde/data_type_object_serde.h"
-#include "vec/columns/column_object.h"
#include "vec/common/assert_cast.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
@@ -50,21 +49,21 @@ class IColumn;
namespace doris::vectorized {
class DataTypeObject : public IDataType {
private:
- String schema_format;
- bool is_nullable;
+ int32_t _max_subcolumns_count = 0;
public:
- DataTypeObject(const String& schema_format_ = "json", bool is_nullable_ =
true);
+ DataTypeObject() {}
+ DataTypeObject(int32_t max_subcolumns_count);
const char* get_family_name() const override { return "Variant"; }
TypeIndex get_type_id() const override { return TypeIndex::VARIANT; }
TypeDescriptor get_type_as_type_descriptor() const override {
- return TypeDescriptor(TYPE_VARIANT);
+ return TypeDescriptor(TYPE_VARIANT, _max_subcolumns_count);
}
doris::FieldType get_storage_field_type() const override {
return doris::FieldType::OLAP_FIELD_TYPE_VARIANT;
}
- MutableColumnPtr create_column() const override { return
ColumnObject::create(is_nullable); }
+ MutableColumnPtr create_column() const override;
bool equals(const IDataType& rhs) const override;
bool have_subtypes() const override { return true; };
int64_t get_uncompressed_serialized_bytes(const IColumn& column,
@@ -92,5 +91,7 @@ public:
DataTypeSerDeSPtr get_serde(int nesting_level = 1) const override {
return std::make_shared<DataTypeObjectSerDe>(nesting_level);
};
+ void to_pb_column_meta(PColumnMeta* col_meta) const override;
+ int32_t variant_max_subcolumns_count() const { return
_max_subcolumns_count; }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/get_least_supertype.cpp
b/be/src/vec/data_types/get_least_supertype.cpp
index 82bea452923..a0f27482b5a 100644
--- a/be/src/vec/data_types/get_least_supertype.cpp
+++ b/be/src/vec/data_types/get_least_supertype.cpp
@@ -281,10 +281,6 @@ void get_least_supertype_jsonb(const TypeIndexSet& types,
DataTypePtr* type) {
*type = std::make_shared<DataTypeJsonb>();
return;
}
- if (which.is_variant_type()) {
- *type = std::make_shared<DataTypeObject>();
- return;
- }
if (which.is_date_v2()) {
*type = std::make_shared<DataTypeDateV2>();
return;
diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp
b/be/src/vec/data_types/serde/data_type_object_serde.cpp
index 8e9d540f50f..ecc8cea0d2b 100644
--- a/be/src/vec/data_types/serde/data_type_object_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -48,24 +48,12 @@ Status DataTypeObjectSerDe::_write_column_to_mysql(const
IColumn& column,
int64_t row_idx, bool
col_const,
const FormatOptions&
options) const {
const auto& variant = assert_cast<const ColumnObject&>(column);
- if (variant.is_scalar_variant()) {
- // Serialize scalar types, like int, string, array, faster path
- const auto& root = variant.get_subcolumn({});
-
RETURN_IF_ERROR(root->get_least_common_type_serde()->write_column_to_mysql(
- root->get_finalized_column(), row_buffer, row_idx, col_const,
options));
- } else {
- // Serialize hierarchy types to json format
- std::string buffer;
- bool is_null = false;
- if (!variant.serialize_one_row_to_string(row_idx, &buffer)) {
- return Status::InternalError("Invalid json format");
- }
- if (is_null) {
- row_buffer.push_null();
- } else {
- row_buffer.push_string(buffer.data(), buffer.size());
- }
+ // Serialize hierarchy types to json format
+ std::string buffer;
+ if (!variant.serialize_one_row_to_string(row_idx, &buffer)) {
+ return Status::InternalError("Invalid json format");
}
+ row_buffer.push_string(buffer.data(), buffer.size());
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index d3a05cbb3c2..6d6aee4482e 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -430,7 +430,8 @@ Status NewOlapScanner::_init_variant_columns() {
// add them into tablet_schema for later column indexing.
TabletColumn subcol =
TabletColumn::create_materialized_variant_column(
tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
- slot->column_paths(), slot->col_unique_id());
+ slot->column_paths(), slot->col_unique_id(),
+ slot->type().max_subcolumns_count());
if (tablet_schema->field_index(*subcol.path_info_ptr()) < 0) {
tablet_schema->append_column(subcol,
TabletSchema::ColumnType::VARIANT);
}
diff --git a/be/src/vec/functions/array/function_array_utils.cpp
b/be/src/vec/functions/array/function_array_utils.cpp
index d25904baf93..21d3911b3b0 100644
--- a/be/src/vec/functions/array/function_array_utils.cpp
+++ b/be/src/vec/functions/array/function_array_utils.cpp
@@ -57,7 +57,9 @@ bool extract_column_array_info(const IColumn& src,
ColumnArrayExecutionData& dat
if (data.output_as_variant &&
!WhichDataType(remove_nullable(data.nested_type)).is_variant_type()) {
// set variant root column/type to from column/type
- auto variant = ColumnObject::create(true /*always nullable*/);
+ // nested_type is NOT a variant in this branch, so it cannot be cast to DataTypeObject;
+ // use 0 (no subcolumn limit). TODO(review): pass the result type's limit if it is non-zero.
+ auto variant = ColumnObject::create(0);
variant->create_root(data.nested_type,
make_nullable(data.nested_col)->assume_mutable());
data.nested_col = variant->get_ptr();
}
diff --git a/be/src/vec/functions/function_cast.h
b/be/src/vec/functions/function_cast.h
index 483e837de5d..2622f201b9c 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -1946,12 +1946,13 @@ private:
static Status execute(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, const uint32_t
result,
size_t input_rows_count) {
- // auto& data_type_to = block.get_by_position(result).type;
+ auto& data_type_to = block.get_by_position(result).type;
const auto& col_with_type_and_name =
block.get_by_position(arguments[0]);
auto& from_type = col_with_type_and_name.type;
auto& col_from = col_with_type_and_name.column;
// set variant root column/type to from column/type
- auto variant = ColumnObject::create(true /*always nullable*/);
+ const auto& data_type_object = assert_cast<const
DataTypeObject&>(*data_type_to);
+ auto variant =
ColumnObject::create(data_type_object.variant_max_subcolumns_count());
variant->create_root(from_type, col_from->assume_mutable());
block.replace_by_position(result, std::move(variant));
return Status::OK();
@@ -2265,10 +2266,10 @@ private:
// variant needs to be judged first
if (to_type->get_type_id() == TypeIndex::VARIANT) {
- return create_variant_wrapper(from_type, static_cast<const
DataTypeObject&>(*to_type));
+ return create_variant_wrapper(from_type, assert_cast<const
DataTypeObject&>(*to_type));
}
if (from_type->get_type_id() == TypeIndex::VARIANT) {
- return create_variant_wrapper(static_cast<const
DataTypeObject&>(*from_type), to_type);
+ return create_variant_wrapper(assert_cast<const
DataTypeObject&>(*from_type), to_type);
}
switch (from_type->get_type_id()) {
diff --git a/be/src/vec/functions/function_variant_element.cpp
b/be/src/vec/functions/function_variant_element.cpp
index b32cd870fab..21bff9d6362 100644
--- a/be/src/vec/functions/function_variant_element.cpp
+++ b/be/src/vec/functions/function_variant_element.cpp
@@ -74,7 +74,10 @@ public:
DCHECK(is_string(arguments[1]))
<< "Second argument for function: " << name << " should be
String but it has type "
<< arguments[1]->get_name() << ".";
- return make_nullable(std::make_shared<DataTypeObject>());
+ auto arg_variant = remove_nullable(arguments[0]);
+ const auto& data_type_object = assert_cast<const
DataTypeObject&>(*arg_variant);
+ return make_nullable(
+
std::make_shared<DataTypeObject>(data_type_object.variant_max_subcolumns_count()));
}
// wrap variant column with nullable
@@ -120,11 +123,18 @@ public:
}
private:
+ // Return the part of `path` after `prefix` and the following separator, e.g. for prefix a.b:
+ // a.b.c.d -> c.d, a.b.c -> c. Unchecked precondition: path starts with prefix + '.'; a path
+ // equal to prefix makes substr throw std::out_of_range, and "a.bc" would wrongly yield "c".
+ static std::string_view get_sub_path(const std::string_view& path,
+ const std::string_view& prefix) {
+ return path.substr(prefix.size() + 1);
+ }
static Status get_element_column(const ColumnObject& src, const ColumnPtr&
index_column,
ColumnPtr* result) {
std::string field_name = index_column->get_data_at(0).to_string();
if (src.empty()) {
- *result = ColumnObject::create(true);
+ *result = ColumnObject::create(src.max_subcolumns_count());
// src subcolumns empty but src row count may not be 0
(*result)->assume_mutable()->insert_many_defaults(src.size());
// ColumnObject should be finalized before parsing, finalize maybe
modify original column structure
@@ -152,7 +162,8 @@ private:
result_column->insert_default();
}
}
- *result = ColumnObject::create(type, std::move(result_column));
+ *result = ColumnObject::create(src.max_subcolumns_count(), type,
+ std::move(result_column));
(*result)->assume_mutable()->finalize();
return Status::OK();
} else {
@@ -161,13 +172,69 @@ private:
PathInData path(field_name);
ColumnObject::Subcolumns subcolumns =
mutable_ptr->get_subcolumns();
const auto* node = subcolumns.find_exact(path);
- MutableColumnPtr result_col;
+ MutableColumnPtr result_col =
ColumnObject::create(src.max_subcolumns_count());
+ ColumnObject::Subcolumns new_subcolumns;
+
+ auto extract_from_sparse_column = [&](auto& container) {
+ ColumnObject::Subcolumn root {0, true, true};
+ // no root, no sparse column
+ const auto& sparse_data_map =
+ assert_cast<const ColumnMap&>(*mutable_ptr->get_sparse_column());
+ const auto& src_sparse_data_offsets = sparse_data_map.get_offsets();
+ const auto& src_sparse_data_paths =
+ assert_cast<const ColumnString&>(sparse_data_map.get_keys());
+ const auto& src_sparse_data_values =
+ assert_cast<const ColumnString&>(sparse_data_map.get_values());
+ auto& sparse_data_offsets =
+ assert_cast<ColumnMap&>(*container->get_sparse_column()->assume_mutable())
+ .get_offsets();
+ auto [sparse_data_paths, sparse_data_values] =
+ container->get_sparse_data_paths_and_values();
+ StringRef prefix_ref(path.get_path());
+ std::string_view path_prefix(prefix_ref.data, prefix_ref.size);
+ for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) {
+ size_t start = src_sparse_data_offsets[ssize_t(i) - 1];
+ size_t end = src_sparse_data_offsets[ssize_t(i)];
+ size_t lower_bound_index =
+ vectorized::ColumnObject::find_path_lower_bound_in_sparse_data(
+ prefix_ref, src_sparse_data_paths, start, end);
+ for (; lower_bound_index != end; ++lower_bound_index) {
+ auto path_ref = src_sparse_data_paths.get_data_at(lower_bound_index);
+ std::string_view nested_path(path_ref.data, path_ref.size);
+ if (!nested_path.starts_with(path_prefix)) {
+ break;
+ }
+ if (nested_path.size() == path_prefix.size()) {
+ // Exact match: insert into the root column, e.g. accessing v['b'] when b is in the
+ // sparse column. Data example:
+ // {"b" : 123}
+ // {"b" : {"c" : 456}}
+ // b may be in the sparse column while b.c is a subcolumn; put `b` into the root
+ // column to distinguish it from "", which is the empty path of the root.
+ const auto& data = ColumnObject::deserialize_from_sparse_column(&src_sparse_data_values, lower_bound_index);
+ root.insert(data.first, data.second);
+ } else if (nested_path[path_prefix.size()] == '.') {
+ // Strict descendant such as "b.c" for prefix "b": keep it under the sub-path "c".
+ auto sub_path = get_sub_path(nested_path, path_prefix);
+ sparse_data_paths->insert_data(sub_path.data(), sub_path.size());
+ sparse_data_values->insert_from(src_sparse_data_values, lower_bound_index);
+ }
+ // Otherwise the path only shares the textual prefix ("bc" or "b-x" for prefix "b")
+ // and is not under it; keep scanning, since sorted descendants ("b.c") may follow.
+ }
+ if (root.size() == sparse_data_offsets.size()) {
+ root.insert_default();
+ }
+ sparse_data_offsets.push_back(sparse_data_paths->size());
+ }
+ container->get_subcolumns().create_root(root);
+ container->set_num_rows(mutable_ptr->size());
+ };
+
if (node != nullptr) {
- result_col = ColumnObject::create(true);
std::vector<decltype(node)> nodes;
PathsInData paths;
ColumnObject::Subcolumns::get_leaves_of_node(node, nodes,
paths);
- ColumnObject::Subcolumns new_subcolumns;
for (const auto* n : nodes) {
PathInData new_path = n->path.copy_pop_front();
VLOG_DEBUG << "add node " << new_path.get_path()
@@ -179,19 +246,28 @@ private:
VLOG_DEBUG << "failed to add node " <<
new_path.get_path();
}
}
+
// handle the root node
if (new_subcolumns.empty() && !nodes.empty()) {
CHECK_EQ(nodes.size(), 1);
new_subcolumns.create_root(ColumnObject::Subcolumn {
nodes[0]->data.get_finalized_column_ptr()->assume_mutable(),
nodes[0]->data.get_least_common_type(), true,
true});
+ auto container =
ColumnObject::create(src.max_subcolumns_count(),
+
std::move(new_subcolumns));
+ result_col->insert_range_from(*container, 0,
container->size());
+ } else {
+ auto container =
ColumnObject::create(src.max_subcolumns_count(),
+
std::move(new_subcolumns));
+ container->clear_sparse_column();
+ extract_from_sparse_column(container);
+ result_col->insert_range_from(*container, 0,
container->size());
}
- auto container =
ColumnObject::create(std::move(new_subcolumns));
- result_col->insert_range_from(*container, 0,
container->size());
} else {
- // Create with root, otherwise the root type maybe type
Nothing ?
- result_col = ColumnObject::create(true);
- result_col->insert_many_defaults(src.size());
+ auto container =
+ ColumnObject::create(src.max_subcolumns_count(),
std::move(new_subcolumns));
+ extract_from_sparse_column(container);
+ result_col->insert_range_from(*container, 0,
container->size());
}
*result = result_col->get_ptr();
// ColumnObject should be finalized before parsing, finalize maybe
modify original column structure
diff --git a/be/test/vec/columns/column_object_test.cpp
b/be/test/vec/columns/column_object_test.cpp
index e59219827db..a6b68f6b972 100644
--- a/be/test/vec/columns/column_object_test.cpp
+++ b/be/test/vec/columns/column_object_test.cpp
@@ -45,7 +45,7 @@ doris::vectorized::Field construct_variant_map(
auto construct_basic_varint_column() {
// 1. create an empty variant column
- auto variant = ColumnObject::create();
+ auto variant = ColumnObject::create(5);
std::vector<std::pair<std::string, doris::vectorized::Field>> data;
@@ -85,7 +85,7 @@ auto construct_dst_varint_column() {
vectorized::ColumnObject::Subcolumn {0, true});
dynamic_subcolumns.add(vectorized::PathInData("v.c.d"),
vectorized::ColumnObject::Subcolumn {0, true});
- return ColumnObject::create(std::move(dynamic_subcolumns));
+ return ColumnObject::create(5, std::move(dynamic_subcolumns));
}
TEST(ColumnVariantTest, basic_finalize) {
@@ -331,7 +331,7 @@ doris::vectorized::Field get_jsonb_field(std::string_view
type) {
auto construct_advanced_varint_column() {
// 1. create an empty variant column
- auto variant = ColumnObject::create();
+ auto variant = ColumnObject::create(5);
std::vector<std::pair<std::string, doris::vectorized::Field>> data;
@@ -609,7 +609,7 @@ TEST(ColumnVariantTest, advanced_insert_range_from) {
auto construct_varint_column_only_subcolumns() {
// 1. create an empty variant column
- auto variant = ColumnObject::create();
+ auto variant = ColumnObject::create(5);
std::vector<std::pair<std::string, doris::vectorized::Field>> data;
@@ -631,7 +631,7 @@ auto construct_varint_column_only_subcolumns() {
auto construct_varint_column_more_subcolumns() {
// 1. create an empty variant column
- auto variant = ColumnObject::create();
+ auto variant = ColumnObject::create(5);
std::vector<std::pair<std::string, doris::vectorized::Field>> data;
@@ -657,7 +657,7 @@ TEST(ColumnVariantTest, empty_inset_range_from) {
EXPECT_EQ(src->size(), 6);
// dst is an empty column
- auto dst = ColumnObject::create();
+ auto dst = ColumnObject::create(5);
// subcolumn->subcolumn v.a v.b v.c v.f v.e
dst->insert_range_from(*src, 0, 6);
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
index e9f1b50c0df..54af508fd95 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
@@ -124,6 +124,9 @@ public class ScalarType extends Type {
@SerializedName(value = "lenStr")
private String lenStr;
+ @SerializedName(value = "variantMaxSubcolumnsCount")
+ private int variantMaxSubcolumnsCount;
+
public ScalarType(PrimitiveType type) {
this.type = type;
}
@@ -727,11 +730,17 @@ public class ScalarType extends Type {
case CHAR:
case HLL:
case STRING:
- case JSONB:
- case VARIANT: {
+ case JSONB: {
scalarType.setLen(getLength());
break;
}
+ case VARIANT: {
+
scalarType.setVariantMaxSubcolumnsCount(variantMaxSubcolumnsCount);
+ if (variantMaxSubcolumnsCount < 0) {
+ throw new IllegalArgumentException(String.format("error
count: %d", variantMaxSubcolumnsCount));
+ }
+ break;
+ }
case DECIMALV2:
case DECIMAL32:
case DECIMAL64:
@@ -913,6 +922,9 @@ public class ScalarType extends Type {
if (isDatetimeV2() && scalarType.isDatetimeV2()) {
return true;
}
+ if (isVariantType() && scalarType.isVariantType()) {
+ return true;
+ }
return false;
}
@@ -943,6 +955,9 @@ public class ScalarType extends Type {
if (type.isDecimalV2Type() || type == PrimitiveType.DATETIMEV2 || type
== PrimitiveType.TIMEV2) {
return precision == other.precision && scale == other.scale;
}
+ if (this.isVariantType() && other.isVariantType()) {
+ return this.getVariantMaxSubcolumnsCount() ==
other.getVariantMaxSubcolumnsCount();
+ }
return true;
}
@@ -1128,6 +1143,14 @@ public class ScalarType extends Type {
return finalType;
}
+ if (t1.isVariantType() && t2.isVariantType()) {
+ if (t1.getVariantMaxSubcolumnsCount() ==
t2.getVariantMaxSubcolumnsCount()) {
+ return t1;
+ } else {
+ return Type.UNSUPPORTED;
+ }
+ }
+
PrimitiveType smallerType =
(t1.type.ordinal() < t2.type.ordinal() ? t1.type : t2.type);
PrimitiveType largerType =
@@ -1213,4 +1236,13 @@ public class ScalarType extends Type {
result = 31 * result + scale;
return result;
}
+
+ public void setVariantMaxSubcolumnsCount(int variantMaxSubcolumnsCount) {
+ this.variantMaxSubcolumnsCount = variantMaxSubcolumnsCount;
+ LOG.info("set max count is: {}", variantMaxSubcolumnsCount);
+ }
+
+ public int getVariantMaxSubcolumnsCount() {
+ return variantMaxSubcolumnsCount;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index eb1cc09f506..9400e5693a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -63,10 +63,12 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaContext;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -1020,6 +1022,11 @@ public class SchemaChangeHandler extends AlterHandler {
lightSchemaChange = false;
}
+ Type type = newColumn.getType();
+ if (type.isVariantType()) {
+ ScalarType scType = (ScalarType) type;
+
scType.setVariantMaxSubcolumnsCount(olapTable.getVariantMaxSubcolumnsCount());
+ }
// check if the new column already exist in base schema.
// do not support adding new column which already exist in base schema.
List<Column> baseSchema = olapTable.getBaseSchema(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
index de257991ca6..cc885ca3e8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
@@ -158,6 +158,9 @@ public class CastExpr extends Expr {
if (from.isComplexType() && type.isJsonbType()) {
nullableMode = Function.NullableMode.ALWAYS_NULLABLE;
}
+ if (from.isVariantType() || type.isVariantType()) {
+ nullableMode = Function.NullableMode.DEPEND_ON_ARGUMENT;
+ }
Preconditions.checkState(nullableMode != null,
"cannot find nullable node for cast from " + from + " to "
+ to);
fn = new Function(new FunctionName(getFnName(type)),
Lists.newArrayList(e.type), type,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 51e18d750f2..93326c5a582 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -2032,6 +2032,9 @@ public class FunctionCallExpr extends Expr {
} else if (children.size() == 1) {
this.type = ScalarType.createDatetimeV2Type(6);
}
+ } else if
(fn.getFunctionName().getFunction().equalsIgnoreCase("element_at")
+ &&
getChild(0).type.isVariantType()) {
+ this.type = getChild(0).type;
} else {
this.type = fn.getReturnType();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java
index a9e11458582..41d284cfe54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java
@@ -195,6 +195,12 @@ public class MVColumnItem {
result.setIsAllowNull(defineExpr.isNullable());
}
}
+ if (result.getType().isVariantType()) {
+ ScalarType variantType = (ScalarType) this.getType();
+ if (variantType.getVariantMaxSubcolumnsCount() !=
olapTable.getVariantMaxSubcolumnsCount()) {
+ throw new DdlException("MVColumnItem variantType is error");
+ }
+ }
result.setName(name);
result.setAggregationType(aggregationType, isAggregationTypeImplicit);
result.setDefineExpr(defineExpr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 3ef5f680e94..15507d5955e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -610,6 +610,10 @@ public class Column implements GsonPostProcessable {
tColumnType.setScale(this.getScale());
tColumnType.setIndexLen(this.getOlapColumnIndexSize());
+ if (this.getType().isVariantType()) {
+ ScalarType variantType = (ScalarType) this.getType();
+
tColumnType.setVariantMaxSubcolumnsCount(variantType.getVariantMaxSubcolumnsCount());
+ }
tColumn.setColumnType(tColumnType);
if (null != this.aggregationType) {
@@ -832,6 +836,9 @@ public class Column implements GsonPostProcessable {
for (Column c : childrenColumns) {
builder.addChildrenColumns(c.toPb(Sets.newHashSet(),
Lists.newArrayList()));
}
+ } else if (this.type.isVariantType()) {
+ ScalarType variantType = (ScalarType) this.getType();
+
builder.setVariantMaxSubcolumnsCount(variantType.getVariantMaxSubcolumnsCount());
}
OlapFile.ColumnPB col = builder.build();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1c6345613d7..70403423613 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3737,6 +3737,12 @@ public class Env {
sb.append(olapTable.variantEnableFlattenNested()).append("\"");
}
+ // variant max subcolumns count
+ if (olapTable.getVariantMaxSubcolumnsCount() != 0) {
+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT).append("\"
= \"");
+ sb.append(olapTable.getVariantMaxSubcolumnsCount()).append("\"");
+ }
+
// binlog
if (Config.enable_feature_binlog) {
BinlogConfig binlogConfig = olapTable.getBinlogConfig();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 477f7630112..8cb4a6e698b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2527,6 +2527,22 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return false;
}
+ public void setVariantMaxSubcolumnsCount(int maxSubcoumnsCount) {
+
getOrCreatTableProperty().setVariantMaxSubcolumnsCount(maxSubcoumnsCount);
+ List<Column> columns = getBaseSchema(true);
+ for (Column column : columns) {
+ Type type = column.getType();
+ if (type.isVariantType()) {
+ ScalarType scType = (ScalarType) type;
+ scType.setVariantMaxSubcolumnsCount(maxSubcoumnsCount);
+ }
+ }
+ }
+
+ public int getVariantMaxSubcolumnsCount() {
+ return getOrCreatTableProperty().getVariantMaxSubcolumnsCount();
+ }
+
public int getBaseSchemaVersion() {
MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId);
return baseIndexMeta.getSchemaVersion();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 9c1e09d7d16..7f3ab8eaae3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -283,6 +283,16 @@ public class TableProperty implements Writable,
GsonPostProcessable {
return variantEnableFlattenNested;
}
+ public void setVariantMaxSubcolumnsCount(int maxSubcoumnsCount) {
+
properties.put(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT,
Integer.toString(maxSubcoumnsCount));
+ }
+
+ public int getVariantMaxSubcolumnsCount() {
+ return Integer.parseInt(properties.getOrDefault(
+ PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT,
+
Integer.toString(PropertyAnalyzer.VARIANT_MAX_SUBCOLUMNS_COUNT_DEFAULT_VALUE)));
+ }
+
public TableProperty buildEnableSingleReplicaCompaction() {
enableSingleReplicaCompaction = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION,
"false"));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 536ca5633bf..60fbad81f0f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -240,6 +240,9 @@ public class PropertyAnalyzer {
public static final long
TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5;
public static final long
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE = 1;
+ public static final String PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT =
"variant_max_subcolumns_count";
+ public static final int VARIANT_MAX_SUBCOLUMNS_COUNT_DEFAULT_VALUE = 0;
+
public enum RewriteType {
PUT, // always put property
REPLACE, // replace if exists property
@@ -1725,4 +1728,23 @@ public class PropertyAnalyzer {
}
return properties;
}
+
+ public static int analyzeVariantMaxSubcolumnsCount(Map<String, String>
properties, int defuatValue)
+
throws AnalysisException {
+ int maxSubcoumnsCount = defuatValue;
+ if (properties != null &&
properties.containsKey(PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT)) {
+ String maxSubcoumnsCountStr =
properties.get(PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT);
+ try {
+ maxSubcoumnsCount = Integer.parseInt(maxSubcoumnsCountStr);
+ if (maxSubcoumnsCount < 0 || maxSubcoumnsCount > 10000) {
+ throw new AnalysisException("varaint max counts count must
between 10 and 10000 ");
+ }
+ } catch (Exception e) {
+ throw new AnalysisException("varaint max counts count format
error");
+ }
+
+ properties.remove(PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT);
+ }
+ return maxSubcoumnsCount;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 7c7ded88f55..f300aabe80a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1640,6 +1640,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
properties.put(PropertyAnalyzer.PROPERTIES_VARIANT_ENABLE_FLATTEN_NESTED,
olapTable.variantEnableFlattenNested().toString());
}
+ if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT))
{
+
properties.put(PropertyAnalyzer.PROPERTIES_VARIANT_MAX_SUBCOLUMNS_COUNT,
+
Integer.toString(olapTable.getVariantMaxSubcolumnsCount()));
+ }
if
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION))
{
properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION,
olapTable.enableSingleReplicaCompaction().toString());
@@ -3069,6 +3073,16 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
Preconditions.checkNotNull(versionInfo);
+ int variantMaxSubcolumnsCount = ConnectContext.get() == null ? 0 :
ConnectContext.get()
+
.getSessionVariable().getGlobalVariantMaxSubcolumnsCount();
+ try {
+ variantMaxSubcolumnsCount = PropertyAnalyzer
+
.analyzeVariantMaxSubcolumnsCount(properties, variantMaxSubcolumnsCount);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ olapTable.setVariantMaxSubcolumnsCount(variantMaxSubcolumnsCount);
+
// a set to record every new tablet created when create table
// if failed in any step, use this set to do clear things
Set<Long> tabletIdSet = new HashSet<>();
@@ -3280,6 +3294,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
throw t;
}
}
+
return tableHasExist;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
index 414dac1c95d..7e43bdc5401 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/VariantSubPathPruning.java
@@ -307,7 +307,7 @@ public class VariantSubPathPruning extends
DefaultPlanRewriter<PruneContext> imp
}
SlotReference outputSlot = new
SlotReference(StatementScopeIdGenerator.newExprId(),
- entry.getValue().get(0).getName(),
VariantType.INSTANCE,
+ entry.getValue().get(0).getName(),
entry.getValue().get(0).getDataType(),
true, ImmutableList.of(),
null,
null,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java
index ea6997e8482..ba902847e92 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignature.java
@@ -114,6 +114,7 @@ public interface ComputeSignature extends FunctionTrait,
ImplicitCastInputTypes
.then(ComputeSignatureHelper::implementFollowToArgumentReturnType)
.then(ComputeSignatureHelper::normalizeDecimalV2)
.then(ComputeSignatureHelper::dynamicComputePropertiesOfArray)
+ .then(ComputeSignatureHelper::dynamicComputeVariantArgs)
.get();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java
index 166f1c9db7f..662185bc8eb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/ComputeSignatureHelper.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.types.DecimalV3Type;
import org.apache.doris.nereids.types.MapType;
import org.apache.doris.nereids.types.NullType;
import org.apache.doris.nereids.types.StructType;
+import org.apache.doris.nereids.types.VariantType;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
import org.apache.doris.nereids.types.coercion.FollowToArgumentType;
@@ -428,6 +429,34 @@ public class ComputeSignatureHelper {
return signature;
}
+ /** dynamicComputeVariantArgs */
+ public static FunctionSignature dynamicComputeVariantArgs(
+ FunctionSignature signature, List<Expression> arguments) {
+ List<DataType> newArgTypes =
Lists.newArrayListWithCapacity(arguments.size());
+ boolean findVariantType = false;
+ for (int i = 0; i < arguments.size(); i++) {
+ DataType sigType;
+ if (i >= signature.argumentsTypes.size()) {
+ sigType = signature.getVarArgType().orElseThrow(
+ () -> new AnalysisException("function arity not match
with signature"));
+ } else {
+ sigType = signature.argumentsTypes.get(i);
+ }
+ DataType expressionType = arguments.get(i).getDataType();
+ if (sigType instanceof VariantType && expressionType instanceof
VariantType) {
+ newArgTypes.add(expressionType);
+ signature = signature.withReturnType(expressionType);
+ findVariantType = true;
+ } else {
+ newArgTypes.add(sigType);
+ }
+ }
+ if (findVariantType) {
+ signature = signature.withArgumentTypes(signature.hasVarArgs,
newArgTypes);
+ }
+ return signature;
+ }
+
private static FunctionSignature defaultDecimalV3PrecisionPromotion(
FunctionSignature signature, List<Expression> arguments) {
DecimalV3Type finalType = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java
index 62d7eb72e20..c0e60fdf135 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/generator/ExplodeVariantArray.java
@@ -36,7 +36,7 @@ import java.util.List;
public class ExplodeVariantArray extends TableGeneratingFunction implements
UnaryExpression, AlwaysNullable {
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
- FunctionSignature.ret(new VariantType()).args(new VariantType())
+ FunctionSignature.ret(new VariantType(0)).args(new VariantType(0))
);
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
index c8d54189d37..ed62fab5ad9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
@@ -45,8 +45,8 @@ public class ElementAt extends ScalarFunction
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(new FollowToAnyDataType(0))
.args(ArrayType.of(new AnyDataType(0)),
BigIntType.INSTANCE),
- FunctionSignature.ret(new VariantType())
- .args(new VariantType(), VarcharType.SYSTEM_DEFAULT),
+ FunctionSignature.ret(new VariantType(0))
+ .args(new VariantType(0), VarcharType.SYSTEM_DEFAULT),
FunctionSignature.ret(new FollowToAnyDataType(1))
.args(MapType.of(new AnyDataType(0), new AnyDataType(1)),
new FollowToAnyDataType(0))
);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
index 9b77017f6de..e54877f8d2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
@@ -351,7 +351,10 @@ public abstract class DataType {
case CHAR: return CharType.createCharType(type.getLength());
case VARCHAR: return
VarcharType.createVarcharType(type.getLength());
case STRING: return StringType.INSTANCE;
- case VARIANT: return VariantType.INSTANCE;
+ case VARIANT: {
+ ScalarType scType = (ScalarType) type;
+ return new VariantType(scType.getVariantMaxSubcolumnsCount());
+ }
case JSONB: return JsonType.INSTANCE;
case IPV4: return IPv4Type.INSTANCE;
case IPV6: return IPv6Type.INSTANCE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java
index 63752594998..a115c1bd25b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/VariantType.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.types;
+import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.types.coercion.PrimitiveType;
@@ -31,13 +32,25 @@ import java.util.Objects;
@Developing
public class VariantType extends PrimitiveType {
- public static final VariantType INSTANCE = new VariantType();
+ public static final VariantType INSTANCE = new VariantType(0);
public static final int WIDTH = 24;
+ private int variantMaxSubcolumnsCount = 0;
+
+ // public static createVariantType(int variantMaxSubcolumnsCount) {
+ // return new VariantType(variantMaxSubcolumnsCount);
+ // }
+
+ public VariantType(int variantMaxSubcolumnsCount) {
+ this.variantMaxSubcolumnsCount = variantMaxSubcolumnsCount;
+ }
+
@Override
public Type toCatalogDataType() {
- return Type.VARIANT;
+ ScalarType type = ScalarType.createVariantType();
+ type.setVariantMaxSubcolumnsCount(variantMaxSubcolumnsCount);
+ return type;
}
@Override
@@ -58,7 +71,8 @@ public class VariantType extends PrimitiveType {
if (o == null || getClass() != o.getClass()) {
return false;
}
- return super.equals(o);
+ VariantType other = (VariantType) o;
+ return this.variantMaxSubcolumnsCount ==
other.variantMaxSubcolumnsCount;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index cf26cce7383..f698ce7d269 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -702,6 +702,8 @@ public class SessionVariable implements Serializable,
Writable {
*/
public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE =
"enable_auto_create_when_overwrite";
+ public static final String GLOBAL_VARIANT_SUBCOLUMNS_COUNT =
"global_variant_max_subcolumns_count";
+
/**
* If set false, user couldn't submit analyze SQL and FE won't allocate
any related resources.
*/
@@ -2371,6 +2373,14 @@ public class SessionVariable implements Serializable,
Writable {
})
public boolean skipCheckingAcidVersionFile = false;
+ @VariableMgr.VarAttr(
+ name = GLOBAL_VARIANT_SUBCOLUMNS_COUNT,
+ needForward = true,
+ checker = "checkGlobalVariantMaxSubcolumnsCount",
+ fuzzy = true
+ )
+ public int globalVariantMaxSubcolumnsCount = 5;
+
public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
this.enableESParallelScroll = enableESParallelScroll;
}
@@ -2412,6 +2422,7 @@ public class SessionVariable implements Serializable,
Writable {
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
// this.enableHashJoinEarlyStartProbe = random.nextBoolean();
this.enableParallelResultSink = random.nextBoolean();
+ this.globalVariantMaxSubcolumnsCount = random.nextInt(10);
int randomInt = random.nextInt(4);
if (randomInt % 2 == 0) {
this.rewriteOrToInPredicateThreshold = 100000;
@@ -3722,6 +3733,14 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public void checkGlobalVariantMaxSubcolumnsCount(String
variantMaxSubcolumnsCount) {
+ int value = Integer.valueOf(variantMaxSubcolumnsCount);
+ if (value < 0 || value > 10000) {
+ throw new UnsupportedOperationException(
+ "variant max subcolumns count is: " +
variantMaxSubcolumnsCount + "it must between 0 and 10000");
+ }
+ }
+
public void checkQueryTimeoutValid(String newQueryTimeout) {
int value = Integer.valueOf(newQueryTimeout);
if (value <= 0) {
@@ -4634,4 +4653,8 @@ public class SessionVariable implements Serializable,
Writable {
public boolean getDisableInvertedIndexV1ForVaraint() {
return disableInvertedIndexV1ForVaraint;
}
+
+ public int getGlobalVariantMaxSubcolumnsCount() {
+ return globalVariantMaxSubcolumnsCount;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 6b50cd32a1e..9fe4933429c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -45,7 +45,6 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
-import org.apache.doris.catalog.VariantType;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
@@ -721,7 +720,7 @@ public class StatisticsUtil {
return type instanceof ArrayType
|| type instanceof StructType
|| type instanceof MapType
- || type instanceof VariantType
+ || type.isVariantType()
|| type instanceof AggStateType;
}
diff --git
a/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java
b/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java
index 2e6ace105b1..4f81182837a 100644
---
a/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java
+++
b/fe_plugins/trino-converter/src/main/java/org/apache/doris/plugin/dialect/trino/TrinoLogicalPlanBuilder.java
@@ -324,6 +324,8 @@ public class TrinoLogicalPlanBuilder extends
io.trino.sql.tree.AstVisitor<Object
} else if (dataType instanceof io.trino.sql.tree.DateTimeDataType) {
// TODO: support date data type mapping
throw new DialectTransformException("transform date data type");
+ } else if("variant".eqluals(typeName)) {
+ throw new DialectTransformException("transform variant data type");
}
throw new AnalysisException("Nereids do not support type: " +
dataType);
}
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 9b3824db3dc..26c0c9f0c31 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -63,6 +63,7 @@ message PColumnMeta {
optional bool result_is_nullable = 6;
optional string function_name = 7;
optional int32 be_exec_version = 8;
+ optional int32 variant_max_subcolumns_count = 9 [default = 0];
}
message PBlock {
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 2c378fe2d45..3908b1c1847 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -321,6 +321,7 @@ message ColumnPB {
// only reference by variant sparse columns
optional int32 parent_unique_id = 23;
optional int32 be_exec_version = 24;
+ optional int32 variant_max_subcolumns_count = 25 [default = 0];
}
// Dictionary of Schema info, to reduce TabletSchemaCloudPB fdb kv size
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index dee4a81d3bb..c51982a8dab 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -204,6 +204,7 @@ message ColumnMetaPB {
optional string function_name = 19; // used on agg_state type
optional int32 be_exec_version = 20; // used on agg_state type
optional VariantStatisticsPB variant_statistics = 21; // only used in
variant type
+ optional int32 variant_max_subcolumns_count = 22 [default = 0];
}
message PrimaryKeyIndexMetaPB {
diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto
index 012434dc3bc..c6beb626e96 100644
--- a/gensrc/proto/types.proto
+++ b/gensrc/proto/types.proto
@@ -53,6 +53,9 @@ message PTypeNode {
optional bool contains_null = 4;
// update for map/struct type
repeated bool contains_nulls = 5;
+
+ // only used for VARIANT
+ optional int32 variant_max_subcolumns_count = 6 [default = 0];
};
// A flattened representation of a tree of column types obtained by depth-first
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 623da9ce067..785cfb6d93b 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -105,7 +105,7 @@ enum TTypeNodeType {
ARRAY,
MAP,
STRUCT,
- VARIANT,
+ VARIANT, // unused
}
enum TStorageBackendType {
@@ -137,6 +137,9 @@ struct TScalarType {
// Only set for DECIMAL
3: optional i32 precision
4: optional i32 scale
+
+ // Only set for VARIANT
+ 5: optional i32 variant_max_subcolumns_count = 0;
}
// Represents a field in a STRUCT type.
@@ -280,6 +283,7 @@ struct TColumnType {
3: optional i32 index_len
4: optional i32 precision
5: optional i32 scale
+ 6: optional i32 variant_max_subcolumns_count = 0;
}
// A TNetworkAddress is the standard host, port representation of a
diff --git a/regression-test/data/variant_p0/delete_update.out
b/regression-test/data/variant_p0/delete_update.out
index 2b014bc0674..574464e9e86 100644
Binary files a/regression-test/data/variant_p0/delete_update.out and
b/regression-test/data/variant_p0/delete_update.out differ
diff --git a/regression-test/data/variant_p0/test_sub_path_pruning.out
b/regression-test/data/variant_p0/test_sub_path_pruning.out
index ae75160a91d..54570025f52 100644
Binary files a/regression-test/data/variant_p0/test_sub_path_pruning.out and
b/regression-test/data/variant_p0/test_sub_path_pruning.out differ
diff --git a/regression-test/data/variant_p0/update/load.out
b/regression-test/data/variant_p0/update/load.out
new file mode 100644
index 00000000000..c1c66031e7c
Binary files /dev/null and b/regression-test/data/variant_p0/update/load.out
differ
diff --git a/regression-test/data/variant_p0/update/query.out
b/regression-test/data/variant_p0/update/query.out
new file mode 100644
index 00000000000..ea7dff7c6fb
Binary files /dev/null and b/regression-test/data/variant_p0/update/query.out
differ
diff --git
a/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out
b/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out
index 0adc70c83aa..ed982672fa5 100644
Binary files
a/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out
and
b/regression-test/data/variant_p1/compaction/test_compaction_extract_root.out
differ
diff --git a/regression-test/suites/variant_p0/delete_update.groovy
b/regression-test/suites/variant_p0/delete_update.groovy
index 7317b2c3145..6af093e7de1 100644
--- a/regression-test/suites/variant_p0/delete_update.groovy
+++ b/regression-test/suites/variant_p0/delete_update.groovy
@@ -28,15 +28,15 @@ suite("regression_test_variant_delete_and_update",
"variant_type"){
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 3
- properties("replication_num" = "1", "enable_unique_key_merge_on_write"
= "false", "variant_enable_flatten_nested" = "true");
+ properties("replication_num" = "1", "enable_unique_key_merge_on_write"
= "false", "variant_enable_flatten_nested" = "false");
"""
// test mor table
- sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.0, "d" :
[{"x" : 1}]}')"""
- sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.0, "d" :
[{"y" : 1}]}')"""
- sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.0, "d" :
[{"o" : 1}]}')"""
- sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.0, "d" :
[{"p" : 1}]}')"""
- sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.0, "d" :
[{"q" : 1}]}')"""
+ sql """insert into ${table_name} values (1, '{"a":1,"b":[1],"c":1.1, "d" :
[{"x" : 1}]}')"""
+ sql """insert into ${table_name} values (2, '{"a":2,"b":[1],"c":2.1, "d" :
[{"y" : 1}]}')"""
+ sql """insert into ${table_name} values (3, '{"a":3,"b":[3],"c":3.1, "d" :
[{"o" : 1}]}')"""
+ sql """insert into ${table_name} values (4, '{"a":4,"b":[4],"c":4.1, "d" :
[{"p" : 1}]}')"""
+ sql """insert into ${table_name} values (5, '{"a":5,"b":[5],"c":5.1, "d" :
[{"q" : 1}]}')"""
sql "delete from ${table_name} where k = 1"
sql """update ${table_name} set v = '{"updated_value":123}' where k = 2"""
diff --git a/regression-test/suites/variant_p0/element_function.groovy
b/regression-test/suites/variant_p0/element_function.groovy
index 7b5e55ea53b..51555508c33 100644
--- a/regression-test/suites/variant_p0/element_function.groovy
+++ b/regression-test/suites/variant_p0/element_function.groovy
@@ -16,7 +16,8 @@
// under the License.
suite("regression_test_variant_element_at", "p0") {
- sql """
+ sql """ DROP TABLE IF EXISTS element_fn_test """
+ sql """
CREATE TABLE IF NOT EXISTS element_fn_test(
k bigint,
v variant,
diff --git a/regression-test/suites/variant_p0/load.groovy
b/regression-test/suites/variant_p0/load.groovy
index cd5e9ee523d..4b61f46e696 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -323,6 +323,7 @@ suite("regression_test_variant", "p0"){
// test mow with delete
table_name = "variant_mow"
+ sql """ DROP TABLE IF EXISTS ${table_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
diff --git a/regression-test/suites/variant_p0/select_partition.groovy
b/regression-test/suites/variant_p0/select_partition.groovy
index a057e3b9a1d..c5e30aebc7d 100644
--- a/regression-test/suites/variant_p0/select_partition.groovy
+++ b/regression-test/suites/variant_p0/select_partition.groovy
@@ -60,6 +60,7 @@ suite("query_on_specific_partition") {
qt_sql """select * from t_p temporary partition tp1;"""
+ sql """ DROP TABLE IF EXISTS test_iot """
sql """
CREATE TABLE IF NOT EXISTS test_iot (
`test_int` int NOT NULL,
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index 1210c57e3bc..77dd439f1bf 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -26,7 +26,7 @@ suite("variant_sub_path_pruning", "variant_type"){
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id)
- PROPERTIES("replication_num"="1")
+ PROPERTIES("replication_num"="1", "variant_max_subcolumns_count" = "0")
"""
sql """
@@ -156,19 +156,19 @@ suite("variant_sub_path_pruning", "variant_type"){
order_qt_sql """select dt['a'] as c1 from pruning_test union all select
dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from
pruning_test;"""
order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test
union all select dt as c1 from pruning_test union all select dt as c1 from
pruning_test) v1;"""
order_qt_sql """select c1['b'] from (select dt['a'] as c1 from
pruning_test union all select dt['a'] as c1 from pruning_test union all select
dt['a'] as c1 from pruning_test) v1;"""
- order_qt_sql """select c1['d'] from (select dt['a'] as c1 from
pruning_test union all select dt['b'] as c1 from pruning_test union all select
dt['c'] as c1 from pruning_test) v1;"""
+ // order_qt_sql """select c1['d'] from (select dt['a'] as c1 from
pruning_test union all select dt['b'] as c1 from pruning_test union all select
dt['c'] as c1 from pruning_test) v1;"""
order_qt_sql """select c1['d'] from (select dt['a'] as c1 from
pruning_test union all select dt['b'] as c1 from pruning_test union all select
dt['b'] as c1 from pruning_test) v1;"""
order_qt_sql """select c1['c']['d'] from (select dt['a']['b'] as c1 from
pruning_test union all select dt['a'] as c1 from pruning_test union all select
dt as c1 from pruning_test) v1;"""
// one table + one const list
order_qt_sql """select id, cast(c1['a'] as text) from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100;"""
order_qt_sql """select c1['a'] from (select id, c1 from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp;"""
- order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all
select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
- // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from
(select cast('1' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp;"""
+ order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as variant)
as c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all
select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
+ order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select
cast('1' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp;"""
order_qt_sql """select id, cast(c1['c'] as text) from (select
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1,
id from pruning_test) tmp order by 1, 2 limit 100;"""
order_qt_sql """select c1['c'] from (select id, c1 from (select
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1,
id from pruning_test) tmp order by id limit 100) tmp;"""
- // order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a']
as c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all
select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp
order by 1;"""
- // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from
(select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select
dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
+ order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as
c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all
select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp
order by 1;"""
+ order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from (select
cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select dt['a']['b']
as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
// two const list
order_qt_sql """select id, cast(c1['a'] as text) from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as
variant) as c1, 2 as id) tmp order by id limit 100;"""
diff --git a/regression-test/suites/variant_p0/update/load.groovy
b/regression-test/suites/variant_p0/update/load.groovy
new file mode 100644
index 00000000000..a857a912da3
--- /dev/null
+++ b/regression-test/suites/variant_p0/update/load.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("update_test_load", "p0") {
+
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def table_name = "test_update"
+
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql "DROP MATERIALIZED VIEW IF EXISTS
regression_test_variant_p0_update.table_mv2;"
+ sql "DROP MATERIALIZED VIEW IF EXISTS
regression_test_variant_p0_update.table_mv4;"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 6
+ properties("replication_num" = "1", "disable_auto_compaction" =
"true");
+ """
+
+ for (int i = 0; i < 10; i++) {
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/load/ghdata_sample.json'}""")
+ }
+
+ qt_sql """ select count() from ${table_name} """
+
+ createMV ("create materialized view table_mv1 as select
(abs(cast(v['repo']['id'] as int)) + cast(v['payload']['review']['user']['id']
as int) + 20) as kk from ${table_name};")
+
+ explain {
+ sql("select max(kk) from (select (abs(cast(v['repo']['id'] as int)) +
cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv;")
+ contains("table_mv1 chose")
+ }
+ explain {
+ sql("select min(kk) from (select (abs(cast(v['repo']['id'] as int)) +
cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv;")
+ contains("table_mv1 chose")
+ }
+ explain {
+ sql("select count(kk) from (select (abs(cast(v['repo']['id'] as int))
+ cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv;")
+ contains("table_mv1 chose")
+ }
+
+ qt_sql """ select max(kk) from (select (abs(cast(v['repo']['id'] as int))
+ cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv; """
+ qt_sql """ select min(kk) from (select (abs(cast(v['repo']['id'] as int))
+ cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv; """
+ qt_sql """ select count(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv; """
+
+ sql """
+ CREATE MATERIALIZED VIEW table_mv2 BUILD IMMEDIATE REFRESH AUTO ON
MANUAL DISTRIBUTED BY RANDOM BUCKETS 6 PROPERTIES
+('replication_num' = '1') AS SELECT cast(v['type'] as text), cast(v['public']
as int) FROM ${table_name};
+ """
+ waitingMTMVTaskFinishedByMvName("table_mv2")
+
+ explain {
+ sql("SELECT sum(cast(v['public'] as int)) FROM ${table_name} group by
cast(v['type'] as text) order by cast(v['type'] as text);")
+ contains("table_mv2 chose")
+ }
+
+ qt_sql """ SELECT sum(cast(v['public'] as int)) FROM ${table_name} group
by cast(v['type'] as text) order by cast(v['type'] as text); """
+
+
+ def create_table_load_data = {create_table_name->
+ sql "DROP TABLE IF EXISTS ${create_table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${create_table_name} (
+ k bigint,
+ v variant NOT NULL
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 6
+ properties("replication_num" = "1", "disable_auto_compaction" =
"true");
+ """
+
+ for (int i = 0; i < 10; i++) {
+ load_json_data.call(create_table_name, """${getS3Url() +
'/regression/load/ghdata_sample.json'}""")
+ }
+ }
+
+ create_table_load_data.call("test_update_sc")
+ create_table_load_data.call("test_update_compact")
+ create_table_load_data.call("test_update_sc2")
+}
diff --git a/regression-test/suites/variant_p0/update/query.groovy
b/regression-test/suites/variant_p0/update/query.groovy
new file mode 100644
index 00000000000..efc3b880548
--- /dev/null
+++ b/regression-test/suites/variant_p0/update/query.groovy
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("update_test_query", "p0") {
+
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def table_name = "test_update"
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def compaction = {compact_table_name ->
+
+ def tablets = sql_return_maparray """ show tablets from
${compact_table_name}; """
+
+ // trigger compactions for all tablets in ${tableName}
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def backend_id = tablet.BackendId
+ def (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ assertEquals("success", compactJson.status.toLowerCase())
+ }
+
+ // wait for all compactions done
+ for (def tablet in tablets) {
+ Awaitility.await().atMost(30, TimeUnit.MINUTES).untilAsserted(()
-> {
+ Thread.sleep(10000)
+ String tablet_id = tablet.TabletId
+ def backend_id = tablet.BackendId
+ def (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("compaction task for this tablet is not running",
compactionStatus.msg.toLowerCase())
+ });
+ }
+
+
+ for (def tablet in tablets) {
+ int afterSegmentCount = 0
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ logger.info("rowset is: " + rowset)
+ afterSegmentCount += Integer.parseInt(rowset.split(" ")[1])
+ }
+ assertEquals(afterSegmentCount, 1)
+ }
+ }
+
+ for (int i = 0; i < 10; i++) {
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/load/ghdata_sample.json'}""")
+ }
+
+ def normal_check = {
+ qt_sql """ select count() from ${table_name} """
+ qt_sql """ select v['actor'] from ${table_name} order by k limit 1"""
+ qt_sql """ select count(cast (v['repo']['url'] as text)) from
${table_name} group by cast (v['type'] as text) order by cast (v['type'] as
text)"""
+ qt_sql """ select max(cast (v['public'] as tinyint)) from
${table_name}"""
+ }
+
+ def dbName = "regression_test_variant_p0_update"
+
+ // mv1, mv2
+ def mv_check = {
+ sql 'REFRESH MATERIALIZED VIEW table_mv2 AUTO'
+ waitingMTMVTaskFinished(getJobName(dbName, 'table_mv2'))
+ explain {
+ sql("select max(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv;")
+ contains("table_mv1 chose")
+ }
+ explain {
+ sql("select min(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv;")
+ contains("table_mv1 chose")
+ }
+ explain {
+ sql("select count(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv;")
+ contains("table_mv1 chose")
+ }
+
+ explain {
+ sql("SELECT sum(cast(v['public'] as int)) FROM ${table_name}
group by cast(v['type'] as text) order by cast(v['type'] as text);")
+ contains("table_mv2 chose")
+ }
+
+ qt_sql """ select max(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv; """
+ qt_sql """ select min(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv; """
+ qt_sql """ select count(kk) from (select (abs(cast(v['repo']['id'] as
int)) + cast(v['payload']['review']['user']['id'] as int) + 20) as kk from
${table_name}) as mv; """
+ qt_sql """ SELECT sum(cast(v['public'] as int)) FROM ${table_name}
group by cast(v['type'] as text) order by cast(v['type'] as text); """
+ }
+
+ // mv3, mv4
+ def mv_check2 = {
+ sql 'REFRESH MATERIALIZED VIEW table_mv4 AUTO'
+ waitingMTMVTaskFinished(getJobName(dbName, 'table_mv4'))
+ explain {
+ sql("select max(element) from (select (abs(cast(v['org']['id'] as
int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from
${table_name}) as mv2;")
+ contains("table_mv3 chose")
+ }
+ explain {
+ sql("select min(element) from (select (abs(cast(v['org']['id'] as
int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from
${table_name}) as mv2;")
+ contains("table_mv3 chose")
+ }
+ explain {
+ sql("select count(element) from (select (abs(cast(v['org']['id']
as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from
${table_name}) as mv2;")
+ contains("table_mv3 chose")
+ }
+ explain {
+ sql("SELECT cast(v['payload']['before'] as text) FROM
${table_name} order by cast(v['actor']['id'] as int) limit 1; ")
+ contains("table_mv4 chose")
+ }
+ qt_sql """ select max(element) from (select (abs(cast(v['org']['id']
as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from
${table_name}) as mv2; """
+ qt_sql """ select min(element) from (select (abs(cast(v['org']['id']
as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from
${table_name}) as mv2; """
+ qt_sql """ select count(element) from (select (abs(cast(v['org']['id']
as int)) + cast(v['payload']['comment']['id'] as int) + 30) as element from
${table_name}) as mv2; """
+ qt_sql """ SELECT cast(v['payload']['before'] as text) FROM
${table_name} order by cast(v['actor']['id'] as int) limit 1; """
+ }
+
+ createMV ("create materialized view table_mv3 as select
(abs(cast(v['org']['id'] as int)) + cast(v['payload']['comment']['id'] as int)
+ 30) as element from ${table_name};")
+
+ sql """
+ CREATE MATERIALIZED VIEW table_mv4 BUILD IMMEDIATE REFRESH AUTO ON
MANUAL DISTRIBUTED BY RANDOM BUCKETS 1 PROPERTIES
+('replication_num' = '1') AS SELECT cast(v['payload']['before'] as text),
cast(v['actor']['id'] as int) FROM ${table_name};
+ """
+ waitingMTMVTaskFinishedByMvName("table_mv4")
+
+ normal_check.call()
+ mv_check.call()
+ mv_check2.call()
+
+ compaction.call(table_name)
+
+ normal_check.call()
+ mv_check.call()
+ mv_check2.call()
+
+ def table_name_sc = "test_update_sc"
+
+ for (int i = 0; i < 10; i++) {
+ load_json_data.call(table_name_sc, """${getS3Url() +
'/regression/load/ghdata_sample.json'}""")
+ }
+
+ def schema_change = {schema_change_table_name ->
+ def tablets = sql_return_maparray """ show tablets from
${schema_change_table_name}; """
+ Set<String> rowsetids = new HashSet<>();
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ int segmentCount = Integer.parseInt(rowset.split(" ")[1])
+ if (segmentCount == 0) {
+ continue;
+ }
+ String rowsetid = rowset.split(" ")[4];
+ rowsetids.add(rowsetid)
+ logger.info("rowsetid: " + rowsetid)
+ }
+ }
+ sql """ alter table ${schema_change_table_name} modify column v
variant null"""
+ Awaitility.await().atMost(30, TimeUnit.MINUTES).untilAsserted(() -> {
+ Thread.sleep(10000)
+ tablets = sql_return_maparray """ show tablets from
${schema_change_table_name}; """
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ int segmentCount = Integer.parseInt(rowset.split(" ")[1])
+ if (segmentCount == 0) {
+ continue;
+ }
+ String rowsetid = rowset.split(" ")[4];
+ logger.info("rowsetid: " + rowsetid)
+ assertTrue(!rowsetids.contains(rowsetid))
+ }
+ }
+ });
+ }
+
+ def sql_check = { check_table_name ->
+ qt_sql """ select count() from ${check_table_name} """
+ qt_sql """ select v['actor'] from ${check_table_name} order by k limit
1"""
+ qt_sql """ select count(cast (v['repo']['url'] as text)) from
${check_table_name} group by cast (v['type'] as text) order by cast (v['type']
as text) """
+ qt_sql """ select max(cast (v['public'] as tinyint)) from
${check_table_name}"""
+ }
+
+ sql_check.call(table_name_sc)
+ schema_change.call(table_name_sc)
+ sql_check.call(table_name_sc)
+
+ def table_name_compact = "test_update_compact"
+
+ sql_check.call(table_name_compact)
+ compaction.call(table_name_compact)
+ sql_check.call(table_name_compact)
+
+ def table_name_sc2 = "test_update_sc2"
+
+ sql_check.call(table_name_sc2)
+ schema_change.call(table_name_sc2)
+ sql_check.call(table_name_sc2)
+}
diff --git
a/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
b/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
index 5d753b97382..2051d819dd8 100644
---
a/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
+++
b/regression-test/suites/variant_p1/compaction/compaction_sparse_column.groovy
@@ -47,7 +47,6 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
try {
set_be_config.call("write_buffer_size", "10240")
- set_be_config.call("variant_max_subcolumns_count", "2")
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -59,7 +58,8 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
- "disable_auto_compaction" = "true"
+ "disable_auto_compaction" = "true",
+ "variant_max_subcolumns_count" = "3"
);
"""
@@ -71,7 +71,7 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
- (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
@@ -163,7 +163,7 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
qt_select_6_1_bfcompact """ SELECT count(cast(v['b'] as int)) FROM
${tableName} where cast(v['b'] as int) = 42005;"""
qt_select_all_bfcompact """SELECT k, v['a'], v['b'], v['xxxx'],
v['point'], v['ddddd'] from ${tableName} where (cast(v['point'] as int) = 1);"""
-
GetDebugPoint().enableDebugPointForAllBEs("variant_column_writer_impl._get_subcolumn_paths_from_stats",
[stats: "24588,12292,12291,3",subcolumns:"a,b"])
+
GetDebugPoint().enableDebugPointForAllBEs("variant_column_writer_impl._get_subcolumn_paths_from_stats",
[stats: "24588,12292,12291,3",subcolumns:"a,b,xxxx"])
triger_compaction.call()
/**
variant_statistics {
diff --git a/regression-test/suites/variant_p2/load.groovy
b/regression-test/suites/variant_p2/load.groovy
index a737ef943bb..b193b63e927 100644
--- a/regression-test/suites/variant_p2/load.groovy
+++ b/regression-test/suites/variant_p2/load.groovy
@@ -65,7 +65,7 @@ suite("load_p2", "variant_type,p2"){
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(id) BUCKETS ${buckets}
- properties("replication_num" = "1", "disable_auto_compaction" =
"false");
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false", "variant_max_subcolumns_count" = "500");
"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]