This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3acffaa2053d9f0ce4ab82542234196fc9a5b482 Author: Pxl <[email protected]> AuthorDate: Mon Feb 26 10:36:35 2024 +0800 [Feature](agg-state) support write_column_to_pb from DataTypeFixedLengthObjectSerDe (#31171) --- be/src/vec/data_types/data_type_agg_state.h | 2 +- .../vec/data_types/data_type_fixed_length_object.h | 2 +- .../serde/data_type_fixedlengthobject_serde.cpp | 22 -- .../serde/data_type_fixedlengthobject_serde.h | 110 -------- .../data_types/serde/data_type_string_serde.cpp | 281 --------------------- .../vec/data_types/serde/data_type_string_serde.h | 233 +++++++++++++++-- .../agg_state/avg/test_agg_state_avg.out | 14 + .../test_agg_state_avg.groovy} | 13 +- .../test_agg_state_quantile_union.groovy | 3 + .../datatype_p0/agg_state/test_agg_state.groovy | 1 - 10 files changed, 241 insertions(+), 440 deletions(-) diff --git a/be/src/vec/data_types/data_type_agg_state.h b/be/src/vec/data_types/data_type_agg_state.h index 7531adc106b..ff6f1975e58 100644 --- a/be/src/vec/data_types/data_type_agg_state.h +++ b/be/src/vec/data_types/data_type_agg_state.h @@ -29,7 +29,7 @@ #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_fixed_length_object.h" #include "vec/data_types/data_type_string.h" -#include "vec/data_types/serde/data_type_fixedlengthobject_serde.h" +#include "vec/data_types/serde/data_type_string_serde.h" namespace doris::vectorized { diff --git a/be/src/vec/data_types/data_type_fixed_length_object.h b/be/src/vec/data_types/data_type_fixed_length_object.h index c7d5ab65290..935e0a93043 100644 --- a/be/src/vec/data_types/data_type_fixed_length_object.h +++ b/be/src/vec/data_types/data_type_fixed_length_object.h @@ -25,7 +25,7 @@ #include <typeinfo> #include "runtime/define_primitive_type.h" -#include "serde/data_type_fixedlengthobject_serde.h" +#include "serde/data_type_string_serde.h" #include "vec/columns/column_fixed_length_object.h" #include "vec/core/field.h" #include "vec/core/types.h" diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp deleted file mode 100644 index 84a89ba3f20..00000000000 --- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.cpp +++ /dev/null @@ -1,22 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#include "data_type_fixedlengthobject_serde.h" - -namespace doris { - -namespace vectorized {} -} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h b/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h deleted file mode 100644 index f26bfb8ef71..00000000000 --- a/be/src/vec/data_types/serde/data_type_fixedlengthobject_serde.h +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <glog/logging.h> -#include <stdint.h> - -#include <ostream> - -#include "common/status.h" -#include "data_type_serde.h" -#include "util/jsonb_writer.h" - -namespace doris { -class PValues; -class JsonbValue; - -namespace vectorized { -class IColumn; -class Arena; - -class DataTypeFixedLengthObjectSerDe : public DataTypeSerDe { -public: - DataTypeFixedLengthObjectSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}; - - Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw, - FormatOptions& options) const override { - return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); - } - - Status serialize_column_to_json(const IColumn& column, int start_idx, int end_idx, - BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); - } - Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, - const FormatOptions& options) const override { - return Status::NotSupported("deserialize_one_cell_from_text with type " + - column.get_name()); - } - - Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices, - int* num_deserialized, - const FormatOptions& options) const override { - return Status::NotSupported("deserialize_column_from_text_vector with type " + - column.get_name()); - } - - Status write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const override { - return Status::NotSupported("write_column_to_pb with type " + column.get_name()); - } - Status read_column_from_pb(IColumn& column, const PValues& arg) const override { - return Status::NotSupported("read_column_from_pb with type " + column.get_name()); - }; - void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, - int32_t col_id, int row_num) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_one_cell_to_jsonb with type " + column.get_name()); - } - - void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "read_one_cell_from_jsonb with type " + column.get_name()); - } - void write_column_to_arrow(const IColumn& column, const NullMap* null_map, - arrow::ArrayBuilder* array_builder, int start, - int end) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); - } - void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, - int end, const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "read_column_from_arrow with type " + column.get_name()); - } - - Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer, - int row_idx, bool col_const) const override { - return Status::NotSupported("write_column_to_mysql with type " + column.get_name()); - } - - Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer, - int row_idx, bool col_const) const override { - return Status::NotSupported("write_column_to_pb with type " + column.get_name()); - } - - Status write_column_to_orc(const std::string& timezone, const IColumn& column, - const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, - int start, int end, - std::vector<StringRef>& buffer_list) const override { - return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); - } -}; -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_string_serde.cpp b/be/src/vec/data_types/serde/data_type_string_serde.cpp deleted file mode 100644 index 66d7e136592..00000000000 --- a/be/src/vec/data_types/serde/data_type_string_serde.cpp +++ /dev/null @@ -1,281 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "data_type_string_serde.h" - -#include <assert.h> -#include <gen_cpp/types.pb.h> -#include <rapidjson/document.h> -#include <rapidjson/stringbuffer.h> -#include <rapidjson/writer.h> -#include <stddef.h> - -#include "arrow/array/builder_binary.h" -#include "util/jsonb_document.h" -#include "util/jsonb_utils.h" -#include "vec/columns/column.h" -#include "vec/columns/column_const.h" -#include "vec/columns/column_string.h" -#include "vec/common/string_ref.h" - -namespace doris { -namespace vectorized { -class Arena; - -Status DataTypeStringSerDe::serialize_column_to_json(const IColumn& column, int start_idx, - int end_idx, BufferWritable& bw, - FormatOptions& options) const { - SERIALIZE_COLUMN_TO_JSON(); -} - -Status DataTypeStringSerDe::serialize_one_cell_to_json(const IColumn& column, int row_num, - BufferWritable& bw, - FormatOptions& options) const { - auto result = check_column_const_set_readability(column, row_num); - ColumnPtr ptr = result.first; - row_num = result.second; - - if (_nesting_level > 1) { - bw.write('"'); - } - const auto& value = assert_cast<const ColumnString&>(*ptr).get_data_at(row_num); - bw.write(value.data, value.size); - if (_nesting_level > 1) { - bw.write('"'); - } - return Status::OK(); -} - -Status DataTypeStringSerDe::deserialize_column_from_json_vector( - IColumn& column, std::vector<Slice>& slices, int* num_deserialized, - const FormatOptions& options) const { - DESERIALIZE_COLUMN_FROM_JSON_VECTOR() - return Status::OK(); -} -static void escape_string(const char* src, size_t& len, char escape_char) { - const char* start = src; - char* dest_ptr = const_cast<char*>(src); - const char* end = src + len; - bool escape_next_char = false; - - while (src < end) { - if (*src == escape_char) { - escape_next_char = !escape_next_char; - } else { - escape_next_char = false; - } - - if (escape_next_char) { - ++src; - } else { - *dest_ptr++ = *src++; - } - } - - len = dest_ptr - start; -} - -Status DataTypeStringSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice, - const FormatOptions& options) const { - auto& column_data = assert_cast<ColumnString&>(column); - - /* - * For strings in the json complex type, we remove double quotes by default. - * - * Because when querying complex types, such as selecting complexColumn from table, - * we will add double quotes to the strings in the complex type. - * - * For the map<string,int> column, insert { "abc" : 1, "hello",2 }. - * If you do not remove the double quotes, it will display {""abc"":1,""hello"": 2 }, - * remove the double quotes to display { "abc" : 1, "hello",2 }. - * - */ - if (_nesting_level >= 2) { - slice.trim_quote(); - } - if (options.escape_char != 0) { - escape_string(slice.data, slice.size, options.escape_char); - } - column_data.insert_data(slice.data, slice.size); - return Status::OK(); -} - -Status DataTypeStringSerDe::write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const { - result.mutable_bytes_value()->Reserve(end - start); - auto ptype = result.mutable_type(); - ptype->set_id(PGenericType::STRING); - for (size_t row_num = start; row_num < end; ++row_num) { - StringRef data = column.get_data_at(row_num); - result.add_string_value(data.to_string()); - } - return Status::OK(); -} -Status DataTypeStringSerDe::read_column_from_pb(IColumn& column, const PValues& arg) const { - auto& col = reinterpret_cast<ColumnString&>(column); - col.reserve(arg.string_value_size()); - for (int i = 0; i < arg.string_value_size(); ++i) { - column.insert_data(arg.string_value(i).c_str(), arg.string_value(i).size()); - } - return Status::OK(); -} - -void DataTypeStringSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, - Arena* mem_pool, int32_t col_id, - int row_num) const { - result.writeKey(col_id); - const auto& data_ref = column.get_data_at(row_num); - result.writeStartBinary(); - result.writeBinary(reinterpret_cast<const char*>(data_ref.data), data_ref.size); - result.writeEndBinary(); -} -void DataTypeStringSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { - assert(arg->isBinary()); - auto& col = reinterpret_cast<ColumnString&>(column); - auto blob = static_cast<const JsonbBlobVal*>(arg); - col.insert_data(blob->getBlob(), blob->getBlobLen()); -} - -void DataTypeStringSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map, - arrow::ArrayBuilder* array_builder, int start, - int end) const { - const auto& string_column = assert_cast<const ColumnString&>(column); - auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder); - for (size_t string_i = start; string_i < end; ++string_i) { - if (null_map && (*null_map)[string_i]) { - checkArrowStatus(builder.AppendNull(), column.get_name(), - array_builder->type()->name()); - continue; - } - std::string_view string_ref = string_column.get_data_at(string_i).to_string_view(); - checkArrowStatus(builder.Append(string_ref.data(), string_ref.size()), column.get_name(), - array_builder->type()->name()); - } -} - -void DataTypeStringSerDe::read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, - int start, int end, - const cctz::time_zone& ctz) const { - auto& column_chars_t = assert_cast<ColumnString&>(column).get_chars(); - auto& column_offsets = assert_cast<ColumnString&>(column).get_offsets(); - if (arrow_array->type_id() == arrow::Type::STRING || - arrow_array->type_id() == arrow::Type::BINARY) { - auto concrete_array = dynamic_cast<const arrow::BinaryArray*>(arrow_array); - std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data(); - - for (size_t offset_i = start; offset_i < end; ++offset_i) { - if (!concrete_array->IsNull(offset_i)) { - const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); - column_chars_t.insert(raw_data, raw_data + concrete_array->value_length(offset_i)); - } - column_offsets.emplace_back(column_chars_t.size()); - } - } else if (arrow_array->type_id() == arrow::Type::FIXED_SIZE_BINARY) { - auto concrete_array = dynamic_cast<const arrow::FixedSizeBinaryArray*>(arrow_array); - uint32_t width = concrete_array->byte_width(); - const auto* array_data = concrete_array->GetValue(start); - - for (size_t offset_i = 0; offset_i < end - start; ++offset_i) { - if (!concrete_array->IsNull(offset_i)) { - const auto* raw_data = array_data + (offset_i * width); - column_chars_t.insert(raw_data, raw_data + width); - } - column_offsets.emplace_back(column_chars_t.size()); - } - } -} - -template <bool is_binary_format> -Status DataTypeStringSerDe::_write_column_to_mysql(const IColumn& column, - MysqlRowBuffer<is_binary_format>& result, - int row_idx, bool col_const) const { - auto& col = assert_cast<const ColumnString&>(column); - const auto col_index = index_check_const(row_idx, col_const); - const auto string_val = col.get_data_at(col_index); - if (string_val.data == nullptr) { - if (string_val.size == 0) { - // 0x01 is a magic num, not useful actually, just for present "" - char* tmp_val = reinterpret_cast<char*>(0x01); - if (UNLIKELY(0 != result.push_string(tmp_val, string_val.size))) { - return Status::InternalError("pack mysql buffer failed."); - } - } else { - if (UNLIKELY(0 != result.push_null())) { - return Status::InternalError("pack mysql buffer failed."); - } - } - } else { - if (UNLIKELY(0 != result.push_string(string_val.data, string_val.size))) { - return Status::InternalError("pack mysql buffer failed."); - } - } - return Status::OK(); -} - -Status DataTypeStringSerDe::write_column_to_mysql(const IColumn& column, - MysqlRowBuffer<true>& row_buffer, int row_idx, - bool col_const) const { - return _write_column_to_mysql(column, row_buffer, row_idx, col_const); -} - -Status DataTypeStringSerDe::write_column_to_mysql(const IColumn& column, - MysqlRowBuffer<false>& row_buffer, int row_idx, - bool col_const) const { - return _write_column_to_mysql(column, row_buffer, row_idx, col_const); -} - -Status DataTypeStringSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column, - const NullMap* null_map, - orc::ColumnVectorBatch* orc_col_batch, int start, - int end, - std::vector<StringRef>& buffer_list) const { - auto& col_data = assert_cast<const ColumnString&>(column); - orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); - - for (size_t row_id = start; row_id < end; row_id++) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast<char*>(ele.data); - cur_batch->length[row_id] = ele.size; - } - - cur_batch->numElements = end - start; - return Status::OK(); -} - -void DataTypeStringSerDe::write_one_cell_to_json(const IColumn& column, rapidjson::Value& result, - rapidjson::Document::AllocatorType& allocator, - int row_num) const { - const auto& col = static_cast<const ColumnString&>(column); - const auto& data_ref = col.get_data_at(row_num); - result.SetString(data_ref.data, data_ref.size); -} - -void DataTypeStringSerDe::read_one_cell_from_json(IColumn& column, - const rapidjson::Value& result) const { - auto& col = static_cast<ColumnString&>(column); - if (!result.IsString()) { - rapidjson::StringBuffer buffer; - rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); - result.Accept(writer); - col.insert_data(buffer.GetString(), buffer.GetSize()); - return; - } - col.insert_data(result.GetString(), result.GetStringLength()); -} - -} // namespace vectorized -} // namespace doris \ No newline at end of file diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h b/be/src/vec/data_types/serde/data_type_string_serde.h index b2fbcf5a81b..c795d57b800 100644 --- a/be/src/vec/data_types/serde/data_type_string_serde.h +++ b/be/src/vec/data_types/serde/data_type_string_serde.h @@ -17,11 +17,20 @@ #pragma once -#include <stdint.h> +#include <arrow/array/array_base.h> +#include <arrow/array/array_binary.h> +#include <arrow/array/builder_base.h> +#include <arrow/array/builder_binary.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> #include "common/status.h" #include "data_type_serde.h" #include "util/jsonb_writer.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_fixed_length_object.h" +#include "vec/columns/column_string.h" +#include "vec/core/types.h" namespace doris { class PValues; @@ -31,55 +40,241 @@ namespace vectorized { class IColumn; class Arena; -class DataTypeStringSerDe : public DataTypeSerDe { +inline void escape_string(const char* src, size_t& len, char escape_char) { + const char* start = src; + char* dest_ptr = const_cast<char*>(src); + const char* end = src + len; + bool escape_next_char = false; + + while (src < end) { + if (*src == escape_char) { + escape_next_char = !escape_next_char; + } else { + escape_next_char = false; + } + + if (escape_next_char) { + ++src; + } else { + *dest_ptr++ = *src++; + } + } + + len = dest_ptr - start; +} + +template <typename ColumnType> +class DataTypeStringSerDeBase : public DataTypeSerDe { public: - DataTypeStringSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}; + DataTypeStringSerDeBase(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}; Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw, - FormatOptions& options) const override; + FormatOptions& options) const override { + auto result = check_column_const_set_readability(column, row_num); + ColumnPtr ptr = result.first; + row_num = result.second; + + if (_nesting_level > 1) { + bw.write('"'); + } + + const auto& value = assert_cast<const ColumnType&>(*ptr).get_data_at(row_num); + bw.write(value.data, value.size); + if (_nesting_level > 1) { + bw.write('"'); + } + return Status::OK(); + } Status serialize_column_to_json(const IColumn& column, int start_idx, int end_idx, - BufferWritable& bw, FormatOptions& options) const override; + BufferWritable& bw, FormatOptions& options) const override { + SERIALIZE_COLUMN_TO_JSON(); + } Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, - const FormatOptions& options) const override; + const FormatOptions& options) const override { + /* + * For strings in the json complex type, we remove double quotes by default. + * + * Because when querying complex types, such as selecting complexColumn from table, + * we will add double quotes to the strings in the complex type. + * + * For the map<string,int> column, insert { "abc" : 1, "hello",2 }. + * If you do not remove the double quotes, it will display {""abc"":1,""hello"": 2 }, + * remove the double quotes to display { "abc" : 1, "hello",2 }. + * + */ + if (_nesting_level >= 2) { + slice.trim_quote(); + } + if (options.escape_char != 0) { + escape_string(slice.data, slice.size, options.escape_char); + } + assert_cast<ColumnType&>(column).insert_data(slice.data, slice.size); + return Status::OK(); + } Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices, int* num_deserialized, - const FormatOptions& options) const override; + const FormatOptions& options) const override { + DESERIALIZE_COLUMN_FROM_JSON_VECTOR() + return Status::OK(); + } Status write_column_to_pb(const IColumn& column, PValues& result, int start, - int end) const override; - Status read_column_from_pb(IColumn& column, const PValues& arg) const override; + int end) const override { + result.mutable_bytes_value()->Reserve(end - start); + auto* ptype = result.mutable_type(); + ptype->set_id(PGenericType::STRING); + for (size_t row_num = start; row_num < end; ++row_num) { + StringRef data = column.get_data_at(row_num); + result.add_string_value(data.to_string()); + } + return Status::OK(); + } + Status read_column_from_pb(IColumn& column, const PValues& arg) const override { + column.reserve(arg.string_value_size()); + for (int i = 0; i < arg.string_value_size(); ++i) { + assert_cast<ColumnType&>(column).insert_data(arg.string_value(i).c_str(), + arg.string_value(i).size()); + } + return Status::OK(); + } void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool, - int32_t col_id, int row_num) const override; + int32_t col_id, int row_num) const override { + result.writeKey(col_id); + const auto& data_ref = column.get_data_at(row_num); + result.writeStartBinary(); + result.writeBinary(reinterpret_cast<const char*>(data_ref.data), data_ref.size); + result.writeEndBinary(); + } - void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override; + void read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const override { + assert(arg->isBinary()); + const auto* blob = static_cast<const JsonbBlobVal*>(arg); + assert_cast<ColumnType&>(column).insert_data(blob->getBlob(), blob->getBlobLen()); + } void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int start, - int end) const override; + int end) const override { + const auto& string_column = assert_cast<const ColumnType&>(column); + auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + continue; + } + auto string_ref = string_column.get_data_at(string_i); + checkArrowStatus(builder.Append(string_ref.data, string_ref.size), column.get_name(), + array_builder->type()->name()); + } + } void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, - int end, const cctz::time_zone& ctz) const override; + int end, const cctz::time_zone& ctz) const override { + if (arrow_array->type_id() == arrow::Type::STRING || + arrow_array->type_id() == arrow::Type::BINARY) { + const auto* concrete_array = dynamic_cast<const arrow::BinaryArray*>(arrow_array); + std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data(); + + for (size_t offset_i = start; offset_i < end; ++offset_i) { + if (!concrete_array->IsNull(offset_i)) { + const auto* raw_data = buffer->data() + concrete_array->value_offset(offset_i); + assert_cast<ColumnType&>(column).insert_data( + (char*)raw_data, concrete_array->value_length(offset_i)); + } else { + assert_cast<ColumnType&>(column).insert_default(); + } + } + } else if (arrow_array->type_id() == arrow::Type::FIXED_SIZE_BINARY) { + const auto* concrete_array = + dynamic_cast<const arrow::FixedSizeBinaryArray*>(arrow_array); + uint32_t width = concrete_array->byte_width(); + const auto* array_data = concrete_array->GetValue(start); + + for (size_t offset_i = 0; offset_i < end - start; ++offset_i) { + if (!concrete_array->IsNull(offset_i)) { + const auto* raw_data = array_data + (offset_i * width); + assert_cast<ColumnType&>(column).insert_data((char*)raw_data, width); + } else { + assert_cast<ColumnType&>(column).insert_default(); + } + } + } + } Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer, - int row_idx, bool col_const) const override; + int row_idx, bool col_const) const override { + return _write_column_to_mysql(column, row_buffer, row_idx, col_const); + } Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer, - int row_idx, bool col_const) const override; + int row_idx, bool col_const) const override { + return _write_column_to_mysql(column, row_buffer, row_idx, col_const); + } Status write_column_to_orc(const std::string& timezone, const IColumn& column, const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int start, int end, - std::vector<StringRef>& buffer_list) const override; + std::vector<StringRef>& buffer_list) const override { + auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch); + + for (size_t row_id = start; row_id < end; row_id++) { + const auto& ele = assert_cast<const ColumnType&>(column).get_data_at(row_id); + cur_batch->data[row_id] = const_cast<char*>(ele.data); + cur_batch->length[row_id] = ele.size; + } + + cur_batch->numElements = end - start; + return Status::OK(); + } void write_one_cell_to_json(const IColumn& column, rapidjson::Value& result, rapidjson::Document::AllocatorType& allocator, - int row_num) const override; - void read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const override; + int row_num) const override { + const auto& col = assert_cast<const ColumnType&>(column); + const auto& data_ref = col.get_data_at(row_num); + result.SetString(data_ref.data, data_ref.size); + } + void read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const override { + auto& col = assert_cast<ColumnType&>(column); + if (!result.IsString()) { + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + result.Accept(writer); + col.insert_data(buffer.GetString(), buffer.GetSize()); + return; + } + col.insert_data(result.GetString(), result.GetStringLength()); + } private: template <bool is_binary_format> Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result, - int row_idx, bool col_const) const; + int row_idx, bool col_const) const { + const auto col_index = index_check_const(row_idx, col_const); + const auto string_val = assert_cast<const ColumnType&>(column).get_data_at(col_index); + if (string_val.data == nullptr) { + if (string_val.size == 0) { + // 0x01 is a magic num, not useful actually, just for present "" + char* tmp_val = reinterpret_cast<char*>(0x01); + if (UNLIKELY(0 != result.push_string(tmp_val, string_val.size))) { + return Status::InternalError("pack mysql buffer failed."); + } + } else { + if (UNLIKELY(0 != result.push_null())) { + return Status::InternalError("pack mysql buffer failed."); + } + } + } else { + if (UNLIKELY(0 != result.push_string(string_val.data, string_val.size))) { + return Status::InternalError("pack mysql buffer failed."); + } + } + return Status::OK(); + } }; + +using DataTypeStringSerDe = DataTypeStringSerDeBase<ColumnString>; +using DataTypeFixedLengthObjectSerDe = DataTypeStringSerDeBase<ColumnFixedLengthObject>; } // namespace vectorized } // namespace doris diff --git a/regression-test/data/datatype_p0/agg_state/avg/test_agg_state_avg.out b/regression-test/data/datatype_p0/agg_state/avg/test_agg_state_avg.out new file mode 100644 index 00000000000..5b464e1e3f2 --- /dev/null +++ b/regression-test/data/datatype_p0/agg_state/avg/test_agg_state_avg.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +0 499.5 +1 1499.5 +2 2499.5 +3 3499.5 +4 4499.5 +5 5499.5 +6 6499.5 +7 7499.5 + +-- !select -- +3999.5 + diff --git a/regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy b/regression-test/suites/datatype_p0/agg_state/avg/test_agg_state_avg.groovy similarity index 74% copy from regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy copy to regression-test/suites/datatype_p0/agg_state/avg/test_agg_state_avg.groovy index eafccc64917..977f855f52d 100644 --- a/regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy +++ b/regression-test/suites/datatype_p0/agg_state/avg/test_agg_state_avg.groovy @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -suite("test_agg_state_quantile_union") { +suite("test_agg_state_avg") { sql "set global enable_agg_state=true" sql """ DROP TABLE IF EXISTS a_table; """ sql """ create table a_table( k1 int not null, - k2 agg_state quantile_union(quantile_state not null) + k2 agg_state avg(int not null) ) aggregate key (k1) distributed BY hash(k1) @@ -29,13 +29,16 @@ suite("test_agg_state_quantile_union") { """ sql """insert into a_table - select e1/1000,quantile_union_state(TO_QUANTILE_STATE(e1, 2048)) from + select e1/1000,avg_state(e1) from (select 1 k1) as t lateral view explode_numbers(8000) tmp1 as e1;""" sql"set enable_nereids_planner=true;" - qt_select """ select k1,quantile_percent(quantile_union_merge(k2),0.5) from a_table group by k1 order by k1; + qt_select """ select k1,avg_merge(k2) from a_table group by k1 order by k1; """ - qt_select """ select quantile_percent(quantile_union_merge(tmp),0.5) from (select k1,quantile_union_union(k2) tmp from a_table group by k1)t; + qt_select """ select avg_merge(tmp) from (select k1,avg_union(k2) tmp from a_table group by k1)t; """ + test { + sql "select * from a_table;" + } } diff --git a/regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy b/regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy index eafccc64917..92f9f80a38d 100644 --- a/regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy +++ b/regression-test/suites/datatype_p0/agg_state/quantile_union/test_agg_state_quantile_union.groovy @@ -38,4 +38,7 @@ suite("test_agg_state_quantile_union") { """ qt_select """ select quantile_percent(quantile_union_merge(tmp),0.5) from (select k1,quantile_union_union(k2) tmp from a_table group by k1)t; """ + test { + sql "select * from a_table;" + } } diff --git a/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy b/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy index 0b05d4312f9..19443191588 100644 --- a/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy +++ b/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy @@ -71,7 +71,6 @@ suite("test_agg_state") { test { sql "select avg_state(1) from d_table;" - exception "write_column_to_pb with type ColumnFixedLengthObject" } qt_ndv """select ndv_merge(t) from (select ndv_union(ndv_state(1)) as t from d_table group by k1)p;""" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
