This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch branch-2.0-var
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0-var by this push:
     new ce2db21b393 [Improvement](variant) limit the column size on tablet 
schema (#27399)
ce2db21b393 is described below

commit ce2db21b393d745a0c1060c34de89fe998e1f90b
Author: lihangyu <[email protected]>
AuthorDate: Wed Nov 22 18:31:27 2023 +0800

    [Improvement](variant) limit the column size on tablet schema (#27399)
---
 be/src/common/config.cpp                           |  2 +
 be/src/common/config.h                             |  3 ++
 be/src/olap/delta_writer.cpp                       |  2 +-
 be/src/olap/tablet.cpp                             |  8 ++--
 be/src/olap/tablet.h                               |  2 +-
 be/src/vec/common/schema_util.cpp                  | 10 ++++-
 be/src/vec/common/schema_util.h                    |  4 +-
 .../suites/variant_p0/column_size_limit.groovy     | 49 ++++++++++++++++++++++
 8 files changed, 71 insertions(+), 9 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 490e11266b4..8df34d088dd 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1083,6 +1083,8 @@ DEFINE_Bool(exit_on_exception, "false");
 DEFINE_Bool(enable_scan_thread_low_thread_priority, "false");
 
 DEFINE_mInt64(threshold_rows_to_estimate_sparse_column, "1000");
+
+DEFINE_mInt32(variant_max_merged_tablet_schema_size, "2048");
 // clang-format off
 
 #ifdef BE_TEST
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9681a9c952c..40acf4317d0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1149,6 +1149,9 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval);
 // typically employed to ensure CPU scheduling for write operations.
 DECLARE_mBool(enable_scan_thread_low_thread_priority);
 
+// The max number of columns allowed in a tablet schema after merging variant subcolumns
+DECLARE_mInt32(variant_max_merged_tablet_schema_size);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 23f44aa60e6..1a9b542cfa2 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -538,7 +538,7 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes& 
slave_tablet_nodes,
         // _tabelt->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
         RowsetWriterContext& rw_ctx = _rowset_writer->mutable_context();
-        _tablet->update_by_least_common_schema(rw_ctx.tablet_schema);
+        
RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
 
     Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, 
_tablet, _req.txn_id,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index b6164f9ba17..9a13a81b754 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2652,14 +2652,16 @@ void Tablet::update_max_version_schema(const 
TabletSchemaSPtr& tablet_schema) {
     }
 }
 
-void Tablet::update_by_least_common_schema(const TabletSchemaSPtr& 
update_schema) {
+Status Tablet::update_by_least_common_schema(const TabletSchemaSPtr& 
update_schema) {
     std::lock_guard wrlock(_meta_lock);
     auto final_schema = std::make_shared<TabletSchema>();
     CHECK(_max_version_schema->schema_version() >= 
update_schema->schema_version());
-    vectorized::schema_util::get_least_common_schema(
-            {_max_version_schema, update_schema}, final_schema);
+    bool check_max_schema_size = true;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_max_version_schema, update_schema}, final_schema, 
check_max_schema_size));
     _max_version_schema = final_schema;
     VLOG_DEBUG << "dump updated tablet schema: " << 
final_schema->dump_structure();
+    return Status::OK();
 }
 
 // fetch value by row column
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c84bc3b9d63..9125c4a954e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -506,7 +506,7 @@ public:
 
     void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
 
-    void update_by_least_common_schema(const TabletSchemaSPtr& 
least_common_schema);
+    Status update_by_least_common_schema(const TabletSchemaSPtr& 
least_common_schema);
 
     void set_skip_compaction(bool skip,
                              CompactionType compaction_type = 
CompactionType::CUMULATIVE_COMPACTION,
diff --git a/be/src/vec/common/schema_util.cpp 
b/be/src/vec/common/schema_util.cpp
index 301fd01df30..ef11e89b62f 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -297,8 +297,8 @@ void update_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
     }
 }
 
-void get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
-                             TabletSchemaSPtr& common_schema) {
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                               TabletSchemaSPtr& common_schema, bool 
check_schema_size) {
     // Pick tablet schema with max schema version
     const TabletSchemaSPtr base_schema =
             *std::max_element(schemas.cbegin(), schemas.cend(),
@@ -323,6 +323,12 @@ void get_least_common_schema(const 
std::vector<TabletSchemaSPtr>& schemas,
     for (int32_t unique_id : variant_column_unique_id) {
         update_least_common_schema(schemas, common_schema, unique_id);
     }
+    if (check_schema_size &&
+        common_schema->columns().size() > 
config::variant_max_merged_tablet_schema_size) {
+        return Status::DataQualityError("Reached max column size limit {}",
+                                        
config::variant_max_merged_tablet_schema_size);
+    }
+    return Status::OK();
 }
 
 Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index d2039829c9d..36dd121b7ba 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -94,8 +94,8 @@ void encode_variant_sparse_subcolumns(Block& block, const 
std::vector<int>& vari
 // Pick the tablet schema with the highest schema version as the reference.
 // Then update all variant columns to there least common types.
 // Return the final merged schema as common schema
-void get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
-                             TabletSchemaSPtr& common_schema);
+Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+                               TabletSchemaSPtr& common_schema, bool 
check_schema_size = false);
 
 // Get least common types for extracted columns which has Path info,
 // with a speicified variant column's unique id
diff --git a/regression-test/suites/variant_p0/column_size_limit.groovy 
b/regression-test/suites/variant_p0/column_size_limit.groovy
new file mode 100644
index 00000000000..8c8c9051585
--- /dev/null
+++ b/regression-test/suites/variant_p0/column_size_limit.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("regression_test_variant_column_limit"){
+    def set_be_config = { key, value ->
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
+        logger.info("update config: code=" + code + ", out=" + out + ", err=" 
+ err)
+    }
+    def table_name = "var_column_limit"
+    set_be_config.call("variant_max_merged_tablet_schema_size", "5")
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            k bigint,
+            v variant
+        )
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(k) BUCKETS 1
+        properties("replication_num" = "1", "disable_auto_compaction" = 
"false");
+    """
+    try {
+        sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 
3, "d" : 4, "e" : 5}')"""
+    } catch(Exception ex) {
+        logger.info("""INSERT INTO ${table_name} failed: """ + ex)
+    }
+    sql """insert into ${table_name} values (1, '{"a" : 1, "b" : 2, "c" : 
3}')"""
+
+    set_be_config.call("variant_max_merged_tablet_schema_size", "1024")
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to