This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5d3fc457facecf49bcf8afcfaa9a2711fd565658 Author: daidai <[email protected]> AuthorDate: Tue Jul 11 22:37:48 2023 +0800 [feature](hive)add read of the hive table textfile format array type (#21514) --- be/src/exec/text_converter.cpp | 191 ++++++++++++++++++++- be/src/exec/text_converter.h | 5 +- be/src/vec/exec/format/csv/csv_reader.cpp | 10 +- be/src/vec/exec/format/csv/csv_reader.h | 2 + .../doris/planner/external/HiveScanNode.java | 22 ++- gensrc/thrift/PlanNodes.thrift | 1 + .../hive/test_hive_to_array.out | 22 +++ .../hive/test_hive_to_array.groovy | 46 +++++ 8 files changed, 291 insertions(+), 8 deletions(-) diff --git a/be/src/exec/text_converter.cpp b/be/src/exec/text_converter.cpp index 5d56c74249..609338d6cd 100644 --- a/be/src/exec/text_converter.cpp +++ b/be/src/exec/text_converter.cpp @@ -33,6 +33,7 @@ #include "runtime/types.h" #include "util/slice.h" #include "util/string_parser.hpp" +#include "vec/columns/column_array.h" #include "vec/columns/column_complex.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" @@ -42,7 +43,8 @@ namespace doris { -TextConverter::TextConverter(char escape_char) : _escape_char(escape_char) {} +TextConverter::TextConverter(char escape_char, char array_delimiter) + : _escape_char(escape_char), _array_delimiter(array_delimiter) {} void TextConverter::write_string_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, @@ -278,6 +280,193 @@ bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc, .resize_fill(origin_size + rows, value); break; } + case TYPE_ARRAY: { + std::function<vectorized::Array(int, int, char, const TypeDescriptor&)> func = + [&](int left, int right, char split, + const TypeDescriptor& type) -> vectorized::Array { + vectorized::Array array; + int fr = left; + for (int i = left; i <= right + 1; i++) { + auto Sub_type = type.children[0]; + if (i <= right && data[i] != split && data[i] != _array_delimiter) { + continue; + } + if (Sub_type.type == TYPE_ARRAY) { + array.push_back(func(fr, i - 1, split + 1, Sub_type)); + } else { + StringParser::ParseResult local_parse_result = StringParser::PARSE_SUCCESS; + switch (Sub_type.type) { + case TYPE_HLL: { + DCHECK(false) << "not support type: " + << "array<HyperLogLog>\n"; + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: { + size_t sz = i - fr; + if (need_escape) { + unescape_string_on_spot(data + fr, &sz); + } + array.push_back(std::string(data + fr, sz)); + break; + } + case TYPE_BOOLEAN: { + bool num = StringParser::string_to_bool(data + fr, i - fr, + &local_parse_result); + array.push_back((uint8_t)num); + break; + } + case TYPE_TINYINT: { + int8_t num = StringParser::string_to_int<int8_t>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_SMALLINT: { + int16_t num = StringParser::string_to_int<int16_t>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_INT: { + int32_t num = StringParser::string_to_int<int32_t>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_BIGINT: { + int64_t num = StringParser::string_to_int<int64_t>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_LARGEINT: { + __int128 num = StringParser::string_to_int<__int128>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_FLOAT: { + float num = StringParser::string_to_float<float>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_DOUBLE: { + double num = StringParser::string_to_float<double>(data + fr, i - fr, + &local_parse_result); + array.push_back(num); + break; + } + case TYPE_DATE: { + vectorized::VecDateTimeValue ts_slot; + if (!ts_slot.from_date_str(data + fr, i - fr)) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + ts_slot.cast_to_date(); + array.push_back(*reinterpret_cast<int64_t*>(&ts_slot)); + break; + } + case TYPE_DATEV2: { + vectorized::DateV2Value<vectorized::DateV2ValueType> ts_slot; + if (!ts_slot.from_date_str(data + fr, i - fr)) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + uint32_t int_val = ts_slot.to_date_int_val(); + array.push_back(int_val); + break; + } + case TYPE_DATETIME: { + vectorized::VecDateTimeValue ts_slot; + if (!ts_slot.from_date_str(data + fr, i - fr)) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + ts_slot.to_datetime(); + array.push_back((int64_t)ts_slot); + break; + } + case TYPE_DATETIMEV2: { + vectorized::DateV2Value<vectorized::DateTimeV2ValueType> ts_slot; + if (!ts_slot.from_date_str(data + fr, i - fr)) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + uint64_t int_val = ts_slot.to_date_int_val(); + array.push_back(int_val); + break; + } + case TYPE_DECIMALV2: { + DecimalV2Value decimal_slot; + if (decimal_slot.parse_from_str(data + fr, i - fr)) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + array.push_back(decimal_slot.value()); + break; + } + case TYPE_DECIMAL32: { + StringParser::ParseResult result = StringParser::PARSE_SUCCESS; + int32_t value = StringParser::string_to_decimal<int32_t>( + data + fr, i - fr, Sub_type.precision, Sub_type.scale, &result); + if (result != StringParser::PARSE_SUCCESS) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + array.push_back(value); + break; + } + case TYPE_DECIMAL64: { + StringParser::ParseResult result = StringParser::PARSE_SUCCESS; + int64_t value = StringParser::string_to_decimal<int64_t>( + data + fr, i - fr, Sub_type.precision, Sub_type.scale, &result); + if (result != StringParser::PARSE_SUCCESS) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + array.push_back(value); + break; + } + case TYPE_DECIMAL128I: { + StringParser::ParseResult result = StringParser::PARSE_SUCCESS; + vectorized::Int128 value = + StringParser::string_to_decimal<vectorized::Int128>( + data + fr, i - fr, Sub_type.precision, Sub_type.scale, + &result); + if (result != StringParser::PARSE_SUCCESS) { + local_parse_result = StringParser::PARSE_FAILURE; + break; + } + array.push_back(value); + break; + } + default: { + DCHECK(false) << "bad slot type: array<" << Sub_type << ">"; + break; + } + } + + if (local_parse_result != StringParser::PARSE_SUCCESS) { + parse_result = local_parse_result; + return array; + } + } + fr = i + 1; + } + return array; + }; + + auto array = func(0, len - 1, '\002', slot_desc->type()); + + for (int i = 0; i < rows; i++) { + reinterpret_cast<vectorized::ColumnArray*>(col_ptr)->insert(array); + } + + break; + } default: DCHECK(false) << "bad slot type: " << slot_desc->type(); break; diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h index 9615471a8f..b46594fd04 100644 --- a/be/src/exec/text_converter.h +++ b/be/src/exec/text_converter.h @@ -29,7 +29,7 @@ class SlotDescriptor; // numeric types, etc. class TextConverter { public: - TextConverter(char escape_char); + TextConverter(char escape_char, char array_delimiter = '\2'); void write_string_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr, const char* data, @@ -55,8 +55,11 @@ public: size_t rows); void unescape_string_on_spot(const char* src, size_t* len); + void set_array_delimiter(char array_delimiter) { _array_delimiter = array_delimiter; } + private: char _escape_char; + char _array_delimiter; }; } // namespace doris diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 8db8579402..7390d9103a 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -91,7 +91,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte _file_compress_type = _params.compress_type; _size = _range.size; - _text_converter.reset(new (std::nothrow) TextConverter('\\')); + _text_converter.reset(new (std::nothrow) TextConverter('\\', _array_delimiter[0])); _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); _init_system_properties(); _init_file_description(); @@ -187,6 +187,10 @@ Status CsvReader::init_reader(bool is_load) { _line_delimiter = _params.file_attributes.text_params.line_delimiter; _line_delimiter_length = _line_delimiter.size(); + //get array delimiter + _array_delimiter = _params.file_attributes.text_params.array_delimiter; + _text_converter->set_array_delimiter(_array_delimiter[0]); + if (_params.file_attributes.__isset.trim_double_quotes) { _trim_double_quotes = _params.file_attributes.trim_double_quotes; } @@ -672,6 +676,10 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _line_delimiter = _params.file_attributes.text_params.line_delimiter; _line_delimiter_length = _line_delimiter.size(); + //get array delimiter + _array_delimiter = _params.file_attributes.text_params.array_delimiter; + _text_converter->set_array_delimiter(_array_delimiter[0]); + // create decompressor. // _decompressor may be nullptr if this is not a compressed file RETURN_IF_ERROR(_create_decompressor()); diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 83010af83b..da3505e09b 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -142,6 +142,8 @@ private: std::string _value_separator; std::string _line_delimiter; + std::string _array_delimiter; + int _value_separator_length; int _line_delimiter_length; bool _trim_double_quotes = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index ec85f8eb25..52bcc67c32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -67,8 +67,13 @@ public class HiveScanNode extends FileQueryScanNode { public static final String PROP_FIELD_DELIMITER = "field.delim"; public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" + + public static final String PROP_LINE_DELIMITER = "line.delim"; public static final String DEFAULT_LINE_DELIMITER = "\n"; + public static final String PROP_ARRAY_DELIMITER_HIVE2 = "colelction.delim"; + public static final String PROP_ARRAY_DELIMITER_HIVE3 = "collection.delim"; + public static final String DEFAULT_ARRAY_DELIMITER = "\2"; protected final HMSExternalTable hmsTable; private HiveTransaction hiveTransaction = null; @@ -98,9 +103,9 @@ public class HiveScanNode extends FileQueryScanNode { String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); if (inputFormat.contains("TextInputFormat")) { for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.getType().isScalarType()) { + if (slot.getType().isMapType() || slot.getType().isStructType()) { throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types ARRAY/MAP/STRUCT are not supported yet" + + "`, The column types MAP/STRUCT are not supported yet" + " for text input format of Hive. "); } } @@ -262,9 +267,16 @@ public class HiveScanNode extends FileQueryScanNode { @Override protected TFileAttributes getFileAttributes() throws UserException { TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() - .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); - textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); + java.util.Map<String, String> delimiter = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); + textParams.setColumnSeparator(delimiter.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); + textParams.setLineDelimiter(delimiter.getOrDefault(PROP_LINE_DELIMITER, DEFAULT_LINE_DELIMITER)); + if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE2) != null) { + textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE2)); + } else if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE3) != null) { + textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE3)); + } else { + textParams.setArrayDelimiter(DEFAULT_ARRAY_DELIMITER); + } TFileAttributes fileAttributes = new TFileAttributes(); fileAttributes.setTextParams(textParams); fileAttributes.setHeaderType(""); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 978efee422..1b74b0cc70 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -243,6 +243,7 @@ struct TEsScanRange { struct TFileTextScanRangeParams { 1: optional string column_separator; 2: optional string line_delimiter; + 3: optional string array_delimiter; } struct TFileScanSlotInfo { diff --git a/regression-test/data/external_table_emr_p2/hive/test_hive_to_array.out b/regression-test/data/external_table_emr_p2/hive/test_hive_to_array.out new file mode 100644 index 0000000000..f14ff73656 --- /dev/null +++ b/regression-test/data/external_table_emr_p2/hive/test_hive_to_array.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !types -- +[1, 2, 3, 4] [10, 20, 30, 40] [100, 200, 300] [100000000000000, 20000000000000, 30000000000000, 40000000000000] [1, 0, 1] [1.23, 4.56, 7.89] [10.1, 20.2, 30.3] ["apple", "banana", "orange"] [2023-07-04 12:00:00.000000, 2023-07-05 12:00:00.000000, 2023-07-06 12:00:00.000000] [2023-07-04, 2023-07-05, 2023-07-06] [1111111.111, 2111111.111, 3111111.111] ["a", "b", "c"] ["aa", "bb", "cc"] +[10, 20, 30] [100, 200, 300, 400] [1000, 2000, 3000] [1000000000000000, 200000000000000, 300000000000000, 400000000000000] [1, 1, 1, 1] [12.3, 45.6, 78.9] [100.1, 200.2, 300.3] ["grapes", "watermelon", "kiwi"] [2023-07-03 12:00:00.000000, 2023-07-03 12:00:00.000000, 2023-07-03 12:00:00.000000] [2021-07-05, 2021-07-05, 2021-07-05] [2222222.111, 2222222.112, 2222222.113] \N \N +[20, 30, 40, 50] [200, 300, 400, 500] [10000, 20000, 30000] [100000000000000, 20000000000000, 30000000000000] [1, 1, 1, 1, 0, 0] [120.3, 450.6, 780.9] [100.001, 200.002, 300.003] ["melon", "strawberry", "blueberry"] [2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000] [2021-07-06, 2021-07-06, 2021-07-06] [1111111.111, 2111111.111, 3111111.111] \N \N +[40, 50, 60, 70] [210, 310, 410, 510] [110000, 210000, 310000] [400000000000000, 50000000000000, 60000000000000] [1] [120.301, 450.602, 780.90009] [100.0000001, 200.0000002, 300.0000003] ["hello", "world"] [2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000] [2021-07-06, 2021-07-06, 2021-07-06] [3311111.111, 2211111.111, 3121111.111] ["1"] ["hello", "world"] +[4, 5, 6, 7] [2100, 3100, 4100, 5100] [110000, 220000, 330000] [60000000000000, 60000000000000, 60000000000000] [1] [120.301, 450.602, 780.90009] [100.0000001, 200.0000002, 300.0000003] ["hive", "text", "file", "format"] [2023-07-09 12:00:00.000000, 2023-07-09 12:00:00.000000, 2023-07-09 12:00:00.000000] [2021-07-09, 2021-07-09, 2021-07-09] [3311111.111, 2211111.111, 3121111.111] ["d", "d", "d", "d"] ["ffffffff"] + +-- !array -- +1 [[[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[10, 11, 12], [13, 14, 15], [16, 17, 18]]] +2 [[[19, 20, 21], [22, 23, 24], [25, 26, 27]], [[28], [31], [34]], [[28, 29], [31, 32], [34, 35]]] +3 [[[1, 2, 3], [4, 5, 6], [7, 8, 9]]] +4 [[[1]]] +5 [[[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[10, 11, 12], [13, 14, 15], [16, 17, 18]], [[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[10, 11, 12], [13, 14, 15], [16, 17, 18]]] + +-- !delimiter -- +[1, 2, 3, 4] [10, 20, 30, 40] [100, 200, 300] [100000000000000, 20000000000000, 30000000000000, 40000000000000] [1, 0, 1] [1.23, 4.56, 7.89] [10.1, 20.2, 30.3] ["apple", "banana", "orange"] [2023-07-04 12:00:00.000000, 2023-07-05 12:00:00.000000, 2023-07-06 12:00:00.000000] [2023-07-04, 2023-07-05, 2023-07-06] [1111111.111, 2111111.111, 3111111.111] 1 [[[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[10, 11, 12], [13, 14, 15], [16, 17, 18]]] +[10, 20, 30] [100, 200, 300, 400] [1000, 2000, 3000] [1000000000000000, 200000000000000, 300000000000000, 400000000000000] [1, 1, 1, 1] [12.3, 45.6, 78.9] [100.1, 200.2, 300.3] ["grapes", "watermelon", "kiwi"] [2023-07-03 12:00:00.000000, 2023-07-03 12:00:00.000000, 2023-07-03 12:00:00.000000] [2021-07-05, 2021-07-05, 2021-07-05] [2222222.111, 2222222.112, 2222222.113] 2 [[[19, 20, 21], [22, 23, 24], [25, 26, 27]], [[28], [31], [34]], [[28, 29], [31, 32], [34, 35]]] +[20, 30, 40, 50] [200, 300, 400, 500] [10000, 20000, 30000] [100000000000000, 20000000000000, 30000000000000] [1, 1, 1, 1, 0, 0] [120.3, 450.6, 780.9] [100.001, 200.002, 300.003] ["melon", "strawberry", "blueberry"] [2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000] [2021-07-06, 2021-07-06, 2021-07-06] [1111111.111, 2111111.111, 3111111.111] 3 [[[1, 2, 3], [4, 5, 6], [7, 8, 9]]] +[40, 50, 60, 70] [210, 310, 410, 510] [110000, 210000, 310000] [400000000000000, 50000000000000, 60000000000000] [1] [120.301, 450.602, 780.90009] [100.0000001, 200.0000002, 300.0000003] ["hello", "world"] [2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000, 2023-07-02 12:00:00.000000] [2021-07-06, 2021-07-06, 2021-07-06] [3311111.111, 2211111.111, 3121111.111] 4 [[[1]]] +[4, 5, 6, 7] [2100, 3100, 4100, 5100] [110000, 220000, 330000] [60000000000000, 60000000000000, 60000000000000] [1] [120.301, 450.602, 780.90009] [100.0000001, 200.0000002, 300.0000003] ["hive", "text", "file", "format"] [2023-07-09 12:00:00.000000, 2023-07-09 12:00:00.000000, 2023-07-09 12:00:00.000000] [2021-07-09, 2021-07-09, 2021-07-09] [3311111.111, 2211111.111, 3121111.111] 5 [[[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[10, 11, 12], [13, 14, 15], [16, 17, 18]], [[1, 2, 3], [4, 5, 6], [7, [...] + diff --git a/regression-test/suites/external_table_emr_p2/hive/test_hive_to_array.groovy b/regression-test/suites/external_table_emr_p2/hive/test_hive_to_array.groovy new file mode 100644 index 0000000000..eb9384e6c8 --- /dev/null +++ b/regression-test/suites/external_table_emr_p2/hive/test_hive_to_array.groovy @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_to_array", "p2") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_hive_to_array" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + + sql """ use multi_catalog """ + + qt_types """ select * from hive_textfile_array_all_types""" + + qt_array """ select * from hive_textfile_nestedarray""" + + qt_delimiter """ select * from hive_textfile_array_delimiter""" + + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
