This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1d39e16eda00763d4dea7bea5de6c24d64c7079f
Author: Pxl <[email protected]>
AuthorDate: Sun Feb 4 11:27:07 2024 +0800

    [Bug](compaction) pass arena to function->add_batch_range (#30709)
---
 be/src/vec/olap/vertical_block_reader.cpp          |  13 +-
 be/src/vec/olap/vertical_block_reader.h            |   1 +
 .../test_vertical_compaction_agg_state.out         |  12 ++
 .../test_vertical_compaction_agg_state.groovy      | 131 +++++++++++++++++++++
 4 files changed, 152 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 5f2d856fe79..ce93ef6fedc 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -74,7 +74,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
         return res;
     }
     _reader_context.is_vertical_compaction = true;
-    for (auto& rs_split : read_params.rs_splits) {
+    for (const auto& rs_split : read_params.rs_splits) {
         // segment iterator will be inited here
         // In vertical compaction, every group will load segment so we should cache
         // segment to avoid tot many s3 head request
@@ -190,7 +190,7 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
         DCHECK(function != nullptr);
         _agg_functions.push_back(function);
         // create aggregate data
-        AggregateDataPtr place = new char[function->size_of_data()];
+        auto* place = new char[function->size_of_data()];
         SAFE_CREATE(function->create(place), {
             _agg_functions.pop_back();
             delete[] place;
@@ -305,11 +305,11 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
     for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
         AggregateFunctionPtr function = _agg_functions[idx];
         AggregateDataPtr place = _agg_places[idx];
-        auto column_ptr = _stored_data_columns[idx].get();
+        auto* column_ptr = _stored_data_columns[idx].get();
 
         if (begin <= end) {
             function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
-                                      nullptr, _stored_has_null_tag[idx]);
+                                      &_arena, _stored_has_null_tag[idx]);
         }
 
         if (is_close) {
@@ -318,6 +318,9 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
             function->reset(place);
         }
     }
+    if (is_close) {
+        _arena.clear();
+    }
 }
 
 size_t VerticalBlockReader::_copy_agg_data() {
@@ -339,7 +342,7 @@ size_t VerticalBlockReader::_copy_agg_data() {
         } else {
             for (auto& it : _temp_ref_map) {
                 if (!it.second.empty()) {
-                    auto& src_column = *it.first->get_by_position(idx).column;
+                    const auto& src_column = *it.first->get_by_position(idx).column;
                     for (auto& pos : it.second) {
                         dst_column->replace_column_data(src_column, pos.first, pos.second);
                     }
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index d3c500cbd8e..2c65fd616b1 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -107,6 +107,7 @@ private:
     // for agg mode
     std::vector<AggregateFunctionPtr> _agg_functions;
     std::vector<AggregateDataPtr> _agg_places;
+    Arena _arena;
 
     std::vector<int> _normal_columns_idx;
     std::vector<int> _agg_columns_idx;
diff --git a/regression-test/data/compaction/test_vertical_compaction_agg_state.out b/regression-test/data/compaction/test_vertical_compaction_agg_state.out
new file mode 100644
index 00000000000..62a7a629187
--- /dev/null
+++ b/regression-test/data/compaction/test_vertical_compaction_agg_state.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+a      ["aa", "a"]
+
+-- !select_default --
+a      ["aaa", "aa", "a"]
+b      ["b"]
+
+-- !select_default --
+a      ["aaa", "aa", "a"]
+b      ["b"]
+
diff --git a/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
new file mode 100644
index 00000000000..9e0f99dd0c3
--- /dev/null
+++ b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_vertical_compaction_agg_state") {
+    def tableName = "vertical_compaction_agg_state_regression_test"
+
+    try {
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+            }
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql "set enable_agg_state=true"
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+                user_id                         VARCHAR,
+                agg_user_id                     agg_state collect_set(string)
+                )ENGINE=OLAP
+        AGGREGATE  KEY(`user_id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1" );
+        """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             ('a',collect_set_state('a'))
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+            ('a',collect_set_state('aa'))
+            """
+
+        qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;"""
+
+        sql """ INSERT INTO ${tableName} VALUES
+             ('b',collect_set_state('b'))
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+            ('a',collect_set_state('aaa'))
+            """
+
+        qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;"""
+
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+        // trigger compactions for all tablets in ${tableName}
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            if (compactJson.status.toLowerCase() == "fail") {
+                assertEquals(disableAutoCompaction, false)
+                logger.info("Compaction was done automatically!")
+            }
+            if (disableAutoCompaction) {
+                assertEquals("success", compactJson.status.toLowerCase())
+            }
+        }
+
+        // wait for all compactions done
+        for (def tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        def replicaNum = get_table_replica_num(tableName)
+        logger.info("get table replica num: " + replicaNum)
+        int rowCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                rowCount += Integer.parseInt(rowset.split(" ")[1])
+            }
+        }
+        assert (rowCount < 8 * replicaNum)
+        qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;"""
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to