This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch deduplicate_keys in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5101c2f720ff74c101a475493e571ba86c63ebb2 Author: Hu Shenggang <[email protected]> AuthorDate: Tue May 13 15:45:51 2025 +0800 [feat](map) remove duplicated keys in ColumnMap --- be/src/vec/columns/column_map.cpp | 98 ++++++++++ be/src/vec/columns/column_map.h | 15 ++ be/src/vec/exec/format/orc/vorc_reader.cpp | 7 +- .../exec/format/parquet/vparquet_column_reader.cpp | 1 + be/src/vec/exec/jni_connector.cpp | 5 +- be/src/vec/functions/function_cast.h | 8 +- be/src/vec/functions/function_map.cpp | 12 +- be/src/vec/sink/vtablet_block_convertor.cpp | 5 +- be/test/vec/core/column_map_test.cpp | 212 +++++++++++++++++++++ .../cast_function/test_cast_map_function.out | Bin 782 -> 743 bytes .../nereids_function_p0/scalar_function/Map.out | Bin 25901 -> 25865 bytes .../data/nereids_p0/datatype/test_map.out | Bin 746 -> 1251 bytes .../suites/nereids_p0/datatype/test_map.groovy | 5 + 13 files changed, 357 insertions(+), 11 deletions(-) diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index 7f60a9be46a..33392ebcd07 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -24,12 +24,20 @@ #include <algorithm> #include <boost/iterator/iterator_facade.hpp> +#include <cstddef> #include <limits> #include <memory> +#include <unordered_map> #include <vector> #include "common/status.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" #include "vec/common/arena.h" +#include "vec/common/assert_cast.h" +#include "vec/common/custom_allocator.h" +#include "vec/common/hash_table/phmap_fwd_decl.h" +#include "vec/common/string_ref.h" #include "vec/common/typeid_cast.h" #include "vec/common/unaligned.h" @@ -493,6 +501,96 @@ ColumnPtr ColumnMap::replicate(const Offsets& offsets) const { return res; } +Status ColumnMap::deduplicate_keys(bool recursive) { + const auto inner_rows = keys_column->size(); + const auto rows = offsets_column->size(); + + if (recursive) { + auto values_column_ =
values_column; + if (values_column_->is_nullable()) { + values_column_ = (assert_cast<ColumnNullable&>(*values_column)).get_nested_column_ptr(); + } + + if (const auto* values_map = check_and_get_column<ColumnMap>(values_column_.get())) { + RETURN_IF_ERROR((const_cast<ColumnMap*>(values_map))->deduplicate_keys(recursive)); + } + } + + DorisVector<StringRef> serialized_keys(inner_rows); + + const size_t max_one_row_byte_size = keys_column->get_max_row_byte_size(); + + size_t total_bytes = max_one_row_byte_size * inner_rows; + Arena pool; + + if (total_bytes >= config::pre_serialize_keys_limit_bytes) { + // Too many bytes to pre-allocate one buffer for all keys: serialize them one by one. + const char* begin = nullptr; + for (size_t i = 0; i != inner_rows; ++i) { + serialized_keys[i] = keys_column->serialize_value_into_arena(i, pool, begin); + } + } else { + auto* serialized_key_buffer = reinterpret_cast<uint8_t*>(pool.alloc(total_bytes)); + + for (size_t i = 0; i < inner_rows; ++i) { + serialized_keys[i].data = + reinterpret_cast<char*>(serialized_key_buffer + i * max_one_row_byte_size); + serialized_keys[i].size = 0; + } + + keys_column->serialize_vec(serialized_keys.data(), inner_rows, max_one_row_byte_size); + } + + auto new_offsets = COffsets::create(); + new_offsets->reserve(rows); + auto& new_offsets_data = new_offsets->get_data(); + + // filter[j] == 1 keeps inner entry j; entries shadowed by a later duplicate get 0. + IColumn::Filter filter(inner_rows, 1); + // Use the offsets array's signed operator[] (not a raw pointer) so offsets[i - 1] at i == 0 + // reads the zeroed left padding instead of overflowing the pointer. + const auto& offsets = get_offsets(); + + Offset64 offset = 0; + bool has_duplicated_key = false; + // Reused (cleared per map) so that no hash table is allocated for every row. + phmap::flat_hash_map<StringRef, size_t> keys_map; + + for (size_t i = 0; i != rows; ++i) { + const auto count = offsets[i] - offsets[i - 1]; + if (count <= 1) { + // Empty and single-entry maps cannot hold duplicated keys; keep them as-is. + offset += count; + new_offsets_data.push_back(offset); + continue; + } + + keys_map.clear(); + keys_map.reserve(count); + for (size_t j = offsets[i - 1]; j < offsets[i]; ++j) { + // Keep only the last occurrence of a key: if the key was seen before, + // drop the previously kept entry and remember this one instead. + auto [it, inserted] = keys_map.try_emplace(serialized_keys[j], j); + if (inserted) { + ++offset; + } else { + filter[it->second] = 0; + it->second = j; + has_duplicated_key = true; + } + } + new_offsets_data.push_back(offset); + }
+ + if (has_duplicated_key) { + offsets_column = std::move(new_offsets); + keys_column->filter(filter); + values_column->filter(filter); + } + + return Status::OK(); +} + void ColumnMap::shrink_padding_chars() { keys_column->shrink_padding_chars(); values_column->shrink_padding_chars(); diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 84daf5d3277..eed0d0967c5 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -193,6 +193,21 @@ public: return get_offsets()[i] - get_offsets()[i - 1]; } + // Remove entries with duplicated keys from each map stored in the ColumnMap. + // + // For each map, if several entries share the same key (regardless of their values), + // only the **last** entry is retained and earlier ones are removed, so every key is + // unique within its map. Surviving entries keep their original relative order. + // + // Note: the ColumnMap is modified in-place (intended for use after loading or merging + // data that may carry duplicated keys). With `recursive`, map columns stored directly as + // values (possibly wrapped in Nullable) are deduplicated first. Currently always returns OK.
+ // + // Example: + // Input map: {{"a", 1}, {"b", 2}, {"a", 3}, {"c", 4}} + // Result: {{"b", 2}, {"a", 3}, {"c", 4}} + Status deduplicate_keys(bool recursive = false); + ColumnPtr convert_column_if_overflow() override { keys_column = keys_column->convert_column_if_overflow(); values_column = values_column->convert_column_if_overflow(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 9298a665f54..3b3c2de918e 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1769,9 +1769,10 @@ Status OrcReader::_fill_doris_data_column(const std::string& col_name, RETURN_IF_ERROR(_orc_column_to_doris_column<false>(key_col_name, doris_key_column, doris_key_type, orc_key_type, orc_map->keys.get(), element_size)); - return _orc_column_to_doris_column<false>(value_col_name, doris_value_column, - doris_value_type, orc_value_type, - orc_map->elements.get(), element_size); + RETURN_IF_ERROR(_orc_column_to_doris_column<false>(value_col_name, doris_value_column, + doris_value_type, orc_value_type, + orc_map->elements.get(), element_size)); + return doris_map.deduplicate_keys(); } case PrimitiveType::TYPE_STRUCT: { if (orc_column_type->getKind() != orc::TypeKind::STRUCT) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index e9a9886d026..4753f367afc 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -796,6 +796,7 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t // fill offset and null map fill_array_offset(_field_schema, map.get_offsets(), null_map_ptr, _key_reader->get_rep_level(), _key_reader->get_def_level()); + RETURN_IF_ERROR(map.deduplicate_keys()); DCHECK_EQ(key_column->size(), map.get_offsets().back()); return Status::OK(); diff --git
a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 707fd3dc364..44587bbade0 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -434,8 +434,9 @@ Status JniConnector::_fill_map_column(TableMetaAddress& address, MutableColumnPt RETURN_IF_ERROR(_fill_column(address, key_column, key_type, map_offsets[origin_size + num_rows - 1] - start_offset)); - return _fill_column(address, value_column, value_type, - map_offsets[origin_size + num_rows - 1] - start_offset); + RETURN_IF_ERROR(_fill_column(address, value_column, value_type, + map_offsets[origin_size + num_rows - 1] - start_offset)); + return map.deduplicate_keys(); } Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColumnPtr& doris_column, diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 726442f1f67..3223464e740 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -1927,8 +1927,12 @@ private: converted_columns[i] = block.get_by_position(element_result).column; } - block.get_by_position(result).column = ColumnMap::create( - converted_columns[0], converted_columns[1], from_col_map->get_offsets_ptr()); + auto map_column = + ColumnMap::create(std::move(*converted_columns[0]).mutate(), + std::move(*converted_columns[1]).mutate(), + std::move(*from_col_map->get_offsets_ptr()).mutate()); + RETURN_IF_ERROR(map_column->deduplicate_keys()); + block.get_by_position(result).column = std::move(map_column); return Status::OK(); }; } diff --git a/be/src/vec/functions/function_map.cpp b/be/src/vec/functions/function_map.cpp index dcf3e96fc0b..2981b3f0877 100644 --- a/be/src/vec/functions/function_map.cpp +++ b/be/src/vec/functions/function_map.cpp @@ -127,6 +127,8 @@ public: offset += num_element / 2; result_col_map_offsets[row] = offset; } + + RETURN_IF_ERROR(map_column->deduplicate_keys()); block.replace_by_position(result, std::move(result_col)); return
Status::OK(); } @@ -389,9 +391,13 @@ private: result_col_map_offsets->insert_value(result_col_map_keys_data->size()); } - return ColumnMap::create(std::move(result_col_map_keys_data), - std::move(result_col_map_vals_data), - std::move(result_col_map_offsets)); + auto map_column = ColumnMap::create(std::move(result_col_map_keys_data), + std::move(result_col_map_vals_data), + std::move(result_col_map_offsets)); + + // `deduplicate_keys` currently always returns OK, so its status can be ignored here. + static_cast<void>(map_column->deduplicate_keys()); + return std::move(map_column); } static std::vector<std::string_view> split_pair_by_delim(const std::string_view& str, diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index e40938e45fe..079536f57cf 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -397,7 +397,10 @@ Status OlapTableBlockConvertor::_internal_validate_column( break; } case TYPE_MAP: { - const auto column_map = assert_cast<const vectorized::ColumnMap*>(real_column_ptr.get()); + const auto* column_map = assert_cast<const vectorized::ColumnMap*>(real_column_ptr.get()); + // Deduplicate before building `permutation`, which is indexed per key/value entry. + RETURN_IF_ERROR((const_cast<ColumnMap*>(column_map))->deduplicate_keys(true)); + const auto* type_map = assert_cast<const vectorized::DataTypeMap*>(remove_nullable(type).get()); auto key_type = type_map->get_key_type();
diff --git a/be/test/vec/core/column_map_test.cpp b/be/test/vec/core/column_map_test.cpp index 8af75766f18..58ec30aa979 100644 --- a/be/test/vec/core/column_map_test.cpp +++ b/be/test/vec/core/column_map_test.cpp @@ -17,14 +17,19 @@ #include "vec/columns/column_map.h" +#include <gtest/gtest-death-test.h> #include <gtest/gtest-message.h> #include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> + +#include <cstdint> #include "gtest/gtest_pred_impl.h" #include "vec/columns/column.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" #include "vec/core/field.h" +#include "vec/data_types/data_type_nullable.h" namespace doris::vectorized { TEST(ColumnMapTest2, StringKeyTest) { @@ -90,6 +95,213 @@ TEST(ColumnMapTest2, StringKeyTest) { } }; +TEST(ColumnMapTest2, StringKeyTestDuplicatedKeys) { + auto col_map_str = ColumnMap( + ColumnNullable::create(ColumnString::create(), ColumnVector<uint8_t>::create()), + ColumnInt32::create(), ColumnArray::ColumnOffsets::create()); + Array k1 = {"a", "b", "c", "a", "b", "c"}; + Array v1 = {1, 2, 3, 4, 5, 6}; + { + Map map; + map.push_back(k1); + map.push_back(v1); + col_map_str.insert(map); + } + { + Map map; + map.push_back(k1); + map.push_back(v1); + col_map_str.insert(map); + } + Array k2 = {"aa", "bb", "cc", "aa", "cc"}; + Array v2 = {11, 22, 33, 111, 333}; + { + Map map; + map.push_back(k2); + map.push_back(v2); + col_map_str.insert(map); + } + Array k3 = {"aaa", "bbb", Null(), "", "ccc", "ccc", "", Null()}; + Array v3 = {111, 222, 4321, 999, 333, 3333, 9988, 1234}; + { + Map map; + map.push_back(k3); + map.push_back(v3); + col_map_str.insert(map); + } + + ASSERT_EQ(col_map_str.size(), 4); + auto& keys =
col_map_str.get_keys(); + auto& values = col_map_str.get_values(); + + ASSERT_EQ(keys.size(), 25); + ASSERT_EQ(keys.size(), values.size()); + + auto st = col_map_str.deduplicate_keys(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + ASSERT_EQ(keys.size(), 14); + ASSERT_EQ(keys.size(), values.size()); + + auto& offsets = col_map_str.get_offsets(); + + auto& nullable_keys = assert_cast<ColumnNullable&>(keys); + auto& string_keys = assert_cast<ColumnString&>(nullable_keys.get_nested_column()); + auto& int_values = assert_cast<ColumnInt32&>(values); + + ASSERT_EQ(offsets.size(), 4); + ASSERT_EQ(offsets[0], 3); + ASSERT_EQ(offsets[1], 6); + ASSERT_EQ(offsets[2], 9); + ASSERT_EQ(offsets[3], 14); + + ASSERT_EQ(string_keys.get_element(0), "a"); + ASSERT_EQ(string_keys.get_element(1), "b"); + ASSERT_EQ(string_keys.get_element(2), "c"); + + ASSERT_EQ(string_keys.get_element(3), "a"); + ASSERT_EQ(string_keys.get_element(4), "b"); + ASSERT_EQ(string_keys.get_element(5), "c"); + + ASSERT_EQ(string_keys.get_element(6), "bb"); + ASSERT_EQ(string_keys.get_element(7), "aa"); + ASSERT_EQ(string_keys.get_element(8), "cc"); + + ASSERT_EQ(string_keys.get_element(9), "aaa"); + ASSERT_EQ(string_keys.get_element(10), "bbb"); + ASSERT_EQ(string_keys.get_element(11), "ccc"); + ASSERT_EQ(string_keys.get_element(12), ""); + ASSERT_TRUE(nullable_keys.is_null_at(13)); + + ASSERT_EQ(int_values.get_element(0), 4); + ASSERT_EQ(int_values.get_element(1), 5); + ASSERT_EQ(int_values.get_element(2), 6); + + ASSERT_EQ(int_values.get_element(3), 4); + ASSERT_EQ(int_values.get_element(4), 5); + ASSERT_EQ(int_values.get_element(5), 6); + + ASSERT_EQ(int_values.get_element(6), 22); + ASSERT_EQ(int_values.get_element(7), 111); + ASSERT_EQ(int_values.get_element(8), 333); + + ASSERT_EQ(int_values.get_element(9), 111); + ASSERT_EQ(int_values.get_element(10), 222); + ASSERT_EQ(int_values.get_element(11), 3333); + ASSERT_EQ(int_values.get_element(12), 9988); + ASSERT_EQ(int_values.get_element(13), 1234); +}; +
+TEST(ColumnMapTest2, StringKeyTestDuplicatedKeysNestedMap) { + auto col_map_str = ColumnMap(ColumnString::create(), + ColumnMap::create(ColumnString::create(), ColumnInt32::create(), + ColumnArray::ColumnOffsets::create()), + ColumnArray::ColumnOffsets::create()); + // Row 0: the outer key "a" appears twice; both of its values are maps with duplicated keys. + Map inner_map; + { + Array k1 = {"a", "b", "c", "a", "b", "c"}; + Array v1 = {1, 2, 3, 4, 5, 6}; + inner_map.push_back(k1); + inner_map.push_back(v1); + } + + Map inner_map2; + { + Array k1 = {"a", "b", "c", "a", "b", "c"}; + Array v1 = {1, 2, 3, 4, 5, 6}; + inner_map2.push_back(k1); + inner_map2.push_back(v1); + } + + Array k1 = {"a", "a"}; + Array v1 = {inner_map, inner_map2}; + { + Map map; + map.push_back(k1); + map.push_back(v1); + col_map_str.insert(map); + } + // Row 1: the outer key "aa" appears twice; the last value (inner_map4) is expected to survive. + Map inner_map3; + { + Array k2 = {"aa", "bb", "cc", "aa", "cc"}; + Array v2 = {11, 22, 33, 111, 333}; + inner_map3.push_back(k2); + inner_map3.push_back(v2); + } + + Map inner_map4; + { + Array k2 = {"aa", "cc", "cc"}; + Array v2 = {11, 33, 333}; + inner_map4.push_back(k2); + inner_map4.push_back(v2); + } + Array k2 = {"aa", "aa"}; + Array v2 = {inner_map3, inner_map4}; + { + Map map; + map.push_back(k2); + map.push_back(v2); + col_map_str.insert(map); + } + + ASSERT_EQ(col_map_str.size(), 2); + auto& keys = col_map_str.get_keys(); + auto& values = col_map_str.get_values(); + + ASSERT_EQ(keys.size(), 4); + ASSERT_EQ(keys.size(), values.size()); + // recursive = true: the map-typed values are deduplicated as well as the outer keys. + auto st = col_map_str.deduplicate_keys(true); + ASSERT_TRUE(st.ok()) << st.to_string(); + + ASSERT_EQ(keys.size(), 2); + ASSERT_EQ(keys.size(), values.size()); + + auto& offsets = col_map_str.get_offsets(); + auto& string_keys = assert_cast<ColumnString&>(keys); + auto& map_values = assert_cast<ColumnMap&>(values); + + ASSERT_EQ(offsets.size(), 2); + ASSERT_EQ(offsets[0], 1); + ASSERT_EQ(offsets[1], 2); + + ASSERT_EQ(string_keys.get_element(0), "a"); + ASSERT_EQ(string_keys.get_element(1), "aa"); + // Each surviving outer value is the last inner map given for its key, itself deduplicated. + auto map_value1 = get<Array>(map_values[0]); + auto map_value2 = get<Array>(map_values[1]);
+ // A map Field is an Array of two Arrays: [0] holds the keys and [1] holds the values. + ASSERT_EQ(map_value1.size(), 2); + ASSERT_EQ(map_value2.size(), 2); + + // keys: each inner map keeps only the last occurrence of every key, in order + auto v1_keys = get<Array>(map_value1[0]); + ASSERT_EQ(v1_keys.size(), 3); + ASSERT_EQ(get<std::string>(v1_keys[0]), "a"); + ASSERT_EQ(get<std::string>(v1_keys[1]), "b"); + ASSERT_EQ(get<std::string>(v1_keys[2]), "c"); + + auto v2_keys = get<Array>(map_value2[0]); + ASSERT_EQ(v2_keys.size(), 2); + ASSERT_EQ(get<std::string>(v2_keys[0]), "aa"); + ASSERT_EQ(get<std::string>(v2_keys[1]), "cc"); + + // values: the value attached to that last occurrence is the one that survives + auto v1_values = get<Array>(map_value1[1]); + ASSERT_EQ(v1_values.size(), 3); + ASSERT_EQ(get<int32_t>(v1_values[0]), 4); + ASSERT_EQ(get<int32_t>(v1_values[1]), 5); + ASSERT_EQ(get<int32_t>(v1_values[2]), 6); + + auto v2_values = get<Array>(map_value2[1]); + ASSERT_EQ(v2_values.size(), 2); + ASSERT_EQ(get<int32_t>(v2_values[0]), 11); + ASSERT_EQ(get<int32_t>(v2_values[1]), 333); +}; + TEST(ColumnMapTest2, StringValueTest) { auto col_map_str64 = ColumnMap(ColumnInt64::create(), ColumnString64::create(), ColumnArray::ColumnOffsets::create()); diff --git a/regression-test/data/nereids_function_p0/cast_function/test_cast_map_function.out b/regression-test/data/nereids_function_p0/cast_function/test_cast_map_function.out index 2b4e1abd302..51590dc2f19 100644 Binary files a/regression-test/data/nereids_function_p0/cast_function/test_cast_map_function.out and b/regression-test/data/nereids_function_p0/cast_function/test_cast_map_function.out differ diff --git a/regression-test/data/nereids_function_p0/scalar_function/Map.out b/regression-test/data/nereids_function_p0/scalar_function/Map.out index a272d3cf2c3..f661ec4c94f 100644 Binary files a/regression-test/data/nereids_function_p0/scalar_function/Map.out and b/regression-test/data/nereids_function_p0/scalar_function/Map.out differ diff --git a/regression-test/data/nereids_p0/datatype/test_map.out b/regression-test/data/nereids_p0/datatype/test_map.out index 4ac971fb3a1..554f66bf132 100644 Binary files
a/regression-test/data/nereids_p0/datatype/test_map.out and b/regression-test/data/nereids_p0/datatype/test_map.out differ diff --git a/regression-test/suites/nereids_p0/datatype/test_map.groovy b/regression-test/suites/nereids_p0/datatype/test_map.groovy index f486342c379..966478abc72 100644 --- a/regression-test/suites/nereids_p0/datatype/test_map.groovy +++ b/regression-test/suites/nereids_p0/datatype/test_map.groovy @@ -35,6 +35,11 @@ suite("test_map") { sql 'insert into `test_map_table` values (5, 2, {"key2_2": "value2_2", "key22_2": "value22_2"});' sql 'insert into `test_map_table` values (6, 3, {"key3": "value3", "key33": "value33", "key3333": "value333"});' sql 'insert into `test_map_table` values (7, 4, {"key4": "value4", "key44": "value44", "key444": "value444", "key4444": "value4444"});' + sql 'insert into `test_map_table` values (7, 4, {"key5": "value5", "key44": "value44", null: "null", "key4": "value444", null: "null2", "key44": "value4444"});' + // Duplicated keys above ("key44", null) should keep only their last value. NOTE(review): this row reuses id (7, 4); if both rows are kept, `order by 1, 2` does not fix their relative order — confirm. + qt_sql """ + select * from test_map_table order by 1, 2; + """ sql "DROP TABLE IF EXISTS `test_map_table_right`" sql """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
