This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new caf6efd9363 [Pick](branch-2.0) pick two PRs to fix memtable shrink #28536 #28660 (#28671)
caf6efd9363 is described below

commit caf6efd936351187f54828bf7d834dce005cc8bb
Author: lihangyu <[email protected]>
AuthorDate: Wed Dec 20 11:47:49 2023 +0800

    [Pick](branch-2.0) pick two PRs to fix memtable shrink #28536 #28660 (#28671)
    
    * [Fix](memtable) fix `shrink_memtable_by_agg` should also update `_row_in_blocks` (#28536)
    * [Fix](memtable) fix `shrink_memtable_by_agg` without duplicated keys (#28660)
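
A minimal sketch of what the two picks change, using toy stand-in types rather than the real Doris classes (ToyMemTable, Row and the member names below are illustrative only): `shrink_memtable_by_agg` first sorts the memtable and asks `_sort()` how many rows share a key. If none do, there is nothing to aggregate, so the shrink pass is now skipped instead of copying every row into the output block (#28660); when aggregation does run, `_aggregate()` now also refreshes `_row_in_blocks`, so later sorts and flushes see the post-aggregation rows (#28536).

#include <algorithm>
#include <cstddef>
#include <vector>

// Toy stand-ins for the real MemTable members (names are illustrative only).
struct Row { int key; long value; };

struct ToyMemTable {
    std::vector<Row> rows;             // plays the role of _input_mutable_block
    std::vector<Row> output;           // plays the role of _output_mutable_block
    std::vector<size_t> row_in_blocks; // plays the role of _row_in_blocks

    // Sort by key and report how many rows share a key with a neighbour,
    // roughly what _sort() reports in the real code.
    size_t sort_and_count_same_keys() {
        std::sort(rows.begin(), rows.end(),
                  [](const Row& a, const Row& b) { return a.key < b.key; });
        size_t same = 0;
        for (size_t i = 1; i < rows.size(); ++i) {
            if (rows[i].key == rows[i - 1].key) ++same;
        }
        return same;
    }

    // Merge adjacent duplicate keys (here: SUM the value column), then rebuild
    // the row index -- the analogue of #28536, where _aggregate() must also
    // update _row_in_blocks after producing the new output block.
    void aggregate() {
        output.clear();
        for (const Row& r : rows) {
            if (!output.empty() && output.back().key == r.key) {
                output.back().value += r.value;
            } else {
                output.push_back(r);
            }
        }
        row_in_blocks.resize(output.size());
        for (size_t i = 0; i < output.size(); ++i) row_in_blocks[i] = i;
        rows = output;
    }

    void shrink_by_agg() {
        size_t same_keys_num = sort_and_count_same_keys();
        // #28660: if no key is duplicated there is nothing to shrink, so skip
        // the wholesale copy into the output block instead of paying for it.
        if (same_keys_num != 0) {
            aggregate();
        }
    }
};
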
---
 be/src/common/config.cpp                           |   2 +-
 be/src/common/config.h                             |   2 +-
 be/src/olap/memtable.cpp                           |   7 +-
 .../test_insert_with_aggregation_memtable.out      |  17 +++
 .../test_insert_with_aggregation_memtable.groovy   | 141 +++++++++++++++++++++
 5 files changed, 163 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dafc613d77c..40f5a2b6b66 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1030,7 +1030,7 @@ DEFINE_mInt32(s3_write_buffer_whole_size, "524288000");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
 
 //disable shrink memory by default
-DEFINE_Bool(enable_shrink_memory, "false");
+DEFINE_mBool(enable_shrink_memory, "false");
 DEFINE_mInt32(schema_cache_capacity, "1024");
 DEFINE_mInt32(schema_cache_sweep_time_sec, "100");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 65b9de8f12e..7d7119ecfc1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1067,7 +1067,7 @@ DECLARE_mInt32(s3_write_buffer_whole_size);
 // the max number of cached file handles for block segment
 DECLARE_mInt64(file_cache_max_file_reader_cache_size);
 //enable shrink memory
-DECLARE_Bool(enable_shrink_memory);
+DECLARE_mBool(enable_shrink_memory);
 // enable cache for high concurrent point query work load
 DECLARE_mInt32(schema_cache_capacity);
 DECLARE_mInt32(schema_cache_sweep_time_sec);
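
For context on the `DEFINE_Bool` -> `DEFINE_mBool` change above: in the Doris BE the `m` variants mark a config as runtime-mutable, which is what lets the regression test added below flip `enable_shrink_memory` through the BE's `/api/update_config` endpoint without a restart. The snippet below is only a generic illustration of such a runtime-mutable flag, not the actual Doris config machinery.

#include <atomic>

// Generic sketch of a runtime-mutable boolean flag (illustrative only):
// hot-path readers load it atomically, and an admin endpoint such as
// /api/update_config can flip it while the process is running.
namespace toy_config {
inline std::atomic<bool> enable_shrink_memory{false};

inline bool shrink_enabled() {
    return enable_shrink_memory.load(std::memory_order_relaxed);
}

inline void set_shrink_enabled(bool v) {
    enable_shrink_memory.store(v, std::memory_order_relaxed);
}
} // namespace toy_config
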
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 880609b053f..ea3c0fcf6b6 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -425,6 +425,8 @@ void MemTable::_aggregate() {
         _output_mutable_block =
                 vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
         _output_mutable_block.clear_column_data();
+        _row_in_blocks = temp_row_in_blocks;
+        _last_sorted_pos = _row_in_blocks.size();
     }
 }
 
@@ -434,10 +436,7 @@ void MemTable::shrink_memtable_by_agg() {
         return;
     }
     size_t same_keys_num = _sort();
-    if (same_keys_num == 0) {
-        vectorized::Block in_block = _input_mutable_block.to_block();
-        _put_into_output(in_block);
-    } else {
+    if (same_keys_num != 0) {
         _aggregate<false>();
     }
 }
diff --git a/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out b/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out
new file mode 100644
index 00000000000..030677444b3
--- /dev/null
+++ b/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+10000  2017-10-01      北京      20      0       2017-10-01T07:00        35      10      2
+10001  2017-10-01      北京      30      1       2017-10-01T17:05:45     2       22      22
+10002  2017-10-02      上海      20      1       2017-10-02T12:59:12     200     5       5
+10003  2017-10-02      广州      32      0       2017-10-02T11:20        30      11      11
+10004  2017-10-01      深圳      35      0       2017-10-01T10:00:15     100     3       3
+10004  2017-10-03      深圳      35      0       2017-10-03T10:20:22     11      6       6
+
+-- !sql --
+10000  2017-10-01      北京      20      0       2017-10-01T07:00        35      10      2
+10001  2017-10-01      北京      30      1       2017-10-01T17:05:45     2       22      22
+10002  2017-10-02      上海      20      1       2017-10-02T12:59:12     200     5       5
+10003  2017-10-02      广州      32      0       2017-10-02T11:20        30      11      11
+10004  2017-10-01      深圳      35      0       2017-10-01T10:00:15     100     3       3
+10004  2017-10-03      深圳      35      0       2017-10-03T10:20:22     11      6       6
+
diff --git a/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy
new file mode 100644
index 00000000000..bbfca8fa5f6
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_with_aggregation_memtable", "nonConcurrent") {
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string:[:]]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_param = { paramName, paramValue ->
+        // for each be node, set paramName=paramValue
+        for (String id in backendId_to_backendIP.keySet()) {
+                   def beIp = backendId_to_backendIP.get(id)
+                   def bePort = backendId_to_backendHttpPort.get(id)
+                   def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+                   assertTrue(out.contains("OK"))
+           }
+    }
+
+    def reset_be_param = { paramName ->
+        // for each be node, reset paramName to default
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def get_be_param = { paramName ->
+        // for each be node, get the current param value
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // parsing
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            // get original value
+            paramValue = resultList[2]
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+    
+    def testTable = "test_memtable_enable_with_aggregate"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+    def testTableDDL = """
+        create table ${testTable} 
+        (
+            `id` LARGEINT NOT NULL,
+            `k1` DATE NOT NULL,
+            `k2` VARCHAR(20),
+            `k3` SMALLINT,
+            `k4` TINYINT,
+            `k5` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
+            `k6` BIGINT SUM DEFAULT "0",
+            `k7` INT MAX DEFAULT "0",
+            `k8` INT MIN DEFAULT "99999"
+        )
+        AGGREGATE KEY(`id`, `k1`, `k2`, `k3`, `k4`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    def insert_sql = """
+        insert into ${testTable} values
+        (10000,"2017-10-01","北京",20,0,"2017-10-01 06:00:00",20,10,10),
+        (10000,"2017-10-01","北京",20,0,"2017-10-01 07:00:00",15,2,2),
+        (10001,"2017-10-01","北京",30,1,"2017-10-01 17:05:45",2,22,22),
+        (10002,"2017-10-02","上海",20,1,"2017-10-02 12:59:12",200,5,5),
+        (10003,"2017-10-02","广州",32,0,"2017-10-02 11:20:00",30,11,11),
+        (10004,"2017-10-01","深圳",35,0,"2017-10-01 10:00:15",100,3,3),
+        (10004,"2017-10-03","深圳",35,0,"2017-10-03 10:20:22",11,6,6);
+    """
+  
+    sql testTableDDL
+    sql "sync"
+    sql insert_sql
+    sql "sync"
+    qt_sql "select * from ${testTable} order by id asc"
+    
+    // store the original value
+    get_be_param("enable_shrink_memory")
+    get_be_param("write_buffer_size_for_agg")
+
+    // the original value is false
+    set_be_param("enable_shrink_memory", "true")
+    // the original value is 400MB
+    set_be_param("write_buffer_size_for_agg", "512") // change it to 0.5KB
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+    sql testTableDDL
+    sql "sync"
+    sql insert_sql
+    sql "sync"
+    qt_sql "select * from ${testTable} order by id asc"
+
+    // test with mv
+    def table_name = "agg_shrink"
+    sql "DROP TABLE IF EXISTS ${table_name}"
+    sql """
+        CREATE TABLE IF NOT EXISTS ${table_name} (
+            k bigint,
+            v text 
+        )
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(k) BUCKETS 4
+        properties("replication_num" = "1");
+    """
+    set_be_param("write_buffer_size_for_agg", "10240") // change it to 10KB
+    sql """INSERT INTO ${table_name} SELECT *, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" = "4096")"""
+    sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}"""
+    sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}"""
+    createMV("""create materialized view var_cnt as select k, count(k) from ${table_name} group by k""")
+    sql """INSERT INTO ${table_name} SELECT k, v from ${table_name} limit 8101"""
+    // insert with no duplicate
+    sql """INSERT INTO ${table_name} SELECT *, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" = "4096"); """
+
+    reset_be_param("enable_shrink_memory")
+    reset_be_param("write_buffer_size_for_agg")
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
