This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f18a4b749fcfdd8b5e9f4ac3e130ffa86380227b Author: lihangyu <[email protected]> AuthorDate: Sat Mar 14 01:18:50 2026 +0800 branch-4.1 cherry-pick [feat](format) support native format (#61286) cherry-pick #58711 --- be/src/service/internal_service.cpp | 6 + be/src/vec/exec/format/native/native_format.h | 57 + be/src/vec/exec/format/native/native_reader.cpp | 369 ++++++ be/src/vec/exec/format/native/native_reader.h | 107 ++ be/src/vec/exec/scan/file_scanner.cpp | 17 +- be/src/vec/functions/cast/cast_to_jsonb.h | 2 +- be/src/vec/functions/cast/cast_to_variant.h | 180 +-- be/src/vec/runtime/vnative_transformer.cpp | 133 ++ be/src/vec/runtime/vnative_transformer.h | 65 + be/src/vec/sink/writer/vfile_result_writer.cpp | 9 + .../data/vec/native/all_types_single_row.native | Bin 0 -> 1129 bytes .../format/native/native_reader_writer_test.cpp | 1350 ++++++++++++++++++++ .../doris/common/util/FileFormatConstants.java | 1 + .../java/org/apache/doris/common/util/Util.java | 3 + .../property/fileformat/FileFormatProperties.java | 3 + .../fileformat/NativeFileFormatProperties.java | 65 + .../nereids/load/NereidsLoadScanProvider.java | 5 +- .../ExternalFileTableValuedFunction.java | 14 +- gensrc/thrift/PlanNodes.thrift | 3 +- .../outfile/native/test_outfile_native.out | 25 + .../outfile/native/test_outfile_native.groovy | 100 ++ .../test_export_variant_10k_columns.groovy | 215 ++++ 22 files changed, 2646 insertions(+), 83 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index bbd86d9c56c..62ed4aec19c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -123,6 +123,7 @@ #include "vec/exec/format/csv/csv_reader.h" #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/json/new_json_reader.h" +#include "vec/exec/format/native/native_reader.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" #include 
"vec/exec/format/text/text_reader.h" @@ -856,6 +857,11 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx); break; } + case TFileFormatType::FORMAT_NATIVE: { + reader = vectorized::NativeReader::create_unique(profile.get(), params, range, &io_ctx, + nullptr); + break; + } case TFileFormatType::FORMAT_JSON: { reader = vectorized::NewJsonReader::create_unique(profile.get(), params, range, file_slots, &io_ctx); diff --git a/be/src/vec/exec/format/native/native_format.h b/be/src/vec/exec/format/native/native_format.h new file mode 100644 index 00000000000..e004a3c0f31 --- /dev/null +++ b/be/src/vec/exec/format/native/native_format.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> + +namespace doris::vectorized { + +// Doris Native format file-level constants. +// +// File layout (byte stream): +// +// +-------------------------------+---------------------------+---------------------------+ ... +// | File header | Data block #0 | Data block #1 | ... +// +-------------------------------+---------------------------+---------------------------+ ... 
+// +// File header (12 bytes total): +// - [0..7] : magic bytes "DORISN1\0" (DORIS_NATIVE_MAGIC) +// - [8..11] : uint32_t format_version (DORIS_NATIVE_FORMAT_VERSION, little-endian) +// +// Each data block i: +// - uint64_t block_size : length in bytes of serialized PBlock (little-endian) +// - uint8_t[block_size] : PBlock protobuf payload produced by Block::serialize() +// +// NativeReader: +// - Detects the optional file header by checking the first 8 bytes against DORIS_NATIVE_MAGIC. +// - If the header is present, it skips 12 bytes and then starts reading blocks as +// [uint64_t block_size][PBlock bytes]... +// - If the header is absent (legacy files), it starts reading blocks from offset 0. +// +// VNativeTransformer: +// - Writes the header once in open(), then appends each block in write() as +// [uint64_t block_size][PBlock bytes]... +// +// These constants are shared between writer, reader and tests to keep the on-disk +// format definition in a single place. +// Header layout: +// [magic bytes "DORISN1\0"][uint32_t format_version] +static constexpr char DORIS_NATIVE_MAGIC[8] = {'D', 'O', 'R', 'I', 'S', 'N', '1', '\0'}; +static constexpr uint32_t DORIS_NATIVE_FORMAT_VERSION = 1; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/native/native_reader.cpp b/be/src/vec/exec/format/native/native_reader.cpp new file mode 100644 index 00000000000..66cfe3a9f91 --- /dev/null +++ b/be/src/vec/exec/format/native/native_reader.cpp @@ -0,0 +1,369 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/native/native_reader.h" + +#include <gen_cpp/data.pb.h> + +#include "io/file_factory.h" +#include "io/fs/buffered_reader.h" +#include "io/fs/file_reader.h" +#include "io/fs/tracing_file_reader.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exec/format/native/native_format.h" + +namespace doris::vectorized { + +#include "common/compile_check_begin.h" + +NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state) + : _profile(profile), + _scan_params(params), + _scan_range(range), + _io_ctx(io_ctx), + _state(state) {} + +NativeReader::~NativeReader() { + (void)close(); +} + +namespace { + +Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range, + int64_t* file_size, int64_t* current_offset, bool* eof) { + *file_size = file_reader->size(); + *current_offset = 0; + *eof = (*file_size == 0); + + // Validate and consume Doris Native file header. + // Expected layout: + // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]... 
+ static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); + if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) { + return Status::InternalError( + "invalid Doris Native file {}, file size {} is smaller than header size {}", + range.path, *file_size, HEADER_SIZE); + } + + char header[HEADER_SIZE]; + Slice header_slice(header, sizeof(header)); + size_t bytes_read = 0; + RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read)); + if (bytes_read != sizeof(header)) { + return Status::InternalError( + "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", + range.path, sizeof(header), bytes_read); + } + + if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { + return Status::InternalError("invalid Doris Native magic header in file {}", range.path); + } + + uint32_t version = 0; + memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); + if (version != DORIS_NATIVE_FORMAT_VERSION) { + return Status::InternalError( + "unsupported Doris Native format version {} in file {}, expect {}", version, + range.path, DORIS_NATIVE_FORMAT_VERSION); + } + + *current_offset = sizeof(header); + *eof = (*file_size == *current_offset); + return Status::OK(); +} + +} // namespace + +Status NativeReader::init_reader() { + if (_file_reader != nullptr) { + return Status::OK(); + } + + // Create underlying file reader. For now we always use random access mode. 
+ io::FileSystemProperties system_properties; + io::FileDescription file_description; + file_description.file_size = -1; + if (_scan_range.__isset.file_size) { + file_description.file_size = _scan_range.file_size; + } + file_description.path = _scan_range.path; + if (_scan_range.__isset.fs_name) { + file_description.fs_name = _scan_range.fs_name; + } + if (_scan_range.__isset.modification_time) { + file_description.mtime = _scan_range.modification_time; + } else { + file_description.mtime = 0; + } + + if (_scan_range.__isset.file_type) { + // For compatibility with older FE. + system_properties.system_type = _scan_range.file_type; + } else { + system_properties.system_type = _scan_params.file_type; + } + system_properties.properties = _scan_params.properties; + system_properties.hdfs_params = _scan_params.hdfs_params; + if (_scan_params.__isset.broker_addresses) { + system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(), + _scan_params.broker_addresses.end()); + } + + io::FileReaderOptions reader_options = + FileFactory::get_reader_options(_state, file_description); + auto reader_res = io::DelegateReader::create_file_reader( + _profile, system_properties, file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx); + if (!reader_res.has_value()) { + return reader_res.error(); + } + _file_reader = reader_res.value(); + + if (_io_ctx) { + _file_reader = + std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats); + } + + RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size, + &_current_offset, &_eof)); + return Status::OK(); +} + +Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_eof) { + *read_rows = 0; + *eof = true; + return Status::OK(); + } + + RETURN_IF_ERROR(init_reader()); + + std::string buff; + bool local_eof = false; + + // If we have already loaded the first block for schema probing, use it first. 
+ if (_first_block_loaded && !_first_block_consumed) { + buff = _first_block_buf; + local_eof = false; + } else { + RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof)); + } + + // If we reach EOF and also read no data for this call, the whole file is considered finished. + if (local_eof && buff.empty()) { + *read_rows = 0; + *eof = true; + _eof = true; + return Status::OK(); + } + // If buffer is empty but we have not reached EOF yet, treat this as an error. + if (buff.empty()) { + return Status::InternalError("read empty native block from file {}", _scan_range.path); + } + + PBlock pblock; + if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) { + return Status::InternalError("Failed to parse native PBlock from file {}", + _scan_range.path); + } + + // Initialize schema from first block if not done yet. + if (!_schema_inited) { + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + size_t uncompressed_bytes = 0; + int64_t decompress_time = 0; + RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time)); + + // For external file scan / TVF scenarios, unify all columns as nullable to match + // GenericReader/SlotDescriptor convention. This ensures schema consistency when + // some writers emit non-nullable columns. + for (size_t i = 0; i < block->columns(); ++i) { + auto& col_with_type = block->get_by_position(i); + if (!col_with_type.type->is_nullable()) { + col_with_type.column = make_nullable(col_with_type.column); + col_with_type.type = make_nullable(col_with_type.type); + } + } + + *read_rows = block->rows(); + *eof = false; + + if (_first_block_loaded && !_first_block_consumed) { + _first_block_consumed = true; + } + + // If we reached the physical end of file, mark eof for subsequent calls. 
+ if (_current_offset >= _file_size) { + _eof = true; + } + + return Status::OK(); +} + +Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + missing_cols->clear(); + RETURN_IF_ERROR(init_reader()); + + if (!_schema_inited) { + // Load first block lazily to initialize schema. + if (!_first_block_loaded) { + bool local_eof = false; + RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); + // Treat file as empty only if we reach EOF and there is no block data at all. + if (local_eof && _first_block_buf.empty()) { + return Status::EndOfFile("empty native file {}", _scan_range.path); + } + // Non-EOF but empty buffer means corrupted native file. + if (_first_block_buf.empty()) { + return Status::InternalError("first native block is empty {}", _scan_range.path); + } + _first_block_loaded = true; + } + + PBlock pblock; + if (!pblock.ParseFromArray(_first_block_buf.data(), + static_cast<int>(_first_block_buf.size()))) { + return Status::InternalError("Failed to parse native PBlock for schema from file {}", + _scan_range.path); + } + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + for (size_t i = 0; i < _schema_col_names.size(); ++i) { + name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]); + } + return Status::OK(); +} + +Status NativeReader::init_schema_reader() { + RETURN_IF_ERROR(init_reader()); + return Status::OK(); +} + +Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names, + std::vector<DataTypePtr>* col_types) { + RETURN_IF_ERROR(init_reader()); + + if (!_schema_inited) { + if (!_first_block_loaded) { + bool local_eof = false; + RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); + // Treat file as empty only if we reach EOF and there is no block data at all. 
+ if (local_eof && _first_block_buf.empty()) { + return Status::EndOfFile("empty native file {}", _scan_range.path); + } + // Non-EOF but empty buffer means corrupted native file. + if (_first_block_buf.empty()) { + return Status::InternalError("first native block is empty {}", _scan_range.path); + } + _first_block_loaded = true; + } + + PBlock pblock; + if (!pblock.ParseFromArray(_first_block_buf.data(), + static_cast<int>(_first_block_buf.size()))) { + return Status::InternalError("Failed to parse native PBlock for schema from file {}", + _scan_range.path); + } + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + *col_names = _schema_col_names; + *col_types = _schema_col_types; + return Status::OK(); +} + +Status NativeReader::close() { + _file_reader.reset(); + return Status::OK(); +} + +Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) { + *eof = false; + buff->clear(); + + if (_file_reader == nullptr) { + RETURN_IF_ERROR(init_reader()); + } + + if (_current_offset >= _file_size) { + *eof = true; + return Status::OK(); + } + + uint64_t len = 0; + Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len)); + size_t bytes_read = 0; + RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read)); + if (bytes_read == 0) { + *eof = true; + return Status::OK(); + } + if (bytes_read != sizeof(len)) { + return Status::InternalError( + "Failed to read native block length from file {}, expect {}, " + "actual {}", + _scan_range.path, sizeof(len), bytes_read); + } + + _current_offset += sizeof(len); + if (len == 0) { + // Empty block, nothing to read. 
+ *eof = (_current_offset >= _file_size); + return Status::OK(); + } + + buff->assign(len, '\0'); + Slice data_slice(buff->data(), len); + bytes_read = 0; + RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read)); + if (bytes_read != len) { + return Status::InternalError( + "Failed to read native block body from file {}, expect {}, " + "actual {}", + _scan_range.path, len, bytes_read); + } + + _current_offset += len; + *eof = (_current_offset >= _file_size); + return Status::OK(); +} + +Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) { + _schema_col_names.clear(); + _schema_col_types.clear(); + + for (const auto& pcol_meta : pblock.column_metas()) { + DataTypePtr type = make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta)); + VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name() + << ", type=" << type->get_name(); + _schema_col_names.emplace_back(pcol_meta.name()); + _schema_col_types.emplace_back(type); + } + _schema_inited = true; + return Status::OK(); +} + +#include "common/compile_check_end.h" + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/native/native_reader.h b/be/src/vec/exec/format/native/native_reader.h new file mode 100644 index 00000000000..bc1635e0ccb --- /dev/null +++ b/be/src/vec/exec/format/native/native_reader.h @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <string> +#include <unordered_map> +#include <unordered_set> + +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "io/fs/file_reader_writer_fwd.h" +#include "vec/exec/format/generic_reader.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; + +namespace io { +struct IOContext; +} // namespace io +} // namespace doris + +namespace doris::vectorized { +class Block; + +#include "common/compile_check_begin.h" + +// Doris Native format reader. +// it will read a sequence of Blocks encoded in Doris Native binary format. +// +// NOTE: current implementation is just a skeleton and will be filled step by step. +class NativeReader : public GenericReader { +public: + ENABLE_FACTORY_CREATOR(NativeReader); + + NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state); + + ~NativeReader() override; + + // Initialize underlying file reader and any format specific state. 
+ Status init_reader(); + + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; + + Status init_schema_reader() override; + + Status get_parsed_schema(std::vector<std::string>* col_names, + std::vector<DataTypePtr>* col_types) override; + + Status close() override; + + bool count_read_rows() override { return true; } + +protected: + void _collect_profile_before_close() override {} + +private: + RuntimeProfile* _profile = nullptr; + const TFileScanRangeParams& _scan_params; + const TFileRangeDesc& _scan_range; + + io::FileReaderSPtr _file_reader; + io::IOContext* _io_ctx = nullptr; + RuntimeState* _state = nullptr; + + bool _eof = false; + + // Current read offset in the underlying file. + int64_t _current_offset = 0; + int64_t _file_size = 0; + + // Cached schema information from the first PBlock. + bool _schema_inited = false; + std::vector<std::string> _schema_col_names; + std::vector<DataTypePtr> _schema_col_types; + + // Cached first block (serialized) to allow schema probing before data scan. 
+ std::string _first_block_buf; + bool _first_block_loaded = false; + bool _first_block_consumed = false; + + Status _read_next_pblock(std::string* buff, bool* eof); + Status _init_schema_from_pblock(const PBlock& pblock); +}; + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 5124f8ea631..37f580b25ff 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -60,6 +60,7 @@ #include "vec/exec/format/avro/avro_jni_reader.h" #include "vec/exec/format/csv/csv_reader.h" #include "vec/exec/format/json/new_json_reader.h" +#include "vec/exec/format/native/native_reader.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/exec/format/table/hive_reader.h" @@ -599,10 +600,6 @@ Status FileScanner::_cast_to_input_block(Block* block) { // skip columns which does not exist in file continue; } - if (slot_desc->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { - // skip variant type - continue; - } auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]); auto return_type = slot_desc->get_data_type_ptr(); // remove nullable here, let the get_function decide whether nullable @@ -617,8 +614,10 @@ Status FileScanner::_cast_to_input_block(Block* block) { return_type->get_name()); } idx = _src_block_name_to_idx[slot_desc->col_name()]; + DCHECK(_state != nullptr); + auto ctx = FunctionContext::create_context(_state, {}, {}); RETURN_IF_ERROR( - func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size())); + func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size())); _src_block_ptr->get_by_position(idx).type = std::move(return_type); } return Status::OK(); @@ -1118,6 +1117,14 @@ Status FileScanner::_get_next_reader() { init_status = 
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc); break; } + case TFileFormatType::FORMAT_NATIVE: { + auto reader = + NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state); + init_status = reader->init_reader(); + _cur_reader = std::move(reader); + need_to_get_parsed_schema = false; + break; + } case TFileFormatType::FORMAT_ARROW: { if (range.__isset.table_format_params && range.table_format_params.table_format_type == "remote_doris") { diff --git a/be/src/vec/functions/cast/cast_to_jsonb.h b/be/src/vec/functions/cast/cast_to_jsonb.h index 92e3d4954a9..bd3cd8a329b 100644 --- a/be/src/vec/functions/cast/cast_to_jsonb.h +++ b/be/src/vec/functions/cast/cast_to_jsonb.h @@ -222,7 +222,7 @@ struct ParseJsonbFromString { } }; -// create cresponding jsonb value with type to_type +// create corresponding jsonb value with type to_type // use jsonb writer to create jsonb value WrapperType create_cast_to_jsonb_wrapper(const DataTypePtr& from_type, const DataTypeJsonb& to_type, bool string_as_jsonb_string) { diff --git a/be/src/vec/functions/cast/cast_to_variant.h b/be/src/vec/functions/cast/cast_to_variant.h index 1593f9023de..1c45bb46597 100644 --- a/be/src/vec/functions/cast/cast_to_variant.h +++ b/be/src/vec/functions/cast/cast_to_variant.h @@ -20,86 +20,102 @@ #include "cast_base.h" #include "cast_to_string.h" #include "vec/data_types/data_type_variant.h" + namespace doris::vectorized::CastWrapper { -struct CastFromVariant { - static Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, - uint32_t result, size_t input_rows_count, - const NullMap::value_type* null_map = nullptr) { - auto& data_type_to = block.get_by_position(result).type; - const auto& col_with_type_and_name = block.get_by_position(arguments[0]); - const auto& col_from = col_with_type_and_name.column; - const auto& variant = assert_cast<const ColumnVariant&>(*col_from); - ColumnPtr col_to = data_type_to->create_column(); - if 
(!variant.is_finalized()) { - // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure - variant.assume_mutable()->finalize(); - } - // It's important to convert as many elements as possible in this context. For instance, - // if the root of this variant column is a number column, converting it to a number column - // is acceptable. However, if the destination type is a string and root is none scalar root, then - // we should convert the entire tree to a string. - bool is_root_valuable = variant.is_scalar_variant() || - (!variant.is_null_root() && - variant.get_root_type()->get_primitive_type() != INVALID_TYPE && - !is_string_type(data_type_to->get_primitive_type()) && - data_type_to->get_primitive_type() != TYPE_JSONB); - if (is_root_valuable) { - ColumnPtr nested = variant.get_root(); - auto nested_from_type = variant.get_root_type(); - // DCHECK(nested_from_type->is_nullable()); - DCHECK(!data_type_to->is_nullable()); - auto new_context = context->clone(); +// shared implementation for casting from variant to arbitrary non-nullable target type +inline Status cast_from_variant_impl(FunctionContext* context, Block& block, + const ColumnNumbers& arguments, uint32_t result, + size_t input_rows_count, + const NullMap::value_type* /*null_map*/, + const DataTypePtr& data_type_to) { + const auto& col_with_type_and_name = block.get_by_position(arguments[0]); + const auto& col_from = col_with_type_and_name.column; + const auto& variant = assert_cast<const ColumnVariant&>(*col_from); + ColumnPtr col_to = data_type_to->create_column(); + + if (!variant.is_finalized()) { + // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure + variant.assume_mutable()->finalize(); + } + + // It's important to convert as many elements as possible in this context. For instance, + // if the root of this variant column is a number column, converting it to a number column + // is acceptable. 
However, if the destination type is a string and root is none scalar root, then + // we should convert the entire tree to a string. + bool is_root_valuable = variant.is_scalar_variant() || + (!variant.is_null_root() && + variant.get_root_type()->get_primitive_type() != INVALID_TYPE && + !is_string_type(data_type_to->get_primitive_type()) && + data_type_to->get_primitive_type() != TYPE_JSONB); + + if (is_root_valuable) { + ColumnPtr nested = variant.get_root(); + auto nested_from_type = variant.get_root_type(); + // DCHECK(nested_from_type->is_nullable()); + DCHECK(!data_type_to->is_nullable()); + auto new_context = context == nullptr ? nullptr : context->clone(); + if (new_context != nullptr) { new_context->set_jsonb_string_as_string(true); // Strict mode is for user INSERT validation, not variant internal conversion. new_context->set_enable_strict_mode(false); - // dst type nullable has been removed, so we should remove the inner nullable of root column - auto wrapper = prepare_impl(new_context.get(), remove_nullable(nested_from_type), - data_type_to); - Block tmp_block {{remove_nullable(nested), remove_nullable(nested_from_type), ""}}; - tmp_block.insert({nullptr, data_type_to, ""}); - /// Perform the requested conversion. 
- Status st = wrapper(new_context.get(), tmp_block, {0}, 1, input_rows_count, nullptr); - if (!st.ok()) { - // Fill with default values, which is null - col_to->assume_mutable()->insert_many_defaults(input_rows_count); - col_to = make_nullable(col_to, true); - } else { - col_to = tmp_block.get_by_position(1).column; - // Note: here we should return the nullable result column - col_to = wrap_in_nullable( - col_to, Block({{nested, nested_from_type, ""}, {col_to, data_type_to, ""}}), - {0}, input_rows_count); - } + } + // dst type nullable has been removed, so we should remove the inner nullable of root column + auto wrapper = + prepare_impl(new_context.get(), remove_nullable(nested_from_type), data_type_to); + Block tmp_block {{remove_nullable(nested), remove_nullable(nested_from_type), ""}}; + tmp_block.insert({nullptr, data_type_to, ""}); + /// Perform the requested conversion. + Status st = wrapper(new_context.get(), tmp_block, {0}, 1, input_rows_count, nullptr); + if (!st.ok()) { + // Fill with default values, which is null + col_to->assume_mutable()->insert_many_defaults(input_rows_count); + col_to = make_nullable(col_to, true); } else { - if (variant.only_have_default_values()) { - col_to->assume_mutable()->insert_many_defaults(input_rows_count); - col_to = make_nullable(col_to, true); - } else if (is_string_type(data_type_to->get_primitive_type())) { - // serialize to string - return CastToStringFunction::execute_impl(context, block, arguments, result, - input_rows_count); - } else if (data_type_to->get_primitive_type() == TYPE_JSONB) { - // serialize to json by parsing - return cast_from_generic_to_jsonb(context, block, arguments, result, - input_rows_count); - } else if (!data_type_to->is_nullable() && - !is_string_type(data_type_to->get_primitive_type())) { - // other types - col_to->assume_mutable()->insert_many_defaults(input_rows_count); - col_to = make_nullable(col_to, true); - } else { - assert_cast<ColumnNullable&>(*col_to->assume_mutable()) - 
.insert_many_defaults(input_rows_count); - } + col_to = tmp_block.get_by_position(1).column; + // Note: here we should return the nullable result column + col_to = wrap_in_nullable( + col_to, Block({{nested, nested_from_type, ""}, {col_to, data_type_to, ""}}), + {0}, input_rows_count); } - if (col_to->size() != input_rows_count) { - return Status::InternalError("Unmatched row count {}, expected {}", col_to->size(), - input_rows_count); + } else { + if (variant.only_have_default_values()) { + col_to->assume_mutable()->insert_many_defaults(input_rows_count); + col_to = make_nullable(col_to, true); + } else if (is_string_type(data_type_to->get_primitive_type())) { + // serialize to string + return CastToStringFunction::execute_impl(context, block, arguments, result, + input_rows_count); + } else if (data_type_to->get_primitive_type() == TYPE_JSONB) { + // serialize to json by parsing + return cast_from_generic_to_jsonb(context, block, arguments, result, input_rows_count); + } else if (!data_type_to->is_nullable() && + !is_string_type(data_type_to->get_primitive_type())) { + // other types + col_to->assume_mutable()->insert_many_defaults(input_rows_count); + col_to = make_nullable(col_to, true); + } else { + assert_cast<ColumnNullable&>(*col_to->assume_mutable()) + .insert_many_defaults(input_rows_count); } + } - block.replace_by_position(result, std::move(col_to)); - return Status::OK(); + if (col_to->size() != input_rows_count) { + return Status::InternalError("Unmatched row count {}, expected {}", col_to->size(), + input_rows_count); + } + + block.replace_by_position(result, std::move(col_to)); + return Status::OK(); +} + +struct CastFromVariant { + static Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count, + const NullMap::value_type* null_map = nullptr) { + auto& data_type_to = block.get_by_position(result).type; + return cast_from_variant_impl(context, block, arguments, result, 
input_rows_count, null_map, + data_type_to); } }; @@ -119,16 +135,32 @@ struct CastToVariant { } }; -// create cresponding variant value to wrap from_type +// create corresponding variant value to wrap from_type WrapperType create_cast_to_variant_wrapper(const DataTypePtr& from_type, const DataTypeVariant& to_type) { + if (from_type->get_primitive_type() == TYPE_VARIANT) { + // variant_max_subcolumns_count is not equal + return create_unsupport_wrapper(from_type->get_name(), to_type.get_name()); + } return &CastToVariant::execute; } -// create cresponding type convert from variant +// create corresponding type convert from variant WrapperType create_cast_from_variant_wrapper(const DataTypeVariant& from_type, const DataTypePtr& to_type) { - return &CastFromVariant::execute; + if (to_type->get_primitive_type() == TYPE_VARIANT) { + // variant_max_subcolumns_count is not equal + return create_unsupport_wrapper(from_type.get_name(), to_type->get_name()); + } + // Capture explicit target type to make the cast independent from Block[result].type. + DataTypePtr captured_to_type = to_type; + return [captured_to_type](FunctionContext* context, Block& block, + const ColumnNumbers& arguments, uint32_t result, + size_t input_rows_count, + const NullMap::value_type* null_map) -> Status { + return cast_from_variant_impl(context, block, arguments, result, input_rows_count, null_map, + captured_to_type); + }; } } // namespace doris::vectorized::CastWrapper diff --git a/be/src/vec/runtime/vnative_transformer.cpp b/be/src/vec/runtime/vnative_transformer.cpp new file mode 100644 index 00000000000..578364eff22 --- /dev/null +++ b/be/src/vec/runtime/vnative_transformer.cpp @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/runtime/vnative_transformer.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/data.pb.h> +#include <glog/logging.h> + +#include "agent/be_exec_version_manager.h" +#include "io/fs/file_writer.h" +#include "runtime/runtime_state.h" +#include "util/slice.h" +#include "vec/core/block.h" +#include "vec/exec/format/native/native_format.h" + +namespace doris::vectorized { + +#include "common/compile_check_begin.h" + +namespace { + +// Map high-level TFileCompressType to low-level segment_v2::CompressionTypePB. 
+segment_v2::CompressionTypePB to_local_compression_type(TFileCompressType::type type) { + using CT = segment_v2::CompressionTypePB; + switch (type) { + case TFileCompressType::GZ: + case TFileCompressType::ZLIB: + case TFileCompressType::DEFLATE: + return CT::ZLIB; + case TFileCompressType::LZ4FRAME: + case TFileCompressType::LZ4BLOCK: + return CT::LZ4; + case TFileCompressType::SNAPPYBLOCK: + return CT::SNAPPY; + case TFileCompressType::ZSTD: + return CT::ZSTD; + default: + return CT::ZSTD; + } +} + +} // namespace + +VNativeTransformer::VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + bool output_object_data, + TFileCompressType::type compress_type) + : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), + _file_writer(file_writer), + _compression_type(to_local_compression_type(compress_type)) {} + +Status VNativeTransformer::open() { + // Write Doris Native file header: + // [magic bytes "DORISN1\0"][uint32_t format_version] + DCHECK(_file_writer != nullptr); + uint32_t version = DORIS_NATIVE_FORMAT_VERSION; + + Slice magic_slice(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + Slice version_slice(reinterpret_cast<char*>(&version), sizeof(uint32_t)); + + RETURN_IF_ERROR(_file_writer->append(magic_slice)); + RETURN_IF_ERROR(_file_writer->append(version_slice)); + + _written_len += sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); + return Status::OK(); +} + +Status VNativeTransformer::write(const Block& block) { + if (block.rows() == 0) { + return Status::OK(); + } + + // Serialize Block into PBlock using existing vec serialization logic. 
+ PBlock pblock; + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + int64_t compressed_time = 0; + + RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, &compressed_time, + _compression_type)); + + std::string buff; + if (!pblock.SerializeToString(&buff)) { + auto err = Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( + "serialize native block error. block rows: {}", block.rows()); + return err; + } + + // Layout of Doris Native file: + // [uint64_t block_size][PBlock bytes]... + uint64_t len = buff.size(); + Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len)); + RETURN_IF_ERROR(_file_writer->append(len_slice)); + RETURN_IF_ERROR(_file_writer->append(buff)); + + _written_len += sizeof(len) + buff.size(); + _cur_written_rows += block.rows(); + + return Status::OK(); +} + +Status VNativeTransformer::close() { + // Close underlying FileWriter to ensure data is flushed to disk. + if (_file_writer != nullptr && _file_writer->state() != doris::io::FileWriter::State::CLOSED) { + RETURN_IF_ERROR(_file_writer->close()); + } + + return Status::OK(); +} + +int64_t VNativeTransformer::written_len() { + return _written_len; +} + +#include "common/compile_check_end.h" + +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vnative_transformer.h b/be/src/vec/runtime/vnative_transformer.h new file mode 100644 index 00000000000..fdf7ff46ac9 --- /dev/null +++ b/be/src/vec/runtime/vnative_transformer.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> + +#include <cstdint> + +#include "common/status.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vfile_format_transformer.h" + +namespace doris::io { +class FileWriter; +} // namespace doris::io + +namespace doris::vectorized { +class Block; + +#include "common/compile_check_begin.h" + +// Doris Native format writer. +// It serializes vectorized Blocks into Doris Native binary format. +class VNativeTransformer final : public VFileFormatTransformer { +public: + // |compress_type| controls how the PBlock is compressed on disk (ZSTD, LZ4, etc). + // Defaults to ZSTD to preserve the previous behavior. + VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data, + TFileCompressType::type compress_type = TFileCompressType::ZSTD); + + ~VNativeTransformer() override = default; + + Status open() override; + + Status write(const Block& block) override; + + Status close() override; + + int64_t written_len() override; + +private: + doris::io::FileWriter* _file_writer; // not owned + int64_t _written_len = 0; + // Compression type used for Block::serialize (PBlock compression). 
+ segment_v2::CompressionTypePB _compression_type {segment_v2::CompressionTypePB::ZSTD}; +}; + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index 0a2aee6c811..edec869ab36 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -58,6 +58,7 @@ #include "vec/exprs/vexpr_context.h" #include "vec/functions/cast/cast_to_string.h" #include "vec/runtime/vcsv_transformer.h" +#include "vec/runtime/vnative_transformer.h" #include "vec/runtime/vorc_transformer.h" #include "vec/runtime/vparquet_transformer.h" #include "vec/sink/vmysql_result_writer.h" @@ -146,6 +147,12 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->orc_schema, {}, _output_object_data, _file_opts->orc_compression_type)); break; + case TFileFormatType::FORMAT_NATIVE: + // Doris Native binary format writer with configurable compression. 
+ _vfile_writer.reset(new VNativeTransformer(_state, _file_writer_impl.get(), + _vec_output_expr_ctxs, _output_object_data, + _file_opts->compression_type)); + break; default: return Status::InternalError("unsupported file format: {}", _file_opts->file_format); } @@ -203,6 +210,8 @@ std::string VFileResultWriter::_file_format_to_name() { return "parquet"; case TFileFormatType::FORMAT_ORC: return "orc"; + case TFileFormatType::FORMAT_NATIVE: + return "native"; default: return "unknown"; } diff --git a/be/test/data/vec/native/all_types_single_row.native b/be/test/data/vec/native/all_types_single_row.native new file mode 100644 index 00000000000..90f825153ad Binary files /dev/null and b/be/test/data/vec/native/all_types_single_row.native differ diff --git a/be/test/vec/exec/format/native/native_reader_writer_test.cpp b/be/test/vec/exec/format/native/native_reader_writer_test.cpp new file mode 100644 index 00000000000..323f94944b3 --- /dev/null +++ b/be/test/vec/exec/format/native/native_reader_writer_test.cpp @@ -0,0 +1,1350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> + +#include <memory> +#include <string> + +#include "common/config.h" +#include "io/fs/local_file_system.h" +#include "io/fs/path.h" +#include "runtime/define_primitive_type.h" +#include "runtime/runtime_state.h" +#include "util/jsonb_writer.h" +#include "util/uid_util.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_map.h" +#include "vec/columns/column_struct.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/field.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_map.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/data_types/data_type_variant.h" +#include "vec/data_types/serde/data_type_serde.h" +#include "vec/exec/format/native/native_format.h" +#include "vec/exec/format/native/native_reader.h" +#include "vec/runtime/vnative_transformer.h" + +namespace doris::vectorized { + +class NativeReaderWriterTest : public ::testing::Test {}; + +static void fill_primitive_columns(Block& block, size_t rows) { + DataTypePtr int_type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)); + DataTypePtr str_type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false)); + + { + MutableColumnPtr col = int_type->create_column(); + for (size_t i = 0; i < rows; ++i) { + if (i % 3 == 0) { + // null + col->insert_default(); + } else { + // insert int value via Field to match column interface + auto field = + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10)); + col->insert(field); + } + } + block.insert(ColumnWithTypeAndName(std::move(col), int_type, "int_col")); + } + + { + MutableColumnPtr col = str_type->create_column(); + for (size_t i = 0; i < rows; ++i) { + if (i % 4 == 0) { + col->insert_default(); + } else { + std::string v = "s" + std::to_string(i); + // insert varchar value via Field + auto field = 
Field::create_field<PrimitiveType::TYPE_VARCHAR>(v); + col->insert(field); + } + } + block.insert(ColumnWithTypeAndName(std::move(col), str_type, "str_col")); + } +} + +static void fill_array_column(Block& block, size_t rows) { + // array<int> + DataTypePtr arr_nested_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false); + DataTypePtr arr_type = make_nullable(std::make_shared<DataTypeArray>(arr_nested_type)); + + { + MutableColumnPtr col = arr_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + auto& null_map = nullable_col->get_null_map_data(); + auto& array_col = assert_cast<ColumnArray&>(nested); + auto& offsets = array_col.get_offsets(); + auto& data = array_col.get_data(); + auto mutable_data = data.assume_mutable(); + + for (size_t i = 0; i < rows; ++i) { + if (i % 5 == 0) { + // null array + nullable_col->insert_default(); + } else { + // non-null array with 3 elements: [i, i+1, i+2] + null_map.push_back(0); + mutable_data->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i))); + mutable_data->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1))); + mutable_data->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2))); + offsets.push_back(offsets.empty() ? 
3 : offsets.back() + 3); + } + } + block.insert(ColumnWithTypeAndName(std::move(col), arr_type, "arr_col")); + } +} + +static void fill_map_column(Block& block, size_t rows) { + // map<string, int> + DataTypePtr map_key_type = DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false); + DataTypePtr map_value_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false); + DataTypePtr map_type = + make_nullable(std::make_shared<DataTypeMap>(map_key_type, map_value_type)); + + // map<string, int> column + { + MutableColumnPtr col = map_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + auto& null_map = nullable_col->get_null_map_data(); + + for (size_t i = 0; i < rows; ++i) { + if (i % 7 == 0) { + // null map + nullable_col->insert_default(); + } else { + null_map.push_back(0); + auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets(); + auto& keys = assert_cast<ColumnMap&>(nested).get_keys(); + auto& values = assert_cast<ColumnMap&>(nested).get_values(); + + auto mutable_keys = keys.assume_mutable(); + auto mutable_values = values.assume_mutable(); + + std::string k1 = "k" + std::to_string(i); + std::string k2 = "k" + std::to_string(i + 1); + mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1)); + mutable_values->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i))); + mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2)); + mutable_values->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1))); + + offsets.push_back(offsets.empty() ? 
2 : offsets.back() + 2); + } + } + block.insert(ColumnWithTypeAndName(std::move(col), map_type, "map_col")); + } +} + +static void fill_struct_column(Block& block, size_t rows) { + // struct<si:int, ss:string> + DataTypes struct_fields; + struct_fields.emplace_back( + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false))); + struct_fields.emplace_back( + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false))); + DataTypePtr struct_type = make_nullable(std::make_shared<DataTypeStruct>(struct_fields)); + + // struct<si:int, ss:string> column + { + MutableColumnPtr col = struct_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + auto& null_map = nullable_col->get_null_map_data(); + + auto& struct_col = assert_cast<ColumnStruct&>(nested); + const auto& fields = struct_col.get_columns(); + + for (size_t i = 0; i < rows; ++i) { + if (i % 6 == 0) { + nullable_col->insert_default(); + } else { + null_map.push_back(0); + auto mutable_field0 = fields[0]->assume_mutable(); + auto mutable_field1 = fields[1]->assume_mutable(); + // int field + mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>( + static_cast<int32_t>(i * 100))); + // string field + std::string vs = "ss" + std::to_string(i); + mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs)); + } + } + block.insert(ColumnWithTypeAndName(std::move(col), struct_type, "struct_col")); + } +} + +static void fill_variant_column(Block& block, size_t rows) { + // variant + DataTypePtr variant_type = make_nullable(std::make_shared<DataTypeVariant>()); + + // variant column: use JSON strings + deserialize_column_from_json_vector to populate ColumnVariant + { + MutableColumnPtr col = variant_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); // ColumnVariant + + // Prepare 
JSON strings with variable number of keys per row + std::vector<std::string> json_rows; + json_rows.reserve(rows); + std::vector<Slice> slices; + slices.reserve(rows); + + size_t key_cnt = 100; + for (size_t i = 0; i < rows; ++i) { + // key count cycles between 1, 2, 3, ... for better coverage + std::string json = "{"; + for (size_t k = 0; k < key_cnt; ++k) { + if (k > 0) { + json += ","; + } + // keys: "k0", "k1", ... + json += "\"k" + std::to_string(k) + "\":"; + // mix int and string values + if ((i + k) % 2 == 0) { + json += std::to_string(static_cast<int32_t>(i * 10 + k)); + } else { + json += "\"v" + std::to_string(i * 10 + k) + "\""; + } + } + json += "}"; + json_rows.emplace_back(std::move(json)); + slices.emplace_back(json_rows.back().data(), json_rows.back().size()); + } + + // Use Variant SerDe to parse JSON into ColumnVariant + auto variant_type_inner = std::make_shared<DataTypeVariant>(); + auto serde = variant_type_inner->get_serde(); + auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get()); + + uint64_t num_deserialized = 0; + DataTypeSerDe::FormatOptions options; + Status st = variant_serde->deserialize_column_from_json_vector(nested, slices, + &num_deserialized, options); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(rows, num_deserialized); + + // All rows are treated as non-null here + auto& null_map = nullable_col->get_null_map_data(); + null_map.clear(); + null_map.resize_fill(rows, 0); + + block.insert(ColumnWithTypeAndName(std::move(col), variant_type, "variant_col")); + } +} + +static void fill_complex_columns(Block& block, size_t rows) { + fill_array_column(block, rows); + fill_map_column(block, rows); + fill_struct_column(block, rows); + fill_variant_column(block, rows); +} + +static Block create_test_block(size_t rows) { + Block block; + // simple schema with primitive + complex types: + // int_col, str_col, arr_col, map_col, struct_col, variant_col + fill_primitive_columns(block, rows); + fill_complex_columns(block, rows); + + 
return block; +} + +TEST_F(NativeReaderWriterTest, round_trip_native_file) { + // Prepare a temporary local file path + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_" + uuid + ".native"; + + // 1. Write block to Native file via VNativeTransformer + auto fs = io::global_local_filesystem(); + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; // empty, VNativeTransformer won't use it directly + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + Block src_block = create_test_block(16); + st = transformer.write(src_block); + ASSERT_TRUE(st.ok()) << st; + // VNativeTransformer::close() will also close the underlying FileWriter, + // so we don't need (and must not) close file_writer again here. + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + // 2. 
Read back via NativeReader using normal file scan path + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(src_block.rows(), read_rows); + ASSERT_EQ(src_block.columns(), dst_block.columns()); + + // Compare column-wise values + for (size_t col = 0; col < src_block.columns(); ++col) { + const auto& src = src_block.get_by_position(col); + const auto& dst = dst_block.get_by_position(col); + ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name()); + ASSERT_EQ(src.column->size(), dst.column->size()); + for (size_t row = 0; row < src_block.rows(); ++row) { + auto src_field = (*src.column)[row]; + auto dst_field = (*dst.column)[row]; + ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row; + } + } + + // Next call should hit EOF + Block dst_block2; + read_rows = 0; + eof = false; + st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_TRUE(eof); + ASSERT_EQ(read_rows, 0); + + // Clean up temp file + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } +} + +// Edge cases: empty file and single-row file +TEST_F(NativeReaderWriterTest, round_trip_empty_and_single_row) { + auto fs = io::global_local_filesystem(); + + // 1. 
Empty native file (no data blocks) + { + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_empty_" + uuid + ".native"; + + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + // Write an empty block, should not produce any data block in file. + Block empty_block; + st = transformer.write(empty_block); + ASSERT_TRUE(st.ok()) << st; + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + // Read back: should directly hit EOF with 0 rows. + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(0U, read_rows); + ASSERT_TRUE(eof); + + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } + } + + // 2. 
Single-row native file + { + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_single_row_" + uuid + ".native"; + + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + Block src_block = create_test_block(1); + st = transformer.write(src_block); + ASSERT_TRUE(st.ok()) << st; + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(src_block.rows(), read_rows); + ASSERT_EQ(src_block.columns(), dst_block.columns()); + ASSERT_FALSE(eof); + + // Verify data equality for the single row. 
+ for (size_t col = 0; col < src_block.columns(); ++col) { + const auto& src = src_block.get_by_position(col); + const auto& dst = dst_block.get_by_position(col); + ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name()); + ASSERT_EQ(src.column->size(), dst.column->size()); + auto src_field = (*src.column)[0]; + auto dst_field = (*dst.column)[0]; + ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=0"; + } + + // Next call should hit EOF + Block dst_block2; + read_rows = 0; + eof = false; + st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_TRUE(eof); + ASSERT_EQ(read_rows, 0U); + + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } + } +} + +// Large volume test: verify round-trip correctness with many rows. +TEST_F(NativeReaderWriterTest, round_trip_native_file_large_rows) { + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_large_" + uuid + ".native"; + + auto fs = io::global_local_filesystem(); + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + // Use a relatively large number of rows to test stability and performance characteristics. + const size_t kRows = 9999; + Block src_block = create_test_block(kRows); + + // Split into multiple blocks and write them one by one to exercise multi-block path. 
+ const size_t kBatchRows = 128; + for (size_t offset = 0; offset < kRows; offset += kBatchRows) { + size_t len = std::min(kBatchRows, kRows - offset); + // Create a sub block with the same schema and a row range [offset, offset+len) + Block sub_block = src_block.clone_empty(); + for (size_t col = 0; col < src_block.columns(); ++col) { + const auto& src_col = *src_block.get_by_position(col).column; + const auto& dst_col_holder = *sub_block.get_by_position(col).column; + auto dst_mutable = dst_col_holder.assume_mutable(); + dst_mutable->insert_range_from(src_col, offset, len); + } + st = transformer.write(sub_block); + ASSERT_TRUE(st.ok()) << st; + } + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + // Read back in multiple blocks and merge into a single result block. 
+ Block merged_block; + bool first_block = true; + size_t total_read_rows = 0; + while (true) { + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + if (read_rows > 0) { + if (first_block) { + merged_block = dst_block; + total_read_rows = read_rows; + first_block = false; + } else { + MutableBlock merged_mutable(&merged_block); + Status add_st = merged_mutable.add_rows(&dst_block, 0, read_rows); + ASSERT_TRUE(add_st.ok()) << add_st; + total_read_rows += read_rows; + } + } + if (eof) { + break; + } + if (read_rows == 0) { + break; + } + } + + ASSERT_EQ(src_block.rows(), total_read_rows); + ASSERT_EQ(src_block.columns(), merged_block.columns()); + + // Compare column-wise values + for (size_t col = 0; col < src_block.columns(); ++col) { + const auto& src = src_block.get_by_position(col); + const auto& dst = merged_block.get_by_position(col); + ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name()); + ASSERT_EQ(src.column->size(), dst.column->size()); + for (size_t row = 0; row < src_block.rows(); row += 10) { + auto src_field = (*src.column)[row]; + auto dst_field = (*dst.column)[row]; + ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row; + } + } + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } +} + +// Verify that NativeReader forces all columns to nullable, even if the writer +// serialized non-nullable columns. 
+TEST_F(NativeReaderWriterTest, non_nullable_columns_forced_nullable) { + auto fs = io::global_local_filesystem(); + + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_non_nullable_" + uuid + ".native"; + + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + // Use default compression type. + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + // Build a block with non-nullable columns. + Block src_block; + DataTypePtr int_type = + DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false); + { + MutableColumnPtr col = int_type->create_column(); + for (int i = 0; i < 8; ++i) { + auto field = Field::create_field<PrimitiveType::TYPE_INT>(i * 3); + col->insert(field); + } + src_block.insert(ColumnWithTypeAndName(std::move(col), int_type, "int_nn")); + } + + DataTypePtr str_type = + DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_VARCHAR, false); + { + MutableColumnPtr col = str_type->create_column(); + for (int i = 0; i < 8; ++i) { + std::string v = "v" + std::to_string(i); + auto field = Field::create_field<PrimitiveType::TYPE_VARCHAR>(v); + col->insert(field); + } + src_block.insert(ColumnWithTypeAndName(std::move(col), str_type, "str_nn")); + } + + st = transformer.write(src_block); + ASSERT_TRUE(st.ok()) << st; + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + // Read back via NativeReader. 
+ TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(src_block.rows(), read_rows); + ASSERT_FALSE(eof); + + // All columns returned by NativeReader should be nullable. + for (size_t col = 0; col < dst_block.columns(); ++col) { + const auto& dst = dst_block.get_by_position(col); + ASSERT_TRUE(dst.type->is_nullable()) << "column " << col << " should be nullable"; + } + + // Values should be preserved. + for (size_t col = 0; col < src_block.columns(); ++col) { + const auto& src = src_block.get_by_position(col); + const auto& dst = dst_block.get_by_position(col); + ASSERT_EQ(src.column->size(), dst.column->size()); + for (size_t row = 0; row < src_block.rows(); ++row) { + auto src_field = (*src.column)[row]; + auto dst_field = (*dst.column)[row]; + ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row; + } + } + + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } +} + +// Verify that VNativeTransformer writes the native file header and that +// NativeReader can transparently read files with this header. +TEST_F(NativeReaderWriterTest, transformer_writes_header_and_reader_handles_it) { + auto fs = io::global_local_filesystem(); + + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_with_header_" + uuid + ".native"; + + // Write a small block via VNativeTransformer. 
+ io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + Block src_block = create_test_block(4); + st = transformer.write(src_block); + ASSERT_TRUE(st.ok()) << st; + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + // Read back raw bytes and verify the header. + { + io::FileReaderSPtr file_reader; + st = fs->open_file(file_path, &file_reader); + ASSERT_TRUE(st.ok()) << st; + size_t file_size = file_reader->size(); + ASSERT_GE(file_size, sizeof(uint64_t) + 12); + + char header[12]; + Slice header_slice(header, sizeof(header)); + size_t bytes_read = 0; + st = file_reader->read_at(0, header_slice, &bytes_read); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(sizeof(header), bytes_read); + + ASSERT_EQ(0, memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC))); + uint32_t version = 0; + memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); + ASSERT_EQ(DORIS_NATIVE_FORMAT_VERSION, version); + } + + // Now read via NativeReader; it should detect the header and skip it. 
+ TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(src_block.rows(), read_rows); + ASSERT_FALSE(eof); + + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } +} + +// Verify get_columns and get_parsed_schema can probe schema without scanning +// the whole file. +TEST_F(NativeReaderWriterTest, get_columns_and_parsed_schema) { + auto fs = io::global_local_filesystem(); + + UniqueId uid = UniqueId::gen_uid(); + std::string uuid = uid.to_string(); + std::string file_path = "./native_format_schema_" + uuid + ".native"; + + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + Block src_block = create_test_block(5); + st = transformer.write(src_block); + ASSERT_TRUE(st.ok()) << st; + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + std::unordered_map<std::string, DataTypePtr> name_to_type; + std::unordered_set<std::string> missing_cols; + st = reader_impl.get_columns(&name_to_type, &missing_cols); + ASSERT_TRUE(st.ok()) << st; + 
ASSERT_TRUE(missing_cols.empty()); + + // All columns from src_block should appear in name_to_type. + for (size_t i = 0; i < src_block.columns(); ++i) { + const auto& col = src_block.get_by_position(i); + auto it = name_to_type.find(col.name); + ASSERT_TRUE(it != name_to_type.end()) << "missing column " << col.name; + ASSERT_TRUE(it->second->is_nullable()) << "schema type should be nullable for " << col.name; + } + + std::vector<std::string> col_names; + std::vector<DataTypePtr> col_types; + st = reader_impl.get_parsed_schema(&col_names, &col_types); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(col_names.size(), col_types.size()); + ASSERT_EQ(col_names.size(), src_block.columns()); + + for (size_t i = 0; i < col_names.size(); ++i) { + ASSERT_TRUE(col_types[i]->is_nullable()); + } + + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } +} + +// Create a test block containing all known primitive and complex types with a single row. +// This function covers: +// - Basic integers: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT +// - Floating point: FLOAT, DOUBLE +// - Date/Time: DATE, DATETIME, DATEV2, DATETIMEV2, TIMEV2 +// - String types: CHAR, VARCHAR, STRING +// - Decimal types: DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128I, DECIMAL256 +// - IP types: IPV4, IPV6 +// - JSON: JSONB +// - Complex types: ARRAY, MAP, STRUCT, VARIANT +static Block create_all_types_test_block() { + Block block; + + // 1. BOOLEAN + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_BOOLEAN>(UInt8 {1})); // true + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_boolean")); + } + + // 2. 
TINYINT + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_TINYINT, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_TINYINT>(static_cast<int8_t>(42))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_tinyint")); + } + + // 3. SMALLINT + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, false)); + MutableColumnPtr col = type->create_column(); + col->insert( + Field::create_field<PrimitiveType::TYPE_SMALLINT>(static_cast<int16_t>(1234))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_smallint")); + } + + // 4. INT + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(123456))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_int")); + } + + // 5. BIGINT + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BIGINT, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_BIGINT>( + static_cast<int64_t>(9876543210LL))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_bigint")); + } + + // 6. LARGEINT + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, false)); + MutableColumnPtr col = type->create_column(); + Int128 large_val = Int128(123456789012345LL) * 1000000; + col->insert(Field::create_field<PrimitiveType::TYPE_LARGEINT>(large_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_largeint")); + } + + // 7. 
FLOAT + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_FLOAT, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_FLOAT>(3.14F)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_float")); + } + + // 8. DOUBLE + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_DOUBLE>(1.234567890123456789)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_double")); + } + + // 9. DATE (DateV1) + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATE, false)); + MutableColumnPtr col = type->create_column(); + VecDateTimeValue dt; + dt.from_date_int64(20231215); // 2023-12-15 + col->insert(Field::create_field<PrimitiveType::TYPE_DATE>(dt)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_date")); + } + + // 10. DATETIME (DateTimeV1) + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATETIME, false)); + MutableColumnPtr col = type->create_column(); + VecDateTimeValue dt; + dt.from_date_int64(20231215103045LL); // 2023-12-15 10:30:45 + col->insert(Field::create_field<PrimitiveType::TYPE_DATETIME>(dt)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_datetime")); + } + + // 11. DATEV2 + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATEV2, false)); + MutableColumnPtr col = type->create_column(); + DateV2Value<DateV2ValueType> dv2; + dv2.from_date_int64(20231215); + col->insert(Field::create_field<PrimitiveType::TYPE_DATEV2>(dv2)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_datev2")); + } + + // 12. 
DATETIMEV2 (scale=0) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, false, 0, 0)); + MutableColumnPtr col = type->create_column(); + DateV2Value<DateTimeV2ValueType> dtv2; + dtv2.from_date_int64(20231215103045LL); + col->insert(Field::create_field<PrimitiveType::TYPE_DATETIMEV2>(dtv2)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_datetimev2")); + } + + // 13. TIMEV2 (scale=0) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, false, 0, 0)); + MutableColumnPtr col = type->create_column(); + // TIMEV2 is stored as Float64 representing seconds + col->insert(Field::create_field<PrimitiveType::TYPE_TIMEV2>( + 37845.0)); // 10:30:45 in seconds + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_timev2")); + } + + // 14. CHAR + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_CHAR, false, 0, 0, 20)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_CHAR>(std::string("fixed_char_val"))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_char")); + } + + // 15. VARCHAR + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false)); + MutableColumnPtr col = type->create_column(); + col->insert( + Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("variable_string"))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_varchar")); + } + + // 16. STRING + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_STRING, false)); + MutableColumnPtr col = type->create_column(); + col->insert(Field::create_field<PrimitiveType::TYPE_STRING>( + std::string("long_text_content_here"))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_string")); + } + + // 17. 
DECIMALV2 (precision=27, scale=9) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_DECIMALV2, false, 27, 9)); + MutableColumnPtr col = type->create_column(); + DecimalV2Value dec_val(123456789, 123456789); + col->insert(Field::create_field<PrimitiveType::TYPE_DECIMALV2>(dec_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimalv2")); + } + + // 18. DECIMAL32 (precision=9, scale=2) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_DECIMAL32, false, 9, 2)); + MutableColumnPtr col = type->create_column(); + Decimal32 dec_val(static_cast<Int32>(12345678)); + col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL32>(dec_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal32")); + } + + // 19. DECIMAL64 (precision=18, scale=4) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_DECIMAL64, false, 18, 4)); + MutableColumnPtr col = type->create_column(); + Decimal64 dec_val(static_cast<Int64>(123456789012345678LL)); + col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL64>(dec_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal64")); + } + + // 20. DECIMAL128I (precision=38, scale=6) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, false, 38, 6)); + MutableColumnPtr col = type->create_column(); + Decimal128V3 dec_val(static_cast<Int128>(123456789012345678LL) * 100); + col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL128I>(dec_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal128")); + } + + // 21. 
DECIMAL256 (precision=76, scale=10) + { + DataTypePtr type = make_nullable( + DataTypeFactory::instance().create_data_type(TYPE_DECIMAL256, false, 76, 10)); + MutableColumnPtr col = type->create_column(); + wide::Int256 wide_val = wide::Int256(123456789012345678LL) * 10000000000LL; + Decimal256 dec_val(wide_val); + col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL256>(dec_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_decimal256")); + } + + // 22. IPV4 + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV4, false)); + MutableColumnPtr col = type->create_column(); + IPv4 ip_val = (192U << 24) | (168U << 16) | (1U << 8) | 100U; // 192.168.1.100 + col->insert(Field::create_field<PrimitiveType::TYPE_IPV4>(ip_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv4")); + } + + // 23. IPV6 + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV6, false)); + MutableColumnPtr col = type->create_column(); + // ::ffff:192.168.1.100 in IPv6 + IPv6 ip6_val = 0; + ip6_val = (static_cast<IPv6>(0xFFFF) << 32) | + ((192U << 24) | (168U << 16) | (1U << 8) | 100U); + col->insert(Field::create_field<PrimitiveType::TYPE_IPV6>(ip6_val)); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv6")); + } + + // 24. 
JSONB + { + DataTypePtr type = + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_JSONB, false)); + MutableColumnPtr col = type->create_column(); + // Use JsonbWriter to create JSONB binary data + JsonbWriter writer; + writer.writeStartObject(); + writer.writeKey("key", static_cast<uint8_t>(3)); + writer.writeStartString(); + writer.writeString("value"); + writer.writeEndString(); + writer.writeKey("num", static_cast<uint8_t>(3)); + writer.writeInt32(123); + writer.writeEndObject(); + const char* jsonb_data = writer.getOutput()->getBuffer(); + size_t jsonb_size = writer.getOutput()->getSize(); + JsonbField field(jsonb_data, jsonb_size); + col->insert(Field::create_field<PrimitiveType::TYPE_JSONB>(std::move(field))); + block.insert(ColumnWithTypeAndName(std::move(col), type, "col_jsonb")); + } + + // 25. ARRAY<INT> + { + DataTypePtr arr_nested_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false); + DataTypePtr arr_type = make_nullable(std::make_shared<DataTypeArray>(arr_nested_type)); + MutableColumnPtr col = arr_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + auto& null_map = nullable_col->get_null_map_data(); + null_map.push_back(0); // non-null + + auto& array_col = assert_cast<ColumnArray&>(nested); + auto& offsets = array_col.get_offsets(); + auto& data = array_col.get_data(); + auto mutable_data = data.assume_mutable(); + + mutable_data->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(10))); + mutable_data->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(20))); + mutable_data->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(30))); + offsets.push_back(3); // 3 elements + block.insert(ColumnWithTypeAndName(std::move(col), arr_type, "col_array")); + } + + // 26. 
MAP<STRING, INT> + { + DataTypePtr map_key_type = + DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false); + DataTypePtr map_value_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false); + DataTypePtr map_type = + make_nullable(std::make_shared<DataTypeMap>(map_key_type, map_value_type)); + MutableColumnPtr col = map_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + auto& null_map = nullable_col->get_null_map_data(); + null_map.push_back(0); // non-null + + auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets(); + auto& keys = assert_cast<ColumnMap&>(nested).get_keys(); + auto& values = assert_cast<ColumnMap&>(nested).get_values(); + + auto mutable_keys = keys.assume_mutable(); + auto mutable_values = values.assume_mutable(); + + mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key1"))); + mutable_values->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(100))); + mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key2"))); + mutable_values->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(200))); + + offsets.push_back(2); // 2 key-value pairs + block.insert(ColumnWithTypeAndName(std::move(col), map_type, "col_map")); + } + + // 27. 
STRUCT<si:INT, ss:VARCHAR> + { + DataTypes struct_fields; + struct_fields.emplace_back( + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false))); + struct_fields.emplace_back( + make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false))); + Strings field_names = {"si", "ss"}; + DataTypePtr struct_type = + make_nullable(std::make_shared<DataTypeStruct>(struct_fields, field_names)); + MutableColumnPtr col = struct_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + auto& null_map = nullable_col->get_null_map_data(); + null_map.push_back(0); // non-null + + auto& struct_col = assert_cast<ColumnStruct&>(nested); + const auto& fields = struct_col.get_columns(); + auto mutable_field0 = fields[0]->assume_mutable(); + auto mutable_field1 = fields[1]->assume_mutable(); + + mutable_field0->insert( + Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(999))); + mutable_field1->insert( + Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("struct_val"))); + block.insert(ColumnWithTypeAndName(std::move(col), struct_type, "col_struct")); + } + + // 28. 
VARIANT (JSON object) + { + DataTypePtr variant_type = make_nullable(std::make_shared<DataTypeVariant>()); + MutableColumnPtr col = variant_type->create_column(); + auto* nullable_col = assert_cast<ColumnNullable*>(col.get()); + auto& nested = nullable_col->get_nested_column(); + + std::string json_str = R"({"name":"test","value":12345})"; + std::vector<Slice> slices = {Slice(json_str.data(), json_str.size())}; + + auto variant_type_inner = std::make_shared<DataTypeVariant>(); + auto serde = variant_type_inner->get_serde(); + auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get()); + + uint64_t num_deserialized = 0; + DataTypeSerDe::FormatOptions options; + Status st = variant_serde->deserialize_column_from_json_vector(nested, slices, + &num_deserialized, options); + EXPECT_TRUE(st.ok()) << st; + EXPECT_EQ(1U, num_deserialized); + + auto& null_map = nullable_col->get_null_map_data(); + null_map.clear(); + null_map.resize_fill(1, 0); + + block.insert(ColumnWithTypeAndName(std::move(col), variant_type, "col_variant")); + } + + return block; +} + +// Pre-generated native file path for all types test. +// The file is stored in: be/test/data/vec/native/all_types_single_row.native +static std::string get_all_types_native_file_path() { + auto root_dir = std::string(getenv("ROOT")); + return root_dir + "/be/test/data/vec/native/all_types_single_row.native"; +} + +// Generator test: Generate native file with all types (DISABLED by default). 
+// Run this test manually to regenerate the test data file:
+// ./run-be-ut.sh --run --filter=*DISABLED_generate_all_types_native_file*
+// Then copy the generated file to: be/test/data/vec/native/all_types_single_row.native
+TEST_F(NativeReaderWriterTest, DISABLED_generate_all_types_native_file) {
+    // Output to current directory, user needs to copy it to test data dir
+    std::string file_path = "./all_types_single_row.native";
+
+    auto fs = io::global_local_filesystem();
+    io::FileWriterPtr file_writer;
+    Status st = fs->create_file(file_path, &file_writer);
+    ASSERT_TRUE(st.ok()) << st;
+
+    RuntimeState state;
+    VExprContextSPtrs exprs;
+    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+    st = transformer.open();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block src_block = create_all_types_test_block();
+    ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1 row";
+    ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28 columns for all types";
+
+    st = transformer.write(src_block);
+    ASSERT_TRUE(st.ok()) << st;
+    st = transformer.close();
+    ASSERT_TRUE(st.ok()) << st;
+
+    std::cout << "Generated native file: " << file_path << std::endl;
+    std::cout << "Please copy it to: be/test/data/vec/native/all_types_single_row.native"
+              << std::endl;
+}
+
+// Test: read pre-generated native file with all types and verify data
+TEST_F(NativeReaderWriterTest, read_all_types_from_pregenerated_file) {
+    std::string file_path = get_all_types_native_file_path();
+
+    auto fs = io::global_local_filesystem();
+    bool exists = false;
+    Status st = fs->exists(file_path, &exists);
+    ASSERT_TRUE(st.ok() && exists) << "Pre-generated native file not found: " << file_path
+            << ". Run DISABLED_generate_all_types_native_file to generate it.";
+
+    RuntimeState state;
+    TFileScanRangeParams scan_params;
+    scan_params.__set_file_type(TFileType::FILE_LOCAL);
+    TFileRangeDesc scan_range;
+    scan_range.__set_path(file_path);
+    scan_range.__set_file_type(TFileType::FILE_LOCAL);
+    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);
+
+    Block dst_block;
+    size_t read_rows = 0;
+    bool eof = false;
+    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(1U, read_rows) << "Should read exactly 1 row";
+    ASSERT_EQ(28U, dst_block.columns()) << "Should have 28 columns for all types";
+    ASSERT_FALSE(eof);
+
+    // Regenerate expected block and compare
+    Block expected_block = create_all_types_test_block();
+
+    // Verify data equality for the single row
+    for (size_t col = 0; col < expected_block.columns(); ++col) {
+        const auto& src = expected_block.get_by_position(col);
+        const auto& dst = dst_block.get_by_position(col);
+
+        // Type family should match
+        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
+                << "Type mismatch at col=" << col << " (" << src.name << ")";
+        ASSERT_EQ(src.column->size(), dst.column->size())
+                << "Size mismatch at col=" << col << " (" << src.name << ")";
+
+        // Compare field values
+        auto src_field = (*src.column)[0];
+        auto dst_field = (*dst.column)[0];
+        ASSERT_EQ(src_field, dst_field)
+                << "Value mismatch at col=" << col << " (" << src.name << ")";
+    }
+
+    // Next call should hit EOF
+    Block dst_block2;
+    read_rows = 0;
+    eof = false;
+    st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_TRUE(eof);
+    ASSERT_EQ(read_rows, 0U);
+}
+
+// Test: round-trip all known types with a single row (generates temp file each time)
+TEST_F(NativeReaderWriterTest, round_trip_all_types_single_row) {
+    UniqueId uid = UniqueId::gen_uid();
+    std::string uuid = uid.to_string();
+    std::string file_path = 
"./native_format_all_types_" + uuid + ".native"; + + auto fs = io::global_local_filesystem(); + io::FileWriterPtr file_writer; + Status st = fs->create_file(file_path, &file_writer); + ASSERT_TRUE(st.ok()) << st; + + RuntimeState state; + VExprContextSPtrs exprs; + VNativeTransformer transformer(&state, file_writer.get(), exprs, false); + st = transformer.open(); + ASSERT_TRUE(st.ok()) << st; + + Block src_block = create_all_types_test_block(); + ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1 row"; + ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28 columns for all types"; + + st = transformer.write(src_block); + ASSERT_TRUE(st.ok()) << st; + st = transformer.close(); + ASSERT_TRUE(st.ok()) << st; + + // Read back via NativeReader + TFileScanRangeParams scan_params; + scan_params.__set_file_type(TFileType::FILE_LOCAL); + TFileRangeDesc scan_range; + scan_range.__set_path(file_path); + scan_range.__set_file_type(TFileType::FILE_LOCAL); + NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state); + + Block dst_block; + size_t read_rows = 0; + bool eof = false; + st = reader_impl.get_next_block(&dst_block, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(src_block.rows(), read_rows); + ASSERT_EQ(src_block.columns(), dst_block.columns()); + ASSERT_FALSE(eof); + + // Verify data equality for the single row + for (size_t col = 0; col < src_block.columns(); ++col) { + const auto& src = src_block.get_by_position(col); + const auto& dst = dst_block.get_by_position(col); + + // Type family should match + ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name()) + << "Type mismatch at col=" << col << " (" << src.name << ")"; + ASSERT_EQ(src.column->size(), dst.column->size()) + << "Size mismatch at col=" << col << " (" << src.name << ")"; + + // Compare field values + auto src_field = (*src.column)[0]; + auto dst_field = (*dst.column)[0]; + ASSERT_EQ(src_field, dst_field) + << "Value mismatch at 
col=" << col << " (" << src.name << ")"; + } + + // Next call should hit EOF + Block dst_block2; + read_rows = 0; + eof = false; + st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof); + ASSERT_TRUE(st.ok()) << st; + ASSERT_TRUE(eof); + ASSERT_EQ(read_rows, 0U); + + // Clean up temp file + bool exists = false; + st = fs->exists(file_path, &exists); + if (st.ok() && exists) { + static_cast<void>(fs->delete_file(file_path)); + } +} + +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java index 62345cd811d..774ee4e6e83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java @@ -30,6 +30,7 @@ public class FileFormatConstants { public static final String FORMAT_AVRO = "avro"; public static final String FORMAT_WAL = "wal"; public static final String FORMAT_ARROW = "arrow"; + public static final String FORMAT_NATIVE = "native"; public static final String PROP_FORMAT = "format"; public static final String PROP_COLUMN_SEPARATOR = "column_separator"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index b95f35ffb60..ae75e4dd68f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -576,6 +576,9 @@ public class Util { return TFileFormatType.FORMAT_WAL; } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) { return TFileFormatType.FORMAT_ARROW; + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_NATIVE)) { + // Doris Native binary columnar format + return TFileFormatType.FORMAT_NATIVE; } else { return TFileFormatType.FORMAT_UNKNOWN; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java index 9706982c1ba..ac2512d88b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java @@ -39,6 +39,7 @@ public abstract class FileFormatProperties { public static final String FORMAT_AVRO = "avro"; public static final String FORMAT_WAL = "wal"; public static final String FORMAT_ARROW = "arrow"; + public static final String FORMAT_NATIVE = "native"; public static final String PROP_COMPRESS_TYPE = "compress_type"; protected String formatName; @@ -102,6 +103,8 @@ public abstract class FileFormatProperties { return new WalFileFormatProperties(); case FORMAT_ARROW: return new ArrowFileFormatProperties(); + case FORMAT_NATIVE: + return new NativeFileFormatProperties(); default: throw new AnalysisException("format:" + formatString + " is not supported."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java new file mode 100644 index 00000000000..fe4306906d4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.fileformat; + +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileTextScanRangeParams; +import org.apache.doris.thrift.TResultFileSinkOptions; + +import java.util.Map; + +/** + * File format properties for Doris Native binary columnar format. + * + * This format is intended for high-performance internal data exchange + */ + +public class NativeFileFormatProperties extends FileFormatProperties { + + public NativeFileFormatProperties() { + super(TFileFormatType.FORMAT_NATIVE, FileFormatProperties.FORMAT_NATIVE); + } + + @Override + public void analyzeFileFormatProperties(Map<String, String> formatProperties, + boolean isRemoveOriginProperty) + throws AnalysisException { + // Currently no extra user visible properties for native format. + // Just ignore all other properties gracefully. + } + + @Override + public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) { + // No extra sink options are required for native format. + } + + @Override + public TFileAttributes toTFileAttributes() { + // For now we don't need text params for Native format, but TFileAttributes + // requires a text_params field to be non-null on BE side. 
+ TFileAttributes fileAttributes = new TFileAttributes(); + TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); + fileAttributes.setTextParams(textParams); + return fileAttributes; + } +} + + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java index 52f5b3713a8..4ec74e001d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java @@ -368,7 +368,10 @@ public class NereidsLoadScanProvider { } } else { Column slotColumn; - if (fileGroup.getFileFormatProperties().getFileFormatType() == TFileFormatType.FORMAT_ARROW) { + TFileFormatType fileFormatType = fileGroup.getFileFormatProperties().getFileFormatType(); + // Use real column type for arrow/native format, other formats read as varchar first + if (fileFormatType == TFileFormatType.FORMAT_ARROW + || fileFormatType == TFileFormatType.FORMAT_NATIVE) { slotColumn = new Column(realColName, colToType.get(realColName), true); } else { if (fileGroupInfo.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index e08eaa2c825..3dc8468e846 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.VariantType; import org.apache.doris.common.AnalysisException; import 
org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -359,7 +360,8 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio * @return column type and the number of parsed PTypeNodes */ private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int start) { - PScalarType columnType = typeNodes.get(start).getScalarType(); + PTypeNode typeNode = typeNodes.get(start); + PScalarType columnType = typeNode.getScalarType(); TPrimitiveType tPrimitiveType = TPrimitiveType.findByValue(columnType.getType()); Type type; int parsedNodes; @@ -391,6 +393,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio parsedNodes += fieldType.value(); } type = new StructType(fields); + } else if (tPrimitiveType == TPrimitiveType.VARIANT) { + // Preserve VARIANT-specific properties from PTypeNode, especially variant_max_subcolumns_count. + int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount(); + // Currently no predefined fields are carried in PTypeNode for VARIANT, so use empty list and default + // values for other properties. 
+ type = new VariantType(new ArrayList<>(), maxSubcolumns, + /*enableTypedPathsToSparse*/ false, + /*variantMaxSparseColumnStatisticsSize*/ 10000, + /*variantSparseHashShardCount*/ 0); + parsedNodes = 1; } else { type = ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType), columnType.getLen(), columnType.getPrecision(), columnType.getScale()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0f9ccbf8b01..4435423f5c5 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -108,7 +108,8 @@ enum TFileFormatType { FORMAT_CSV_SNAPPYBLOCK = 14, FORMAT_WAL = 15, FORMAT_ARROW = 16, - FORMAT_TEXT = 17 + FORMAT_TEXT = 17, + FORMAT_NATIVE = 18 } // In previous versions, the data compression format and file format were stored together, as TFileFormatType, diff --git a/regression-test/data/export_p0/outfile/native/test_outfile_native.out b/regression-test/data/export_p0/outfile/native/test_outfile_native.out new file mode 100644 index 00000000000..fc60340a7e1 --- /dev/null +++ b/regression-test/data/export_p0/outfile/native/test_outfile_native.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_default -- +1 2024-01-01 2024-01-01T00:00 s1 1 1 true 1.1 +2 2024-01-01 2024-01-01T00:00 s2 2 2 true 2.2 +3 2024-01-01 2024-01-01T00:00 s3 3 3 true 3.3 +4 2024-01-01 2024-01-01T00:00 s4 4 4 true 4.4 +5 2024-01-01 2024-01-01T00:00 s5 5 5 true 5.5 +6 2024-01-01 2024-01-01T00:00 s6 6 6 true 6.6 +7 2024-01-01 2024-01-01T00:00 s7 7 7 true 7.7 +8 2024-01-01 2024-01-01T00:00 s8 8 8 true 8.800000000000001 +9 2024-01-01 2024-01-01T00:00 s9 9 9 true 9.9 +10 2024-01-01 2024-01-01T00:00 s10 10 10 true 10.1 + +-- !select_s3_native -- +1 2024-01-01 2024-01-01T00:00 s1 1 1 true 1.1 +2 2024-01-01 2024-01-01T00:00 s2 2 2 true 2.2 +3 2024-01-01 2024-01-01T00:00 s3 3 3 true 3.3 +4 2024-01-01 2024-01-01T00:00 s4 4 4 true 4.4 +5 2024-01-01 2024-01-01T00:00 s5 5 5 true 5.5 +6 2024-01-01 2024-01-01T00:00 s6 6 6 true 6.6 +7 2024-01-01 2024-01-01T00:00 s7 7 7 true 7.7 +8 2024-01-01 2024-01-01T00:00 s8 8 8 true 8.800000000000001 +9 2024-01-01 2024-01-01T00:00 s9 9 9 true 9.9 +10 2024-01-01 2024-01-01T00:00 s10 10 10 true 10.1 + diff --git a/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy new file mode 100644 index 00000000000..334fc809b88 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_outfile_native", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def tableName = "outfile_native_test" + def outFilePath = "${bucket}/outfile/native/exp_" + + // Export helper: write to S3 and return the URL output by FE + def outfile_to_s3 = { + def res = sql """ + SELECT * FROM ${tableName} t ORDER BY id + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS native + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + return res[0][3] + } + + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` INT NOT NULL, + `c_date` DATE NOT NULL, + `c_dt` DATETIME NOT NULL, + `c_str` VARCHAR(20), + `c_int` INT, + `c_tinyint` TINYINT, + `c_bool` boolean, + `c_double` double + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + // Insert 10000 rows of test data (the last row has NULLs in every nullable column) + StringBuilder sb = new StringBuilder() + int i = 1 + for (; i < 10000; i ++) { + sb.append(""" + (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i % 128}, true, ${i}.${i}), + """) + } + sb.append(""" + (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, 
NULL, NULL, NULL) + """) + sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """ + + // baseline: local table query result + qt_select_default """ SELECT * FROM ${tableName} t ORDER BY id limit 10; """ + + // 1) Export as a native-format file to S3 + def outfileUrl = outfile_to_s3() + + // 2) Read the data back from S3 via the S3 TVF (format=native) and compare with the baseline + // outfileUrl looks like: s3://bucket/outfile/native/exp_xxx_* ; strip the "s3://bucket" prefix and the trailing '*' + qt_select_s3_native """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${ + outfileUrl.substring(5 + bucket.length(), outfileUrl.length() - 1) + }0.native", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "native", + "region" = "${region}" + ) order by id limit 10; + """ + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName}") + } +} \ No newline at end of file diff --git a/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy new file mode 100644 index 00000000000..911d821263e --- /dev/null +++ b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import java.io.File +import org.awaitility.Awaitility +import static java.util.concurrent.TimeUnit.SECONDS; + +suite("test_export_variant_10k_columns", "p0") { + // open nereids + sql """ set enable_nereids_planner=true """ + sql """ set enable_fallback_to_original_planner=false """ + + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + // String bucket = context.config.otherConfigs.get("s3BucketName"); + String bucket = getS3BucketName() + + def table_export_name = "test_export_variant_10k" + def table_load_name = "test_load_variant_10k" + def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp""" + + def waiting_export = { export_label -> + while (true) { + def res = sql """ show export where label = "${export_label}" """ + logger.info("export state: " + res[0][2]) + if (res[0][2] == "FINISHED") { + def json = parseJson(res[0][11]) + assert json instanceof List + // assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; + } else if (res[0][2] == "CANCELLED") { + throw new IllegalStateException("""export failed: ${res[0][10]}""") + } else { + sleep(5000) + } + } + } + + // 1. Create table with variant column + sql """ DROP TABLE IF EXISTS ${table_export_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_export_name} ( + `id` INT NOT NULL, + `v` VARIANT<PROPERTIES ("variant_max_subcolumns_count" = "2048")> NULL + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + // 2. Generate data with 10000 keys in variant + // Generate num_rows rows (1,000 in this suite). + // Total 10,000 columns, but each row only has 50 columns (sparse). + // This simulates a realistic sparse wide table scenario. 
+ + File dataFile = File.createTempFile("variant_10k_data", ".json") + dataFile.deleteOnExit() + int num_rows = 1000 + try { + dataFile.withWriter { writer -> + StringBuilder sb = new StringBuilder() + for (int i = 1; i <= num_rows; i++) { + sb.setLength(0) + sb.append("{\"id\": ").append(i).append(", \"v\": {") + // Select 50 keys out of 10000 for each row + for (int k = 0; k < 50; k++) { + if (k > 0) sb.append(", ") + // Scatter the keys to ensure coverage of all 10000 columns across rows + int keyIdx = (i + k * 200) % 10000 + sb.append('"k').append(keyIdx).append('":').append(i) + } + sb.append("}}\n") + writer.write(sb.toString()) + } + } + + // 3. Stream Load + streamLoad { + table table_export_name + set 'format', 'json' + set 'read_json_by_line', 'true' + file dataFile.getAbsolutePath() + time 60000 // 60s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("Success", json.Status) + assertEquals(num_rows, json.NumberTotalRows) + assertEquals(num_rows, json.NumberLoadedRows) + } + } + } finally { + dataFile.delete() + } + + // def format = "parquet" + def format = "native" + + // 4. 
Export to S3 in the configured format (native here) + def uuid = UUID.randomUUID().toString() + // def outFilePath = """/tmp/variant_10k_export""" + def outFilePath = """${outfile_path_prefix}_${uuid}""" + def label = "label_${uuid}" + + try { + sql """ + EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/" + PROPERTIES( + "label" = "${label}", + "format" = "${format}" + ) + WITH S3( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}", + "provider" = "${getS3Provider()}" + ); + """ + + long startExport = System.currentTimeMillis() + def outfile_url = waiting_export.call(label) + long endExport = System.currentTimeMillis() + logger.info("Export ${num_rows} rows with variant took ${endExport - startExport} ms") + + // 5. Validate by S3 TVF + def s3_tvf_sql = """ s3( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "s3.access_key"= "${ak}", + "s3.secret_key" = "${sk}", + "format" = "${format}", + "provider" = "${getS3Provider()}", + "region" = "${region}" + ) """ + + // Verify row count + def res = sql """ SELECT count(*) FROM ${s3_tvf_sql} """ + assertEquals(num_rows, res[0][0]) + + def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\" = \"2048\")>" + if (new Random().nextInt(2) == 0) { + value_type = "text" + } + // 6. Load back into Doris (to a new table) to verify import performance/capability + sql """ DROP TABLE IF EXISTS ${table_load_name} """ + sql """ + CREATE TABLE ${table_load_name} ( + `id` INT, + `v` ${value_type} + ) + DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + """ + + // Use LOAD LABEL. The label contains '-' from UUID, so it must be quoted with back-quotes. 
+ def load_label = "load_${uuid}" + + sql """ + LOAD LABEL `${load_label}` + ( + DATA INFILE("s3://${outFilePath}/*") + INTO TABLE ${table_load_name} + FORMAT AS "${format}" + (id, v) + ) + WITH S3 ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}", + "provider" = "${getS3Provider()}" + ); + """ + + Awaitility.await().atMost(240, SECONDS).pollInterval(5, SECONDS).until({ + def loadResult = sql """ + show load where label = '${load_label}' + """ + if (loadResult.get(0).get(2) == 'CANCELLED' || loadResult.get(0).get(2) == 'FAILED') { + println("load failed: " + loadResult.get(0)) + throw new RuntimeException("load failed"+ loadResult.get(0)) + } + return loadResult.get(0).get(2) == 'FINISHED' + }) + // Check if data loaded + def load_count = sql """ SELECT count(*) FROM ${table_load_name} """ + assertEquals(num_rows, load_count[0][0]) + + // Check variant data integrity (sample) + // Row 1 has keys: (1 + k*200) % 10000. For k=0, key is k1. Value is 1. + def check_v = sql """ SELECT cast(v['k1'] as int) FROM ${table_load_name} WHERE id = 1 """ + assertEquals(1, check_v[0][0]) + + } finally { + // try_sql("DROP TABLE IF EXISTS ${table_export_name}") + // try_sql("DROP TABLE IF EXISTS ${table_load_name}") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
