This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch branch-2.0-var
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0-var by this push:
new ce2db21b393 [Improvement](variant) limit the column size on tablet
schema (#27399)
ce2db21b393 is described below
commit ce2db21b393d745a0c1060c34de89fe998e1f90b
Author: lihangyu <[email protected]>
AuthorDate: Wed Nov 22 18:31:27 2023 +0800
[Improvement](variant) limit the column size on tablet schema (#27399)
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 3 ++
be/src/olap/delta_writer.cpp | 2 +-
be/src/olap/tablet.cpp | 8 ++--
be/src/olap/tablet.h | 2 +-
be/src/vec/common/schema_util.cpp | 10 ++++-
be/src/vec/common/schema_util.h | 4 +-
.../suites/variant_p0/column_size_limit.groovy | 49 ++++++++++++++++++++++
8 files changed, 71 insertions(+), 9 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 490e11266b4..8df34d088dd 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1083,6 +1083,8 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_Bool(enable_scan_thread_low_thread_priority, "false");
DEFINE_mInt64(threshold_rows_to_estimate_sparse_column, "1000");
+
+DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
// clang-format off
#ifdef BE_TEST
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9681a9c952c..40acf4317d0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1149,6 +1149,9 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval);
// typically employed to ensure CPU scheduling for write operations.
DECLARE_mBool(enable_scan_thread_low_thread_priority);
+// The max columns size for a tablet schema
+DECLARE_mInt32(variant_max_merged_tablet_schema_size);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 23f44aa60e6..1a9b542cfa2 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -538,7 +538,7 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
// _tabelt->tablet_schema: A(bigint), B(double)
// => update_schema: A(bigint), B(double), C(int), D(int)
RowsetWriterContext& rw_ctx = _rowset_writer->mutable_context();
- _tablet->update_by_least_common_schema(rw_ctx.tablet_schema);
+ RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema));
}
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b6164f9ba17..9a13a81b754 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2652,14 +2652,16 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) {
}
}
-void Tablet::update_by_least_common_schema(const TabletSchemaSPtr& update_schema) {
+Status Tablet::update_by_least_common_schema(const TabletSchemaSPtr& update_schema) {
std::lock_guard wrlock(_meta_lock);
auto final_schema = std::make_shared<TabletSchema>();
CHECK(_max_version_schema->schema_version() >= update_schema->schema_version());
- vectorized::schema_util::get_least_common_schema(
- {_max_version_schema, update_schema}, final_schema);
+ bool check_max_schema_size = true;
+ RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+ {_max_version_schema, update_schema}, final_schema, check_max_schema_size));
_max_version_schema = final_schema;
VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
+ return Status::OK();
}
// fetch value by row column
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c84bc3b9d63..9125c4a954e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -506,7 +506,7 @@ public:
void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
- void update_by_least_common_schema(const TabletSchemaSPtr& least_common_schema);
+ Status update_by_least_common_schema(const TabletSchemaSPtr& least_common_schema);
void set_skip_compaction(bool skip,
CompactionType compaction_type =
CompactionType::CUMULATIVE_COMPACTION,
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index 301fd01df30..ef11e89b62f 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -297,8 +297,8 @@ void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
}
}
-void get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
- TabletSchemaSPtr& common_schema) {
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+ TabletSchemaSPtr& common_schema, bool check_schema_size) {
// Pick tablet schema with max schema version
const TabletSchemaSPtr base_schema =
*std::max_element(schemas.cbegin(), schemas.cend(),
@@ -323,6 +323,12 @@ void get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
for (int32_t unique_id : variant_column_unique_id) {
update_least_common_schema(schemas, common_schema, unique_id);
}
+ if (check_schema_size &&
+ common_schema->columns().size() > config::variant_max_merged_tablet_schema_size) {
+ return Status::DataQualityError("Reached max column size limit {}",
+ config::variant_max_merged_tablet_schema_size);
+ }
+ return Status::OK();
}
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index d2039829c9d..36dd121b7ba 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -94,8 +94,8 @@ void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& vari
// Pick the tablet schema with the highest schema version as the reference.
// Then update all variant columns to there least common types.
// Return the final merged schema as common schema
-void get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
- TabletSchemaSPtr& common_schema);
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+ TabletSchemaSPtr& common_schema, bool check_schema_size = false);
// Get least common types for extracted columns which has Path info,
// with a speicified variant column's unique id
diff --git a/regression-test/suites/variant_p0/column_size_limit.groovy b/regression-test/suites/variant_p0/column_size_limit.groovy
new file mode 100644
index 00000000000..8c8c9051585
--- /dev/null
+++ b/regression-test/suites/variant_p0/column_size_limit.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("regression_test_variant_column_limit"){
+ def set_be_config = { key, value ->
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+ }
+ def table_name = "var_column_limit"
+ set_be_config.call("variant_max_merged_tablet_schema_size", "5")
+ sql "DROP TABLE IF EXISTS ${table_name}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ properties("replication_num" = "1", "disable_auto_compaction" = "false");
+ """
+ try {
+ sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 3, "d" : 4, "e" : 5}')"""
+ } catch(Exception ex) {
+ logger.info("""INSERT INTO ${table_name} failed: """ + ex)
+ }
+ sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 3}')"""
+
+ set_be_config.call("variant_max_merged_tablet_schema_size", "1024")
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]