This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new dbd1da4825a [FIX](agg) fix vertical_compaction_reader for agg table
with array/map type #33130 (#33216)
dbd1da4825a is described below
commit dbd1da4825afb10a9ad871d1e3d595936408c08e
Author: amory <[email protected]>
AuthorDate: Wed Apr 3 20:06:23 2024 +0800
[FIX](agg) fix vertical_compaction_reader for agg table with array/map type
#33130 (#33216)
---
be/src/vec/columns/column.h | 3 +
be/src/vec/columns/column_array.h | 1 +
be/src/vec/columns/column_const.h | 2 +
be/src/vec/columns/column_map.h | 1 +
be/src/vec/columns/column_nullable.h | 2 +-
be/src/vec/columns/column_string.h | 4 +-
be/src/vec/olap/block_reader.cpp | 17 +--
be/src/vec/olap/vertical_block_reader.cpp | 15 +-
be/src/vec/olap/vertical_block_reader.h | 2 +-
.../test_compaction_agg_keys_with_array_map.out | 13 ++
.../test_compaction_agg_keys_with_array_map.groovy | 153 +++++++++++++++++++++
11 files changed, 184 insertions(+), 29 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 702c974967b..96df039f653 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -659,6 +659,9 @@ public:
/// Implies is_fixed_and_contiguous.
virtual bool is_numeric() const { return false; }
+ // Returns true if the column is a ColumnString/ColumnArray/ColumnMap or another
+ // column type whose per-row values have variable length.
+ virtual bool is_variable_length() const { return false; }
+
virtual bool is_column_string() const { return false; }
virtual bool is_column_decimal() const { return false; }
diff --git a/be/src/vec/columns/column_array.h
b/be/src/vec/columns/column_array.h
index f66f437b968..0dc24c2d86d 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -127,6 +127,7 @@ public:
const char* get_family_name() const override { return "Array"; }
bool is_column_array() const override { return true; }
bool can_be_inside_nullable() const override { return true; }
+ bool is_variable_length() const override { return true; }
MutableColumnPtr clone_resized(size_t size) const override;
size_t size() const override;
void resize(size_t n) override;
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 0f414cca3f8..7dfda996808 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -75,6 +75,8 @@ public:
ColumnPtr convert_to_full_column_if_const() const override { return
convert_to_full_column(); }
+ bool is_variable_length() const override { return
data->is_variable_length(); }
+
ColumnPtr remove_low_cardinality() const;
std::string get_name() const override { return "Const(" + data->get_name()
+ ")"; }
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index a1eacfcfb8e..fb5c6e535c3 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -93,6 +93,7 @@ public:
}
MutableColumnPtr clone_resized(size_t size) const override;
+ bool is_variable_length() const override { return true; }
bool can_be_inside_nullable() const override { return true; }
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index d183ce03853..a2356cffb26 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -94,7 +94,7 @@ public:
}
MutableColumnPtr get_shrinked_column() override;
-
+ bool is_variable_length() const override { return
nested_column->is_variable_length(); }
const char* get_family_name() const override { return "Nullable"; }
std::string get_name() const override { return "Nullable(" +
nested_column->get_name() + ")"; }
MutableColumnPtr clone_resized(size_t size) const override;
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 941ec573759..03d81ae725b 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -107,7 +107,7 @@ private:
public:
void sanity_check() const;
-
+ bool is_variable_length() const override { return true; }
const char* get_family_name() const override { return "String"; }
size_t size() const override { return offsets.size(); }
@@ -557,6 +557,8 @@ public:
auto data = r.get_data_at(row);
if (!self_row) {
+ // self_row == 0 means this is the first call to replace_column_data() for
+ // this batch of column data, so we should clear the previous batch's data.
chars.clear();
offsets[self_row] = data.size;
} else {
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 541954ab3bf..0778a6b8eaa 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -182,22 +182,7 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
_agg_places.push_back(place);
// calculate the `_has_variable_length_tag` flag for variable-length types like string, array, map
- _stored_has_variable_length_tag[idx] =
- _stored_data_columns[idx]->is_column_string() ||
- (_stored_data_columns[idx]->is_nullable() &&
-
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
- ->get_nested_column_ptr()
- ->is_column_string()) ||
- _stored_data_columns[idx]->is_column_array() ||
- (_stored_data_columns[idx]->is_nullable() &&
-
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
- ->get_nested_column_ptr()
- ->is_column_array()) ||
- _stored_data_columns[idx]->is_column_map() ||
- (_stored_data_columns[idx]->is_nullable() &&
-
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
- ->get_nested_column_ptr()
- ->is_column_map());
+ _stored_has_variable_length_tag[idx] =
_stored_data_columns[idx]->is_variable_length();
}
}
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 5a203e913ad..f1f4f3a3895 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -171,7 +171,7 @@ void VerticalBlockReader::_init_agg_state(const
ReaderParams& read_params) {
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
_stored_has_null_tag.resize(_stored_data_columns.size());
- _stored_has_string_tag.resize(_stored_data_columns.size());
+ _stored_has_variable_length_tag.resize(_stored_data_columns.size());
auto& tablet_schema = *_tablet_schema;
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
@@ -188,13 +188,8 @@ void VerticalBlockReader::_init_agg_state(const
ReaderParams& read_params) {
});
_agg_places.push_back(place);
- // calculate `has_string` tag.
- _stored_has_string_tag[idx] =
- _stored_data_columns[idx]->is_column_string() ||
- (_stored_data_columns[idx]->is_nullable() &&
-
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
- ->get_nested_column_ptr()
- ->is_column_string());
+ // calculate the `_has_variable_length_tag` flag for variable-length types like string, array, map
+ _stored_has_variable_length_tag[idx] =
_stored_data_columns[idx]->is_variable_length();
}
}
@@ -319,8 +314,8 @@ size_t VerticalBlockReader::_copy_agg_data() {
}
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
auto& dst_column = _stored_data_columns[idx];
- if (_stored_has_string_tag[idx]) {
- //string type should replace ordered
+ if (_stored_has_variable_length_tag[idx]) {
+ //variable length type should replace ordered
for (size_t i = 0; i < copy_size; i++) {
auto& ref = _stored_row_ref[i];
dst_column->replace_column_data(*ref.block->get_by_position(idx).column,
diff --git a/be/src/vec/olap/vertical_block_reader.h
b/be/src/vec/olap/vertical_block_reader.h
index 4c85c1da770..8e1af9db2a8 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -126,7 +126,7 @@ private:
std::vector<IteratorRowRef> _stored_row_ref;
std::vector<bool> _stored_has_null_tag;
- std::vector<bool> _stored_has_string_tag;
+ std::vector<bool> _stored_has_variable_length_tag;
phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>>
_temp_ref_map;
diff --git
a/regression-test/data/compaction/test_compaction_agg_keys_with_array_map.out
b/regression-test/data/compaction/test_compaction_agg_keys_with_array_map.out
new file mode 100644
index 00000000000..5c5ee3ed329
--- /dev/null
+++
b/regression-test/data/compaction/test_compaction_agg_keys_with_array_map.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_default --
+1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1,
"c":2}
+2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris",
"2024-04-29"] {"c":2}
+3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 \N \N
+4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N
+
+-- !select_default2 --
+1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1,
"c":2}
+2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris",
"2024-04-29"] {"c":2}
+3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 \N \N
+4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110
2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N
+
diff --git
a/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
new file mode 100644
index 00000000000..1408d3ec9bb
--- /dev/null
+++
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_compaction_agg_keys_with_array_map") {
+ def tableName = "compaction_agg_keys_regression_test_complex"
+
+ try {
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+ backend_id = backendId_to_backendIP.keySet()[0]
+
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ boolean disableAutoCompaction = true
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "disable_auto_compaction") {
+ disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
+ }
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `user_id` LARGEINT NOT NULL COMMENT "用户id",
+ `date` DATE NOT NULL COMMENT "数据灌入日期时间",
+ `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间",
+ `datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
+ `datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
+ `city` VARCHAR(20) COMMENT "用户所在城市",
+ `age` SMALLINT COMMENT "用户年龄",
+ `sex` TINYINT COMMENT "用户性别",
+ `array_col` ARRAY<STRING> REPLACE NULL COMMENT "array column",
+ `map_col` MAP<STRING, INT> REPLACE NULL COMMENT "map column")
+ AGGREGATE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`,
`datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
+ PROPERTIES ( "replication_num" = "1" );
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b'], map('a', 1));
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b', 'c'], map('b', 1,
'c', 2));
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', 'commiter'],
map('b', 1));
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris',
'2024-04-29'], map('c', 2));
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map());
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map('a',
1, 'b', 2));
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, NULL, NULL);
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES
+ (4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000',
'2017-10-01 11:11:11.110111', 'Beijing', 10, 1, [NULL, 'sdf'], NULL);
+ """
+
+ qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
+
+
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
+ def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
+
+ // trigger compactions for all tablets in ${tableName}
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ if (compactJson.status.toLowerCase() == "fail") {
+ assertEquals(disableAutoCompaction, false)
+ logger.info("Compaction was done automatically!")
+ }
+ if (disableAutoCompaction) {
+ assertEquals("success", compactJson.status.toLowerCase())
+ }
+ }
+
+ // wait for all compactions done
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def compactionStatusUrlIndex = 18
+
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
+
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
+ }
+ assert (rowCount < 8)
+ qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id;
"""
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${tableName}")
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]