This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 48935c14e2c [Improvement](variant) limit the column size on tablet
schema (#27399) (#27785)
48935c14e2c is described below
commit 48935c14e2c835a20aba61f4eac02453aa7d6c9d
Author: lihangyu <[email protected]>
AuthorDate: Mon Dec 4 14:47:36 2023 +0800
[Improvement](variant) limit the column size on tablet schema (#27399)
(#27785)
1. limit the column count to default 2048
2. fix get_inverted_index returning nullptr when the variant's unique id is -1,
by using its parent unique id instead
3. avoid adding a subcolumn with the same path more than once to the tablet schema
4. set the unique id of extracted columns to -1
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 3 ++
be/src/olap/base_tablet.cpp | 11 ++--
be/src/olap/base_tablet.h | 2 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 5 +-
be/src/olap/rowset/segment_creator.cpp | 7 +--
be/src/olap/rowset/segment_v2/segment.cpp | 5 +-
be/src/olap/rowset_builder.cpp | 2 +-
be/src/olap/tablet.cpp | 3 +-
be/src/olap/tablet_schema.cpp | 3 +-
be/src/vec/common/schema_util.cpp | 56 +++++++++++---------
be/src/vec/common/schema_util.h | 5 +-
.../suites/variant_p0/column_size_limit.groovy | 59 ++++++++++++++++++++++
regression-test/suites/variant_p0/load.groovy | 10 ++--
14 files changed, 128 insertions(+), 45 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 293ba3c0282..8efd18248e7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1128,6 +1128,8 @@ DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");
DEFINE_Bool(enable_snapshot_action, "false");
+DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ddc47c678c6..1b9fd62a3ad 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1212,6 +1212,9 @@ DECLARE_mInt32(buffered_reader_read_timeout_ms);
// whether to enable /api/snapshot api
DECLARE_Bool(enable_snapshot_action);
+// The max columns size for a tablet schema
+DECLARE_mInt32(variant_max_merged_tablet_schema_size);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index db17e68706a..011a901ba5b 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -19,6 +19,7 @@
#include <fmt/format.h>
+#include "olap/tablet_fwd.h"
#include "olap/tablet_schema_cache.h"
#include "util/doris_metrics.h"
#include "vec/common/schema_util.h"
@@ -66,13 +67,17 @@ void BaseTablet::update_max_version_schema(const
TabletSchemaSPtr& tablet_schema
}
}
-void BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr&
update_schema) {
+Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr&
update_schema) {
std::lock_guard wrlock(_meta_lock);
CHECK(_max_version_schema->schema_version() >=
update_schema->schema_version());
- auto final_schema = vectorized::schema_util::get_least_common_schema(
- {_max_version_schema, update_schema}, _max_version_schema);
+ TabletSchemaSPtr final_schema;
+ bool check_column_size = true;
+ RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+ {_max_version_schema, update_schema}, _max_version_schema,
final_schema,
+ check_column_size));
_max_version_schema = final_schema;
VLOG_DEBUG << "dump updated tablet schema: " <<
final_schema->dump_structure();
+ return Status::OK();
}
} /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index b3fc9f8b9b7..2fa494b420a 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -65,7 +65,7 @@ public:
void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
- void update_by_least_common_schema(const TabletSchemaSPtr& update_schema);
+ Status update_by_least_common_schema(const TabletSchemaSPtr&
update_schema);
TabletSchemaSPtr tablet_schema() const {
std::shared_lock rlock(_meta_lock);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index bcac89ad9a0..3b2852a283f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -561,8 +561,9 @@ bool BetaRowsetWriter::_is_segment_overlapping(
// => update_schema: A(bigint), B(double), C(int), D(int)
void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
std::lock_guard<std::mutex> lock(*(_context.schema_lock));
- TabletSchemaSPtr update_schema =
vectorized::schema_util::get_least_common_schema(
- {_context.tablet_schema, flush_schema}, nullptr);
+ TabletSchemaSPtr update_schema;
+ static_cast<void>(vectorized::schema_util::get_least_common_schema(
+ {_context.tablet_schema, flush_schema}, nullptr, update_schema));
CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
<< "Rowset merge schema columns count is " <<
update_schema->num_columns()
<< ", but flush_schema is larger " << flush_schema->num_columns()
diff --git a/be/src/olap/rowset/segment_creator.cpp
b/be/src/olap/rowset/segment_creator.cpp
index 7e904478b1d..af7cec3e80e 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -136,7 +136,7 @@ Status
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
.build();
TabletColumn tablet_column =
vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
- vectorized::schema_util::ExtraInfo {.unique_id =
parent_variant.unique_id(),
+ vectorized::schema_util::ExtraInfo {.unique_id = -1,
.parent_unique_id =
parent_variant.unique_id(),
.path_info = full_path});
flush_schema->append_column(std::move(tablet_column));
@@ -194,8 +194,9 @@ Status
SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
// ctx.tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
std::lock_guard<std::mutex> lock(*(_context->schema_lock));
- TabletSchemaSPtr update_schema =
vectorized::schema_util::get_least_common_schema(
- {_context->tablet_schema, flush_schema}, nullptr);
+ TabletSchemaSPtr update_schema;
+ static_cast<void>(vectorized::schema_util::get_least_common_schema(
+ {_context->tablet_schema, flush_schema}, nullptr,
update_schema));
CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
<< "Rowset merge schema columns count is " <<
update_schema->num_columns()
<< ", but flush_schema is larger " <<
flush_schema->num_columns()
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index c7bda4d3adf..d3a4f3db135 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -355,13 +355,14 @@ Status Segment::_create_column_readers(const
SegmentFooterPB& footer) {
column_path_to_footer_ordinal;
for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) {
auto& column_pb = footer.columns(ordinal);
+ // column path for accessing subcolumns of variant
if (column_pb.has_column_path_info()) {
- // column path
vectorized::PathInData path;
path.from_protobuf(column_pb.column_path_info());
column_path_to_footer_ordinal.emplace(path, ordinal);
}
- if (column_pb.has_unique_id()) {
+ // unique_id is unsigned, -1 meaning no unique id(e.g. an extracted
column from variant)
+ if (static_cast<int>(column_pb.unique_id()) >= 0) {
// unique id
column_id_to_footer_ordinal.emplace(column_pb.unique_id(),
ordinal);
}
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 395d37f7150..21fbed78022 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -309,7 +309,7 @@ Status RowsetBuilder::commit_txn() {
// _tabelt->tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
const RowsetWriterContext& rw_ctx = _rowset_writer->context();
- _tablet->update_by_least_common_schema(rw_ctx.tablet_schema);
+
RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema));
}
// Transfer ownership of `PendingRowsetGuard` to `TxnManager`
Status res = storage_engine->txn_manager()->commit_txn(_req.partition_id,
*tablet, _req.txn_id,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 21d390650ab..0d056d2c235 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -641,7 +641,8 @@ TabletSchemaSPtr
Tablet::tablet_schema_with_merged_max_schema_version(
std::vector<TabletSchemaSPtr> schemas;
std::transform(rowset_metas.begin(), rowset_metas.end(),
std::back_inserter(schemas),
[](const RowsetMetaSharedPtr& rs_meta) { return
rs_meta->tablet_schema(); });
- target_schema =
vectorized::schema_util::get_least_common_schema(schemas, nullptr);
+ static_cast<void>(
+ vectorized::schema_util::get_least_common_schema(schemas,
nullptr, target_schema));
VLOG_DEBUG << "dump schema: " << target_schema->dump_structure();
}
return target_schema;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 85203b0b12b..9346c3573ac 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -1186,7 +1186,8 @@ const TabletIndex*
TabletSchema::get_inverted_index(int32_t col_unique_id,
const TabletIndex* TabletSchema::get_inverted_index(const TabletColumn& col)
const {
// TODO use more efficient impl
- int32_t col_unique_id = col.unique_id();
+ // Use parent id if unique not assigned, this could happend when accessing
subcolumns of variants
+ int32_t col_unique_id = col.unique_id() < 0 ? col.parent_unique_id() :
col.unique_id();
const std::string& suffix_path =
!col.path_info().empty() ?
escape_for_path_name(col.path_info().get_path()) : "";
return get_inverted_index(col_unique_id, suffix_path);
diff --git a/be/src/vec/common/schema_util.cpp
b/be/src/vec/common/schema_util.cpp
index 2c678f7051e..d0fbb287a14 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -310,10 +310,11 @@ void update_least_common_schema(const
std::vector<TabletSchemaSPtr>& schemas,
TabletColumn common_column;
// const std::string& column_name = variant_col_name + "." +
tuple_paths[i].get_path();
get_column_by_type(tuple_types[i], tuple_paths[i].get_path(),
common_column,
- ExtraInfo {.unique_id = variant_col_unique_id,
+ ExtraInfo {.unique_id = -1,
.parent_unique_id =
variant_col_unique_id,
.path_info = tuple_paths[i]});
- common_schema->append_column(common_column);
+ // set ColumnType::VARIANT to occupy _field_path_to_index
+ common_schema->append_column(common_column,
TabletSchema::ColumnType::VARIANT);
}
}
@@ -350,23 +351,21 @@ void inherit_tablet_index(TabletSchemaSPtr& schema) {
}
}
-TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>&
schemas,
- const TabletSchemaSPtr& base_schema) {
- auto output_schema = std::make_shared<TabletSchema>();
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+ const TabletSchemaSPtr& base_schema,
TabletSchemaSPtr& output_schema,
+ bool check_schema_size) {
std::vector<int32_t> variant_column_unique_id;
- if (base_schema == nullptr) {
- // Pick tablet schema with max schema version
- auto max_version_schema =
- *std::max_element(schemas.cbegin(), schemas.cend(),
- [](const TabletSchemaSPtr a, const
TabletSchemaSPtr b) {
- return a->schema_version() <
b->schema_version();
- });
- CHECK(max_version_schema);
- output_schema->copy_from(*max_version_schema);
+
+ // Construct a schema excluding the extracted columns and gather unique
identifiers for variants.
+ // Ensure that the output schema also excludes these extracted columns.
This approach prevents
+ // duplicated paths following the update_least_common_schema process.
+ auto build_schema_without_extracted_columns = [&](const TabletSchemaSPtr&
base_schema) {
+ output_schema = std::make_shared<TabletSchema>();
+ output_schema->copy_from(*base_schema);
// Merge columns from other schemas
output_schema->clear_columns();
// Get all columns without extracted columns and collect variant col
unique id
- for (const TabletColumn& col : max_version_schema->columns()) {
+ for (const TabletColumn& col : base_schema->columns()) {
if (col.is_variant_type()) {
variant_column_unique_id.push_back(col.unique_id());
}
@@ -374,15 +373,19 @@ TabletSchemaSPtr get_least_common_schema(const
std::vector<TabletSchemaSPtr>& sc
output_schema->append_column(col);
}
}
+ };
+ if (base_schema == nullptr) {
+ // Pick tablet schema with max schema version
+ auto max_version_schema =
+ *std::max_element(schemas.cbegin(), schemas.cend(),
+ [](const TabletSchemaSPtr a, const
TabletSchemaSPtr b) {
+ return a->schema_version() <
b->schema_version();
+ });
+ CHECK(max_version_schema);
+ build_schema_without_extracted_columns(max_version_schema);
} else {
- // use input common schema as base schema
- // Get all columns without extracted columns and collect variant col
unique id
- for (const TabletColumn& col : base_schema->columns()) {
- if (col.is_variant_type()) {
- variant_column_unique_id.push_back(col.unique_id());
- }
- }
- output_schema->copy_from(*base_schema);
+ // use input base_schema schema as base schema
+ build_schema_without_extracted_columns(base_schema);
}
for (int32_t unique_id : variant_column_unique_id) {
@@ -390,7 +393,12 @@ TabletSchemaSPtr get_least_common_schema(const
std::vector<TabletSchemaSPtr>& sc
}
inherit_tablet_index(output_schema);
- return output_schema;
+ if (check_schema_size &&
+ output_schema->columns().size() >
config::variant_max_merged_tablet_schema_size) {
+ return Status::DataQualityError("Reached max column size limit {}",
+
config::variant_max_merged_tablet_schema_size);
+ }
+ return Status::OK();
}
Status parse_and_encode_variant_columns(Block& block, const std::vector<int>&
variant_pos) {
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index d5d01b57ed9..de5778157df 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -96,8 +96,9 @@ void encode_variant_sparse_subcolumns(Block& block, const
std::vector<int>& vari
// Then update all variant columns to there least common types.
// Return the final merged schema as common schema.
// If base_schema == nullptr then, max schema version tablet schema will be
picked as base schema
-TabletSchemaSPtr get_least_common_schema(const std::vector<TabletSchemaSPtr>&
schemas,
- const TabletSchemaSPtr& base_schema);
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+ const TabletSchemaSPtr& base_schema,
TabletSchemaSPtr& result,
+ bool check_schema_size = false);
// Get least common types for extracted columns which has Path info,
// with a speicified variant column's unique id
diff --git a/regression-test/suites/variant_p0/column_size_limit.groovy
b/regression-test/suites/variant_p0/column_size_limit.groovy
new file mode 100644
index 00000000000..70567d89c07
--- /dev/null
+++ b/regression-test/suites/variant_p0/column_size_limit.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonBuilder
+
+suite("regression_test_variant_column_limit", "nonConcurrent"){
+ def set_be_config = { key, value ->
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ", err="
+ err)
+ }
+ def table_name = "var_column_limit"
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ properties("replication_num" = "1", "disable_auto_compaction" =
"false");
+ """
+ try {
+ def jsonBuilder = new JsonBuilder()
+ def root = jsonBuilder {
+ // Generate 2049 fields
+ (1..2049).each { fieldNumber ->
+ "field$fieldNumber" fieldNumber
+ }
+ }
+
+ String jsonString = jsonBuilder.toPrettyString()
+ sql """insert into ${table_name} values (1, '$jsonString')"""
+ } catch(Exception ex) {
+ logger.info("""INSERT INTO ${table_name} failed: """ + ex)
+ assertTrue(ex.toString().contains("Reached max column"));
+ } finally {
+ }
+ sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" :
3}')"""
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/load.groovy
b/regression-test/suites/variant_p0/load.groovy
index 2310e7605fc..57737397b49 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -75,7 +75,7 @@ suite("regression_test_variant", "variant_type"){
}
try {
-
+ set_be_config.call("variant_ratio_of_defaults_as_sparse_column",
"0.95")
def key_types = ["DUPLICATE", "UNIQUE"]
for (int i = 0; i < key_types.size(); i++) {
def table_name = "simple_variant_${key_types[i]}"
@@ -311,18 +311,17 @@ suite("regression_test_variant", "variant_type"){
qt_sql_35_1 """select v:json.parseFailed from logdata where
cast(v:json.parseFailed as string) is not null and k = 162 limit 1;"""
// TODO add test case that some certain columns are materialized in
some file while others are not materilized(sparse)
- // unique table
+ // unique table
set_be_config.call("variant_ratio_of_defaults_as_sparse_column",
"0.95")
- table_name = "github_events_unique"
- sql """DROP TABLE IF EXISTS ${table_name}"""
table_name = "github_events"
+ sql """DROP TABLE IF EXISTS ${table_name}"""
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
v variant
)
UNIQUE KEY(`k`)
- DISTRIBUTED BY HASH(k) BUCKETS 4
+ DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1", "disable_auto_compaction" =
"true");
"""
load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
@@ -399,5 +398,6 @@ suite("regression_test_variant", "variant_type"){
} finally {
// reset flags
set_be_config.call("variant_ratio_of_defaults_as_sparse_column",
"0.95")
+ set_be_config.call("variant_max_merged_tablet_schema_size", "2048")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]