This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new c572db1bd6d branch-4.1 cherry-pick [feat](format) support native
format (#61286)
c572db1bd6d is described below
commit c572db1bd6d032688958e3165f5d62b8faf2e32c
Author: lihangyu <[email protected]>
AuthorDate: Sat Mar 14 01:18:50 2026 +0800
branch-4.1 cherry-pick [feat](format) support native format (#61286)
cherry-pick #58711
---
be/src/service/internal_service.cpp | 6 +
be/src/vec/exec/format/native/native_format.h | 57 +
be/src/vec/exec/format/native/native_reader.cpp | 369 ++++++
be/src/vec/exec/format/native/native_reader.h | 107 ++
be/src/vec/exec/scan/file_scanner.cpp | 17 +-
be/src/vec/functions/cast/cast_to_jsonb.h | 2 +-
be/src/vec/functions/cast/cast_to_variant.h | 180 +--
be/src/vec/runtime/vnative_transformer.cpp | 133 ++
be/src/vec/runtime/vnative_transformer.h | 65 +
be/src/vec/sink/writer/vfile_result_writer.cpp | 9 +
.../data/vec/native/all_types_single_row.native | Bin 0 -> 1129 bytes
.../format/native/native_reader_writer_test.cpp | 1350 ++++++++++++++++++++
.../doris/common/util/FileFormatConstants.java | 1 +
.../java/org/apache/doris/common/util/Util.java | 3 +
.../property/fileformat/FileFormatProperties.java | 3 +
.../fileformat/NativeFileFormatProperties.java | 65 +
.../nereids/load/NereidsLoadScanProvider.java | 5 +-
.../ExternalFileTableValuedFunction.java | 14 +-
gensrc/thrift/PlanNodes.thrift | 3 +-
.../outfile/native/test_outfile_native.out | 25 +
.../outfile/native/test_outfile_native.groovy | 100 ++
.../test_export_variant_10k_columns.groovy | 215 ++++
22 files changed, 2646 insertions(+), 83 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index bbd86d9c56c..62ed4aec19c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -123,6 +123,7 @@
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/text/text_reader.h"
@@ -856,6 +857,11 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
reader = vectorized::OrcReader::create_unique(params, range, "",
&io_ctx);
break;
}
+ case TFileFormatType::FORMAT_NATIVE: {
+ reader = vectorized::NativeReader::create_unique(profile.get(),
params, range, &io_ctx,
+ nullptr);
+ break;
+ }
case TFileFormatType::FORMAT_JSON: {
reader = vectorized::NewJsonReader::create_unique(profile.get(),
params, range,
file_slots,
&io_ctx);
diff --git a/be/src/vec/exec/format/native/native_format.h
b/be/src/vec/exec/format/native/native_format.h
new file mode 100644
index 00000000000..e004a3c0f31
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_format.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+namespace doris::vectorized {
+
+// Doris Native format file-level constants.
+//
+// File layout (byte stream):
+//
+//
+-------------------------------+---------------------------+---------------------------+
...
+// | File header | Data block #0 | Data block
#1 | ...
+//
+-------------------------------+---------------------------+---------------------------+
...
+//
+// File header (12 bytes total):
+// - [0..7] : magic bytes "DORISN1\0" (DORIS_NATIVE_MAGIC)
+// - [8..11] : uint32_t format_version (DORIS_NATIVE_FORMAT_VERSION,
little-endian)
+//
+// Each data block i:
+// - uint64_t block_size : length in bytes of serialized PBlock
(little-endian)
+// - uint8_t[block_size] : PBlock protobuf payload produced by
Block::serialize()
+//
+// NativeReader:
+// - Detects the optional file header by checking the first 8 bytes against
DORIS_NATIVE_MAGIC.
+// - If the header is present, it skips 12 bytes and then starts reading
blocks as
+// [uint64_t block_size][PBlock bytes]...
+// - If the header is absent (legacy files), it starts reading blocks from
offset 0.
+//
+// VNativeTransformer:
+// - Writes the header once in open(), then appends each block in write() as
+// [uint64_t block_size][PBlock bytes]...
+//
+// These constants are shared between writer, reader and tests to keep the
on-disk
+// format definition in a single place.
+// Header layout:
+// [magic bytes "DORISN1\0"][uint32_t format_version]
+static constexpr char DORIS_NATIVE_MAGIC[8] = {'D', 'O', 'R', 'I', 'S', 'N',
'1', '\0'};
+static constexpr uint32_t DORIS_NATIVE_FORMAT_VERSION = 1;
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/native/native_reader.cpp
b/be/src/vec/exec/format/native/native_reader.cpp
new file mode 100644
index 00000000000..66cfe3a9f91
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.cpp
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/native/native_reader.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include "io/file_factory.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/tracing_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+NativeReader::NativeReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range, io::IOContext* io_ctx,
RuntimeState* state)
+ : _profile(profile),
+ _scan_params(params),
+ _scan_range(range),
+ _io_ctx(io_ctx),
+ _state(state) {}
+
+NativeReader::~NativeReader() {
+ (void)close();
+}
+
+namespace {
+
+Status validate_and_consume_header(io::FileReaderSPtr file_reader, const
TFileRangeDesc& range,
+ int64_t* file_size, int64_t*
current_offset, bool* eof) {
+ *file_size = file_reader->size();
+ *current_offset = 0;
+ *eof = (*file_size == 0);
+
+ // Validate and consume Doris Native file header.
+ // Expected layout:
+ // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t
block_size]...
+ static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) +
sizeof(uint32_t);
+ if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) {
+ return Status::InternalError(
+ "invalid Doris Native file {}, file size {} is smaller than
header size {}",
+ range.path, *file_size, HEADER_SIZE);
+ }
+
+ char header[HEADER_SIZE];
+ Slice header_slice(header, sizeof(header));
+ size_t bytes_read = 0;
+ RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read));
+ if (bytes_read != sizeof(header)) {
+ return Status::InternalError(
+ "failed to read Doris Native header from file {}, expect {}
bytes, got {} bytes",
+ range.path, sizeof(header), bytes_read);
+ }
+
+ if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) {
+ return Status::InternalError("invalid Doris Native magic header in
file {}", range.path);
+ }
+
+ uint32_t version = 0;
+ memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t));
+ if (version != DORIS_NATIVE_FORMAT_VERSION) {
+ return Status::InternalError(
+ "unsupported Doris Native format version {} in file {}, expect
{}", version,
+ range.path, DORIS_NATIVE_FORMAT_VERSION);
+ }
+
+ *current_offset = sizeof(header);
+ *eof = (*file_size == *current_offset);
+ return Status::OK();
+}
+
+} // namespace
+
+Status NativeReader::init_reader() {
+ if (_file_reader != nullptr) {
+ return Status::OK();
+ }
+
+ // Create underlying file reader. For now we always use random access mode.
+ io::FileSystemProperties system_properties;
+ io::FileDescription file_description;
+ file_description.file_size = -1;
+ if (_scan_range.__isset.file_size) {
+ file_description.file_size = _scan_range.file_size;
+ }
+ file_description.path = _scan_range.path;
+ if (_scan_range.__isset.fs_name) {
+ file_description.fs_name = _scan_range.fs_name;
+ }
+ if (_scan_range.__isset.modification_time) {
+ file_description.mtime = _scan_range.modification_time;
+ } else {
+ file_description.mtime = 0;
+ }
+
+ if (_scan_range.__isset.file_type) {
+ // For compatibility with older FE.
+ system_properties.system_type = _scan_range.file_type;
+ } else {
+ system_properties.system_type = _scan_params.file_type;
+ }
+ system_properties.properties = _scan_params.properties;
+ system_properties.hdfs_params = _scan_params.hdfs_params;
+ if (_scan_params.__isset.broker_addresses) {
+
system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
+
_scan_params.broker_addresses.end());
+ }
+
+ io::FileReaderOptions reader_options =
+ FileFactory::get_reader_options(_state, file_description);
+ auto reader_res = io::DelegateReader::create_file_reader(
+ _profile, system_properties, file_description, reader_options,
+ io::DelegateReader::AccessMode::RANDOM, _io_ctx);
+ if (!reader_res.has_value()) {
+ return reader_res.error();
+ }
+ _file_reader = reader_res.value();
+
+ if (_io_ctx) {
+ _file_reader =
+ std::make_shared<io::TracingFileReader>(_file_reader,
_io_ctx->file_reader_stats);
+ }
+
+ RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range,
&_file_size,
+ &_current_offset, &_eof));
+ return Status::OK();
+}
+
+Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool*
eof) {
+ if (_eof) {
+ *read_rows = 0;
+ *eof = true;
+ return Status::OK();
+ }
+
+ RETURN_IF_ERROR(init_reader());
+
+ std::string buff;
+ bool local_eof = false;
+
+ // If we have already loaded the first block for schema probing, use it
first.
+ if (_first_block_loaded && !_first_block_consumed) {
+ buff = _first_block_buf;
+ local_eof = false;
+ } else {
+ RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
+ }
+
+ // If we reach EOF and also read no data for this call, the whole file is
considered finished.
+ if (local_eof && buff.empty()) {
+ *read_rows = 0;
+ *eof = true;
+ _eof = true;
+ return Status::OK();
+ }
+ // If buffer is empty but we have not reached EOF yet, treat this as an
error.
+ if (buff.empty()) {
+ return Status::InternalError("read empty native block from file {}",
_scan_range.path);
+ }
+
+ PBlock pblock;
+ if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) {
+ return Status::InternalError("Failed to parse native PBlock from file
{}",
+ _scan_range.path);
+ }
+
+ // Initialize schema from first block if not done yet.
+ if (!_schema_inited) {
+ RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+ }
+
+ size_t uncompressed_bytes = 0;
+ int64_t decompress_time = 0;
+ RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes,
&decompress_time));
+
+ // For external file scan / TVF scenarios, unify all columns as nullable
to match
+ // GenericReader/SlotDescriptor convention. This ensures schema
consistency when
+ // some writers emit non-nullable columns.
+ for (size_t i = 0; i < block->columns(); ++i) {
+ auto& col_with_type = block->get_by_position(i);
+ if (!col_with_type.type->is_nullable()) {
+ col_with_type.column = make_nullable(col_with_type.column);
+ col_with_type.type = make_nullable(col_with_type.type);
+ }
+ }
+
+ *read_rows = block->rows();
+ *eof = false;
+
+ if (_first_block_loaded && !_first_block_consumed) {
+ _first_block_consumed = true;
+ }
+
+ // If we reached the physical end of file, mark eof for subsequent calls.
+ if (_current_offset >= _file_size) {
+ _eof = true;
+ }
+
+ return Status::OK();
+}
+
+Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
+ std::unordered_set<std::string>*
missing_cols) {
+ missing_cols->clear();
+ RETURN_IF_ERROR(init_reader());
+
+ if (!_schema_inited) {
+ // Load first block lazily to initialize schema.
+ if (!_first_block_loaded) {
+ bool local_eof = false;
+ RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+ // Treat file as empty only if we reach EOF and there is no block
data at all.
+ if (local_eof && _first_block_buf.empty()) {
+ return Status::EndOfFile("empty native file {}",
_scan_range.path);
+ }
+ // Non-EOF but empty buffer means corrupted native file.
+ if (_first_block_buf.empty()) {
+ return Status::InternalError("first native block is empty {}",
_scan_range.path);
+ }
+ _first_block_loaded = true;
+ }
+
+ PBlock pblock;
+ if (!pblock.ParseFromArray(_first_block_buf.data(),
+ static_cast<int>(_first_block_buf.size())))
{
+ return Status::InternalError("Failed to parse native PBlock for
schema from file {}",
+ _scan_range.path);
+ }
+ RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+ }
+
+ for (size_t i = 0; i < _schema_col_names.size(); ++i) {
+ name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]);
+ }
+ return Status::OK();
+}
+
+Status NativeReader::init_schema_reader() {
+ RETURN_IF_ERROR(init_reader());
+ return Status::OK();
+}
+
+Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<DataTypePtr>* col_types) {
+ RETURN_IF_ERROR(init_reader());
+
+ if (!_schema_inited) {
+ if (!_first_block_loaded) {
+ bool local_eof = false;
+ RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+ // Treat file as empty only if we reach EOF and there is no block
data at all.
+ if (local_eof && _first_block_buf.empty()) {
+ return Status::EndOfFile("empty native file {}",
_scan_range.path);
+ }
+ // Non-EOF but empty buffer means corrupted native file.
+ if (_first_block_buf.empty()) {
+ return Status::InternalError("first native block is empty {}",
_scan_range.path);
+ }
+ _first_block_loaded = true;
+ }
+
+ PBlock pblock;
+ if (!pblock.ParseFromArray(_first_block_buf.data(),
+ static_cast<int>(_first_block_buf.size())))
{
+ return Status::InternalError("Failed to parse native PBlock for
schema from file {}",
+ _scan_range.path);
+ }
+ RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+ }
+
+ *col_names = _schema_col_names;
+ *col_types = _schema_col_types;
+ return Status::OK();
+}
+
+Status NativeReader::close() {
+ _file_reader.reset();
+ return Status::OK();
+}
+
+Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
+ *eof = false;
+ buff->clear();
+
+ if (_file_reader == nullptr) {
+ RETURN_IF_ERROR(init_reader());
+ }
+
+ if (_current_offset >= _file_size) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ uint64_t len = 0;
+ Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+ size_t bytes_read = 0;
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice,
&bytes_read));
+ if (bytes_read == 0) {
+ *eof = true;
+ return Status::OK();
+ }
+ if (bytes_read != sizeof(len)) {
+ return Status::InternalError(
+ "Failed to read native block length from file {}, expect {}, "
+ "actual {}",
+ _scan_range.path, sizeof(len), bytes_read);
+ }
+
+ _current_offset += sizeof(len);
+ if (len == 0) {
+ // Empty block, nothing to read.
+ *eof = (_current_offset >= _file_size);
+ return Status::OK();
+ }
+
+ buff->assign(len, '\0');
+ Slice data_slice(buff->data(), len);
+ bytes_read = 0;
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice,
&bytes_read));
+ if (bytes_read != len) {
+ return Status::InternalError(
+ "Failed to read native block body from file {}, expect {}, "
+ "actual {}",
+ _scan_range.path, len, bytes_read);
+ }
+
+ _current_offset += len;
+ *eof = (_current_offset >= _file_size);
+ return Status::OK();
+}
+
+Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
+ _schema_col_names.clear();
+ _schema_col_types.clear();
+
+ for (const auto& pcol_meta : pblock.column_metas()) {
+ DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta));
+ VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name()
+ << ", type=" << type->get_name();
+ _schema_col_names.emplace_back(pcol_meta.name());
+ _schema_col_types.emplace_back(type);
+ }
+ _schema_inited = true;
+ return Status::OK();
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/native/native_reader.h
b/be/src/vec/exec/format/native/native_reader.h
new file mode 100644
index 00000000000..bc1635e0ccb
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.h
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace io {
+struct IOContext;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format reader.
+// it will read a sequence of Blocks encoded in Doris Native binary format.
+//
+// NOTE: current implementation is just a skeleton and will be filled step by
step.
+class NativeReader : public GenericReader {
+public:
+ ENABLE_FACTORY_CREATOR(NativeReader);
+
+ NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, io::IOContext* io_ctx,
RuntimeState* state);
+
+ ~NativeReader() override;
+
+ // Initialize underlying file reader and any format specific state.
+ Status init_reader();
+
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+ Status get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
+ Status init_schema_reader() override;
+
+ Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<DataTypePtr>* col_types) override;
+
+ Status close() override;
+
+ bool count_read_rows() override { return true; }
+
+protected:
+ void _collect_profile_before_close() override {}
+
+private:
+ RuntimeProfile* _profile = nullptr;
+ const TFileScanRangeParams& _scan_params;
+ const TFileRangeDesc& _scan_range;
+
+ io::FileReaderSPtr _file_reader;
+ io::IOContext* _io_ctx = nullptr;
+ RuntimeState* _state = nullptr;
+
+ bool _eof = false;
+
+ // Current read offset in the underlying file.
+ int64_t _current_offset = 0;
+ int64_t _file_size = 0;
+
+ // Cached schema information from the first PBlock.
+ bool _schema_inited = false;
+ std::vector<std::string> _schema_col_names;
+ std::vector<DataTypePtr> _schema_col_types;
+
+ // Cached first block (serialized) to allow schema probing before data
scan.
+ std::string _first_block_buf;
+ bool _first_block_loaded = false;
+ bool _first_block_consumed = false;
+
+ Status _read_next_pblock(std::string* buff, bool* eof);
+ Status _init_schema_from_pblock(const PBlock& pblock);
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/file_scanner.cpp
b/be/src/vec/exec/scan/file_scanner.cpp
index 5124f8ea631..37f580b25ff 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -60,6 +60,7 @@
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/hive_reader.h"
@@ -599,10 +600,6 @@ Status FileScanner::_cast_to_input_block(Block* block) {
// skip columns which does not exist in file
continue;
}
- if (slot_desc->type()->get_primitive_type() ==
PrimitiveType::TYPE_VARIANT) {
- // skip variant type
- continue;
- }
auto& arg =
_src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
auto return_type = slot_desc->get_data_type_ptr();
// remove nullable here, let the get_function decide whether nullable
@@ -617,8 +614,10 @@ Status FileScanner::_cast_to_input_block(Block* block) {
return_type->get_name());
}
idx = _src_block_name_to_idx[slot_desc->col_name()];
+ DCHECK(_state != nullptr);
+ auto ctx = FunctionContext::create_context(_state, {}, {});
RETURN_IF_ERROR(
- func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
+ func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
return Status::OK();
@@ -1118,6 +1117,14 @@ Status FileScanner::_get_next_reader() {
init_status =
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
break;
}
+ case TFileFormatType::FORMAT_NATIVE: {
+ auto reader =
+ NativeReader::create_unique(_profile, *_params, range,
_io_ctx.get(), _state);
+ init_status = reader->init_reader();
+ _cur_reader = std::move(reader);
+ need_to_get_parsed_schema = false;
+ break;
+ }
case TFileFormatType::FORMAT_ARROW: {
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "remote_doris")
{
diff --git a/be/src/vec/functions/cast/cast_to_jsonb.h
b/be/src/vec/functions/cast/cast_to_jsonb.h
index 92e3d4954a9..bd3cd8a329b 100644
--- a/be/src/vec/functions/cast/cast_to_jsonb.h
+++ b/be/src/vec/functions/cast/cast_to_jsonb.h
@@ -222,7 +222,7 @@ struct ParseJsonbFromString {
}
};
-// create cresponding jsonb value with type to_type
+// create corresponding jsonb value with type to_type
// use jsonb writer to create jsonb value
WrapperType create_cast_to_jsonb_wrapper(const DataTypePtr& from_type, const
DataTypeJsonb& to_type,
bool string_as_jsonb_string) {
diff --git a/be/src/vec/functions/cast/cast_to_variant.h
b/be/src/vec/functions/cast/cast_to_variant.h
index 1593f9023de..1c45bb46597 100644
--- a/be/src/vec/functions/cast/cast_to_variant.h
+++ b/be/src/vec/functions/cast/cast_to_variant.h
@@ -20,86 +20,102 @@
#include "cast_base.h"
#include "cast_to_string.h"
#include "vec/data_types/data_type_variant.h"
+
namespace doris::vectorized::CastWrapper {
-struct CastFromVariant {
- static Status execute(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
- uint32_t result, size_t input_rows_count,
- const NullMap::value_type* null_map = nullptr) {
- auto& data_type_to = block.get_by_position(result).type;
- const auto& col_with_type_and_name =
block.get_by_position(arguments[0]);
- const auto& col_from = col_with_type_and_name.column;
- const auto& variant = assert_cast<const ColumnVariant&>(*col_from);
- ColumnPtr col_to = data_type_to->create_column();
- if (!variant.is_finalized()) {
- // ColumnVariant should be finalized before parsing, finalize
maybe modify original column structure
- variant.assume_mutable()->finalize();
- }
- // It's important to convert as many elements as possible in this
context. For instance,
- // if the root of this variant column is a number column, converting
it to a number column
- // is acceptable. However, if the destination type is a string and
root is none scalar root, then
- // we should convert the entire tree to a string.
- bool is_root_valuable = variant.is_scalar_variant() ||
- (!variant.is_null_root() &&
- variant.get_root_type()->get_primitive_type()
!= INVALID_TYPE &&
-
!is_string_type(data_type_to->get_primitive_type()) &&
- data_type_to->get_primitive_type() !=
TYPE_JSONB);
- if (is_root_valuable) {
- ColumnPtr nested = variant.get_root();
- auto nested_from_type = variant.get_root_type();
- // DCHECK(nested_from_type->is_nullable());
- DCHECK(!data_type_to->is_nullable());
- auto new_context = context->clone();
+// shared implementation for casting from variant to arbitrary non-nullable
target type
+inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
+ const ColumnNumbers& arguments, uint32_t
result,
+ size_t input_rows_count,
+ const NullMap::value_type* /*null_map*/,
+ const DataTypePtr& data_type_to) {
+ const auto& col_with_type_and_name = block.get_by_position(arguments[0]);
+ const auto& col_from = col_with_type_and_name.column;
+ const auto& variant = assert_cast<const ColumnVariant&>(*col_from);
+ ColumnPtr col_to = data_type_to->create_column();
+
+ if (!variant.is_finalized()) {
+ // ColumnVariant should be finalized before parsing, finalize maybe
modify original column structure
+ variant.assume_mutable()->finalize();
+ }
+
+ // It's important to convert as many elements as possible in this context.
For instance,
+ // if the root of this variant column is a number column, converting it to
a number column
+ // is acceptable. However, if the destination type is a string and root is
none scalar root, then
+ // we should convert the entire tree to a string.
+ bool is_root_valuable = variant.is_scalar_variant() ||
+ (!variant.is_null_root() &&
+ variant.get_root_type()->get_primitive_type() !=
INVALID_TYPE &&
+
!is_string_type(data_type_to->get_primitive_type()) &&
+ data_type_to->get_primitive_type() != TYPE_JSONB);
+
+ if (is_root_valuable) {
+ ColumnPtr nested = variant.get_root();
+ auto nested_from_type = variant.get_root_type();
+ // DCHECK(nested_from_type->is_nullable());
+ DCHECK(!data_type_to->is_nullable());
+ auto new_context = context == nullptr ? nullptr : context->clone();
+ if (new_context != nullptr) {
new_context->set_jsonb_string_as_string(true);
// Strict mode is for user INSERT validation, not variant internal
conversion.
new_context->set_enable_strict_mode(false);
- // dst type nullable has been removed, so we should remove the
inner nullable of root column
- auto wrapper = prepare_impl(new_context.get(),
remove_nullable(nested_from_type),
- data_type_to);
- Block tmp_block {{remove_nullable(nested),
remove_nullable(nested_from_type), ""}};
- tmp_block.insert({nullptr, data_type_to, ""});
- /// Perform the requested conversion.
- Status st = wrapper(new_context.get(), tmp_block, {0}, 1,
input_rows_count, nullptr);
- if (!st.ok()) {
- // Fill with default values, which is null
-
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
- col_to = make_nullable(col_to, true);
- } else {
- col_to = tmp_block.get_by_position(1).column;
- // Note: here we should return the nullable result column
- col_to = wrap_in_nullable(
- col_to, Block({{nested, nested_from_type, ""},
{col_to, data_type_to, ""}}),
- {0}, input_rows_count);
- }
+ }
+ // dst type nullable has been removed, so we should remove the inner
nullable of root column
+ auto wrapper =
+ prepare_impl(new_context.get(),
remove_nullable(nested_from_type), data_type_to);
+ Block tmp_block {{remove_nullable(nested),
remove_nullable(nested_from_type), ""}};
+ tmp_block.insert({nullptr, data_type_to, ""});
+ /// Perform the requested conversion.
+ Status st = wrapper(new_context.get(), tmp_block, {0}, 1,
input_rows_count, nullptr);
+ if (!st.ok()) {
+ // Fill with default values, which is null
+ col_to->assume_mutable()->insert_many_defaults(input_rows_count);
+ col_to = make_nullable(col_to, true);
} else {
- if (variant.only_have_default_values()) {
-
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
- col_to = make_nullable(col_to, true);
- } else if (is_string_type(data_type_to->get_primitive_type())) {
- // serialize to string
- return CastToStringFunction::execute_impl(context, block,
arguments, result,
- input_rows_count);
- } else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
- // serialize to json by parsing
- return cast_from_generic_to_jsonb(context, block, arguments,
result,
- input_rows_count);
- } else if (!data_type_to->is_nullable() &&
- !is_string_type(data_type_to->get_primitive_type())) {
- // other types
-
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
- col_to = make_nullable(col_to, true);
- } else {
- assert_cast<ColumnNullable&>(*col_to->assume_mutable())
- .insert_many_defaults(input_rows_count);
- }
+ col_to = tmp_block.get_by_position(1).column;
+ // Note: here we should return the nullable result column
+ col_to = wrap_in_nullable(
+ col_to, Block({{nested, nested_from_type, ""}, {col_to,
data_type_to, ""}}),
+ {0}, input_rows_count);
}
- if (col_to->size() != input_rows_count) {
- return Status::InternalError("Unmatched row count {}, expected
{}", col_to->size(),
- input_rows_count);
+ } else {
+ if (variant.only_have_default_values()) {
+ col_to->assume_mutable()->insert_many_defaults(input_rows_count);
+ col_to = make_nullable(col_to, true);
+ } else if (is_string_type(data_type_to->get_primitive_type())) {
+ // serialize to string
+ return CastToStringFunction::execute_impl(context, block,
arguments, result,
+ input_rows_count);
+ } else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
+ // serialize to json by parsing
+ return cast_from_generic_to_jsonb(context, block, arguments,
result, input_rows_count);
+ } else if (!data_type_to->is_nullable() &&
+ !is_string_type(data_type_to->get_primitive_type())) {
+ // other types
+ col_to->assume_mutable()->insert_many_defaults(input_rows_count);
+ col_to = make_nullable(col_to, true);
+ } else {
+ assert_cast<ColumnNullable&>(*col_to->assume_mutable())
+ .insert_many_defaults(input_rows_count);
}
+ }
- block.replace_by_position(result, std::move(col_to));
- return Status::OK();
+ if (col_to->size() != input_rows_count) {
+ return Status::InternalError("Unmatched row count {}, expected {}",
col_to->size(),
+ input_rows_count);
+ }
+
+ block.replace_by_position(result, std::move(col_to));
+ return Status::OK();
+}
+
+struct CastFromVariant {
+ static Status execute(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
+ uint32_t result, size_t input_rows_count,
+ const NullMap::value_type* null_map = nullptr) {
+ auto& data_type_to = block.get_by_position(result).type;
+ return cast_from_variant_impl(context, block, arguments, result,
input_rows_count, null_map,
+ data_type_to);
}
};
@@ -119,16 +135,32 @@ struct CastToVariant {
}
};
-// create cresponding variant value to wrap from_type
+// create corresponding variant value to wrap from_type
// create corresponding variant value to wrap from_type
WrapperType create_cast_to_variant_wrapper(const DataTypePtr& from_type,
                                           const DataTypeVariant& to_type) {
    const bool source_is_variant = from_type->get_primitive_type() == TYPE_VARIANT;
    if (!source_is_variant) {
        return &CastToVariant::execute;
    }
    // variant -> variant is rejected here: variant_max_subcolumns_count is not equal
    return create_unsupport_wrapper(from_type->get_name(), to_type.get_name());
}
-// create cresponding type convert from variant
+// create corresponding type convert from variant
// create corresponding type convert from variant
WrapperType create_cast_from_variant_wrapper(const DataTypeVariant& from_type,
                                             const DataTypePtr& to_type) {
    if (to_type->get_primitive_type() == TYPE_VARIANT) {
        // variant -> variant is rejected here: variant_max_subcolumns_count is not equal
        return create_unsupport_wrapper(from_type.get_name(), to_type->get_name());
    }
    // Capture the explicit target type by value (init-capture) so the returned
    // wrapper does not depend on Block[result].type at execution time.
    return [target = to_type](FunctionContext* context, Block& block,
                              const ColumnNumbers& arguments, uint32_t result,
                              size_t input_rows_count,
                              const NullMap::value_type* null_map) -> Status {
        return cast_from_variant_impl(context, block, arguments, result, input_rows_count,
                                      null_map, target);
    };
}
} // namespace doris::vectorized::CastWrapper
diff --git a/be/src/vec/runtime/vnative_transformer.cpp
b/be/src/vec/runtime/vnative_transformer.cpp
new file mode 100644
index 00000000000..578364eff22
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.cpp
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vnative_transformer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "agent/be_exec_version_manager.h"
+#include "io/fs/file_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/slice.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/native/native_format.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+namespace {
+
+// Map high-level TFileCompressType to low-level segment_v2::CompressionTypePB.
+segment_v2::CompressionTypePB
to_local_compression_type(TFileCompressType::type type) {
+ using CT = segment_v2::CompressionTypePB;
+ switch (type) {
+ case TFileCompressType::GZ:
+ case TFileCompressType::ZLIB:
+ case TFileCompressType::DEFLATE:
+ return CT::ZLIB;
+ case TFileCompressType::LZ4FRAME:
+ case TFileCompressType::LZ4BLOCK:
+ return CT::LZ4;
+ case TFileCompressType::SNAPPYBLOCK:
+ return CT::SNAPPY;
+ case TFileCompressType::ZSTD:
+ return CT::ZSTD;
+ default:
+ return CT::ZSTD;
+ }
+}
+
+} // namespace
+
// Construct a native-format writer over |file_writer| (not owned; must outlive
// this transformer). |compress_type| selects the PBlock compression used by
// Block::serialize; it is mapped to a segment_v2 compression type up front.
VNativeTransformer::VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                                       const VExprContextSPtrs& output_vexpr_ctxs,
                                       bool output_object_data,
                                       TFileCompressType::type compress_type)
        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
          _file_writer(file_writer),
          _compression_type(to_local_compression_type(compress_type)) {}
+
// Write the Doris Native file header and count it toward _written_len.
// Must be called exactly once before any write().
Status VNativeTransformer::open() {
    // Write Doris Native file header:
    // [magic bytes "DORISN1\0"][uint32_t format_version]
    DCHECK(_file_writer != nullptr);
    uint32_t version = DORIS_NATIVE_FORMAT_VERSION;

    // NOTE(review): sizeof(DORIS_NATIVE_MAGIC) is the full array size (including a
    // trailing NUL if the magic is declared as a string literal) — confirm the
    // reader consumes the same number of header bytes.
    Slice magic_slice(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC));
    // The version is written raw, in host byte order.
    // NOTE(review): assumes reader and writer run on same-endianness hosts — confirm.
    Slice version_slice(reinterpret_cast<char*>(&version), sizeof(uint32_t));

    RETURN_IF_ERROR(_file_writer->append(magic_slice));
    RETURN_IF_ERROR(_file_writer->append(version_slice));

    _written_len += sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t);
    return Status::OK();
}
+
// Append one [uint64_t length][PBlock bytes] record for |block|.
// Empty blocks are skipped entirely, so they leave no record in the file
// (a file written only from empty blocks contains just the header).
Status VNativeTransformer::write(const Block& block) {
    if (block.rows() == 0) {
        return Status::OK();
    }

    // Serialize Block into PBlock using existing vec serialization logic.
    PBlock pblock;
    size_t uncompressed_bytes = 0;
    size_t compressed_bytes = 0;
    int64_t compressed_time = 0;

    // _compression_type controls how the PBlock payload is compressed on disk.
    RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
                                    &uncompressed_bytes, &compressed_bytes, &compressed_time,
                                    _compression_type));

    std::string buff;
    if (!pblock.SerializeToString(&buff)) {
        auto err = Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
                "serialize native block error. block rows: {}", block.rows());
        return err;
    }

    // Layout of Doris Native file:
    // [uint64_t block_size][PBlock bytes]...
    // NOTE(review): the length prefix is written raw in host byte order — confirm
    // the reader shares endianness with the writer.
    uint64_t len = buff.size();
    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
    RETURN_IF_ERROR(_file_writer->append(len_slice));
    RETURN_IF_ERROR(_file_writer->append(buff));

    // Track bytes and rows emitted so far for written_len()/statistics.
    _written_len += sizeof(len) + buff.size();
    _cur_written_rows += block.rows();

    return Status::OK();
}
+
+Status VNativeTransformer::close() {
+ // Close underlying FileWriter to ensure data is flushed to disk.
+ if (_file_writer != nullptr && _file_writer->state() !=
doris::io::FileWriter::State::CLOSED) {
+ RETURN_IF_ERROR(_file_writer->close());
+ }
+
+ return Status::OK();
+}
+
// Total bytes appended so far: the file header plus every [length][PBlock] record.
int64_t VNativeTransformer::written_len() {
    return _written_len;
}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vnative_transformer.h
b/be/src/vec/runtime/vnative_transformer.h
new file mode 100644
index 00000000000..fdf7ff46ac9
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vfile_format_transformer.h"
+
+namespace doris::io {
+class FileWriter;
+} // namespace doris::io
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
// Doris Native format writer.
// It serializes vectorized Blocks into Doris Native binary format:
//   [magic][uint32_t version] ([uint64_t block_size][PBlock bytes])*
class VNativeTransformer final : public VFileFormatTransformer {
public:
    // |file_writer| is borrowed (not owned) and must outlive this object.
    // |compress_type| controls how the PBlock is compressed on disk (ZSTD, LZ4, etc).
    // Defaults to ZSTD to preserve the previous behavior.
    VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                       const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data,
                       TFileCompressType::type compress_type = TFileCompressType::ZSTD);

    ~VNativeTransformer() override = default;

    // Writes the file header (magic + version). Call once before write().
    Status open() override;

    // Appends one [length][PBlock] record; empty blocks are skipped.
    Status write(const Block& block) override;

    // Closes the underlying FileWriter, flushing data to disk.
    Status close() override;

    // Bytes appended so far (header + all records).
    int64_t written_len() override;

private:
    doris::io::FileWriter* _file_writer; // not owned
    int64_t _written_len = 0;
    // Compression type used for Block::serialize (PBlock compression).
    segment_v2::CompressionTypePB _compression_type {segment_v2::CompressionTypePB::ZSTD};
};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 0a2aee6c811..edec869ab36 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -58,6 +58,7 @@
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/cast/cast_to_string.h"
#include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vnative_transformer.h"
#include "vec/runtime/vorc_transformer.h"
#include "vec/runtime/vparquet_transformer.h"
#include "vec/sink/vmysql_result_writer.h"
@@ -146,6 +147,12 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
_state, _file_writer_impl.get(), _vec_output_expr_ctxs,
_file_opts->orc_schema, {},
_output_object_data, _file_opts->orc_compression_type));
break;
+ case TFileFormatType::FORMAT_NATIVE:
+ // Doris Native binary format writer with configurable compression.
+ _vfile_writer.reset(new VNativeTransformer(_state,
_file_writer_impl.get(),
+ _vec_output_expr_ctxs,
_output_object_data,
+
_file_opts->compression_type));
+ break;
default:
return Status::InternalError("unsupported file format: {}",
_file_opts->file_format);
}
@@ -203,6 +210,8 @@ std::string VFileResultWriter::_file_format_to_name() {
return "parquet";
case TFileFormatType::FORMAT_ORC:
return "orc";
+ case TFileFormatType::FORMAT_NATIVE:
+ return "native";
default:
return "unknown";
}
diff --git a/be/test/data/vec/native/all_types_single_row.native
b/be/test/data/vec/native/all_types_single_row.native
new file mode 100644
index 00000000000..90f825153ad
Binary files /dev/null and
b/be/test/data/vec/native/all_types_single_row.native differ
diff --git a/be/test/vec/exec/format/native/native_reader_writer_test.cpp
b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
new file mode 100644
index 00000000000..323f94944b3
--- /dev/null
+++ b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
@@ -0,0 +1,1350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/path.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/runtime_state.h"
+#include "util/jsonb_writer.h"
+#include "util/uid_util.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_struct.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/data_types/serde/data_type_serde.h"
+#include "vec/exec/format/native/native_format.h"
+#include "vec/exec/format/native/native_reader.h"
+#include "vec/runtime/vnative_transformer.h"
+
+namespace doris::vectorized {
+
+class NativeReaderWriterTest : public ::testing::Test {};
+
+static void fill_primitive_columns(Block& block, size_t rows) {
+ DataTypePtr int_type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+ DataTypePtr str_type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
false));
+
+ {
+ MutableColumnPtr col = int_type->create_column();
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 3 == 0) {
+ // null
+ col->insert_default();
+ } else {
+ // insert int value via Field to match column interface
+ auto field =
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10));
+ col->insert(field);
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), int_type,
"int_col"));
+ }
+
+ {
+ MutableColumnPtr col = str_type->create_column();
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 4 == 0) {
+ col->insert_default();
+ } else {
+ std::string v = "s" + std::to_string(i);
+ // insert varchar value via Field
+ auto field =
Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
+ col->insert(field);
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), str_type,
"str_col"));
+ }
+}
+
// Append a nullable array<int> column "arr_col" to |block| with |rows| rows.
// Every 5th row is NULL; other rows hold the 3-element array [i, i+1, i+2].
static void fill_array_column(Block& block, size_t rows) {
    // array<int>
    DataTypePtr arr_nested_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false);
    DataTypePtr arr_type = make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));

    {
        MutableColumnPtr col = arr_type->create_column();
        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
        auto& nested = nullable_col->get_nested_column();
        auto& null_map = nullable_col->get_null_map_data();
        auto& array_col = assert_cast<ColumnArray&>(nested);
        auto& offsets = array_col.get_offsets();
        auto& data = array_col.get_data();
        auto mutable_data = data.assume_mutable();

        for (size_t i = 0; i < rows; ++i) {
            if (i % 5 == 0) {
                // null array; insert_default() on the nullable wrapper is expected to
                // also append an empty-array entry to the nested column so offsets
                // stay row-aligned — NOTE(review): confirm ColumnNullable semantics.
                nullable_col->insert_default();
            } else {
                // non-null array with 3 elements: [i, i+1, i+2]
                // Manual path: push null-map flag, then elements, then the offset.
                null_map.push_back(0);
                mutable_data->insert(
                        Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
                mutable_data->insert(
                        Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
                mutable_data->insert(
                        Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2)));
                // Offsets are cumulative: each non-null row adds 3 elements.
                offsets.push_back(offsets.empty() ? 3 : offsets.back() + 3);
            }
        }
        block.insert(ColumnWithTypeAndName(std::move(col), arr_type, "arr_col"));
    }
}
+
// Append a nullable map<string, int> column "map_col" to |block| with |rows| rows.
// Every 7th row is NULL; other rows hold the 2-entry map {k<i>: i, k<i+1>: i+1}.
static void fill_map_column(Block& block, size_t rows) {
    // map<string, int>
    DataTypePtr map_key_type = DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
    DataTypePtr map_value_type = DataTypeFactory::instance().create_data_type(TYPE_INT, false);
    DataTypePtr map_type =
            make_nullable(std::make_shared<DataTypeMap>(map_key_type, map_value_type));

    // map<string, int> column
    {
        MutableColumnPtr col = map_type->create_column();
        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
        auto& nested = nullable_col->get_nested_column();
        auto& null_map = nullable_col->get_null_map_data();

        for (size_t i = 0; i < rows; ++i) {
            if (i % 7 == 0) {
                // null map
                nullable_col->insert_default();
            } else {
                // Manual path: null-map flag, then keys/values, then the offset.
                null_map.push_back(0);
                auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
                auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
                auto& values = assert_cast<ColumnMap&>(nested).get_values();

                auto mutable_keys = keys.assume_mutable();
                auto mutable_values = values.assume_mutable();

                std::string k1 = "k" + std::to_string(i);
                std::string k2 = "k" + std::to_string(i + 1);
                mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1));
                mutable_values->insert(
                        Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
                mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2));
                mutable_values->insert(
                        Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));

                // Offsets are cumulative: each non-null row adds 2 entries.
                offsets.push_back(offsets.empty() ? 2 : offsets.back() + 2);
            }
        }
        block.insert(ColumnWithTypeAndName(std::move(col), map_type, "map_col"));
    }
}
+
// Append a nullable struct<si:int, ss:string> column "struct_col" to |block|
// with |rows| rows. Every 6th row is NULL; other rows hold {i * 100, "ss<i>"}.
static void fill_struct_column(Block& block, size_t rows) {
    // struct<si:int, ss:string>
    DataTypes struct_fields;
    struct_fields.emplace_back(
            make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
    struct_fields.emplace_back(
            make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false)));
    DataTypePtr struct_type = make_nullable(std::make_shared<DataTypeStruct>(struct_fields));

    // struct<si:int, ss:string> column
    {
        MutableColumnPtr col = struct_type->create_column();
        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
        auto& nested = nullable_col->get_nested_column();
        auto& null_map = nullable_col->get_null_map_data();

        auto& struct_col = assert_cast<ColumnStruct&>(nested);
        const auto& fields = struct_col.get_columns();

        for (size_t i = 0; i < rows; ++i) {
            if (i % 6 == 0) {
                nullable_col->insert_default();
            } else {
                // Manual path: null-map flag, then one value per struct field.
                null_map.push_back(0);
                auto mutable_field0 = fields[0]->assume_mutable();
                auto mutable_field1 = fields[1]->assume_mutable();
                // int field
                mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>(
                        static_cast<int32_t>(i * 100)));
                // string field
                std::string vs = "ss" + std::to_string(i);
                mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs));
            }
        }
        block.insert(ColumnWithTypeAndName(std::move(col), struct_type, "struct_col"));
    }
}
+
// Append a nullable variant column "variant_col" to |block| with |rows| rows.
// Each row is a JSON object with key_cnt (=100) keys "k0".."k99", whose values
// alternate between ints and strings depending on (row + key) parity.
static void fill_variant_column(Block& block, size_t rows) {
    // variant
    DataTypePtr variant_type = make_nullable(std::make_shared<DataTypeVariant>());

    // variant column: use JSON strings + deserialize_column_from_json_vector to populate ColumnVariant
    {
        MutableColumnPtr col = variant_type->create_column();
        auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
        auto& nested = nullable_col->get_nested_column(); // ColumnVariant

        // Prepare one JSON string per row. json_rows is reserved up front so the
        // strings never reallocate and the Slices below stay valid.
        std::vector<std::string> json_rows;
        json_rows.reserve(rows);
        std::vector<Slice> slices;
        slices.reserve(rows);

        size_t key_cnt = 100;
        for (size_t i = 0; i < rows; ++i) {
            // Fixed key count per row: keys "k0" .. "k<key_cnt-1>".
            std::string json = "{";
            for (size_t k = 0; k < key_cnt; ++k) {
                if (k > 0) {
                    json += ",";
                }
                // keys: "k0", "k1", ...
                json += "\"k" + std::to_string(k) + "\":";
                // mix int and string values
                if ((i + k) % 2 == 0) {
                    json += std::to_string(static_cast<int32_t>(i * 10 + k));
                } else {
                    json += "\"v" + std::to_string(i * 10 + k) + "\"";
                }
            }
            json += "}";
            json_rows.emplace_back(std::move(json));
            slices.emplace_back(json_rows.back().data(), json_rows.back().size());
        }

        // Use Variant SerDe to parse JSON into ColumnVariant
        auto variant_type_inner = std::make_shared<DataTypeVariant>();
        auto serde = variant_type_inner->get_serde();
        auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());

        uint64_t num_deserialized = 0;
        DataTypeSerDe::FormatOptions options;
        Status st = variant_serde->deserialize_column_from_json_vector(nested, slices,
                                                                       &num_deserialized, options);
        ASSERT_TRUE(st.ok()) << st;
        ASSERT_EQ(rows, num_deserialized);

        // All rows are treated as non-null here: the SerDe filled only the nested
        // column, so the null map is rebuilt to match its length.
        auto& null_map = nullable_col->get_null_map_data();
        null_map.clear();
        null_map.resize_fill(rows, 0);

        block.insert(ColumnWithTypeAndName(std::move(col), variant_type, "variant_col"));
    }
}
+
// Append all complex-typed test columns to |block|. The call order fixes the
// column positions: arr_col, map_col, struct_col, variant_col.
static void fill_complex_columns(Block& block, size_t rows) {
    fill_array_column(block, rows);
    fill_map_column(block, rows);
    fill_struct_column(block, rows);
    fill_variant_column(block, rows);
}
+
+static Block create_test_block(size_t rows) {
+ Block block;
+ // simple schema with primitive + complex types:
+ // int_col, str_col, arr_col, map_col, struct_col, variant_col
+ fill_primitive_columns(block, rows);
+ fill_complex_columns(block, rows);
+
+ return block;
+}
+
// Round-trip test: write a 16-row block of mixed primitive/complex columns to
// a native file via VNativeTransformer, read it back via NativeReader, and
// compare every cell; a second read must report EOF.
TEST_F(NativeReaderWriterTest, round_trip_native_file) {
    // Prepare a temporary local file path
    UniqueId uid = UniqueId::gen_uid();
    std::string uuid = uid.to_string();
    std::string file_path = "./native_format_" + uuid + ".native";

    // 1. Write block to Native file via VNativeTransformer
    auto fs = io::global_local_filesystem();
    io::FileWriterPtr file_writer;
    Status st = fs->create_file(file_path, &file_writer);
    ASSERT_TRUE(st.ok()) << st;

    RuntimeState state;
    VExprContextSPtrs exprs; // empty, VNativeTransformer won't use it directly
    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
    st = transformer.open();
    ASSERT_TRUE(st.ok()) << st;

    Block src_block = create_test_block(16);
    st = transformer.write(src_block);
    ASSERT_TRUE(st.ok()) << st;
    // VNativeTransformer::close() will also close the underlying FileWriter,
    // so we don't need (and must not) close file_writer again here.
    st = transformer.close();
    ASSERT_TRUE(st.ok()) << st;

    // 2. Read back via NativeReader using normal file scan path
    TFileScanRangeParams scan_params;
    scan_params.__set_file_type(TFileType::FILE_LOCAL);
    TFileRangeDesc scan_range;
    scan_range.__set_path(file_path);
    scan_range.__set_file_type(TFileType::FILE_LOCAL);
    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);

    Block dst_block;
    size_t read_rows = 0;
    bool eof = false;
    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
    ASSERT_TRUE(st.ok()) << st;
    ASSERT_EQ(src_block.rows(), read_rows);
    ASSERT_EQ(src_block.columns(), dst_block.columns());

    // Compare column-wise values (types compared by family name, since the
    // reader may force nullability).
    for (size_t col = 0; col < src_block.columns(); ++col) {
        const auto& src = src_block.get_by_position(col);
        const auto& dst = dst_block.get_by_position(col);
        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
        ASSERT_EQ(src.column->size(), dst.column->size());
        for (size_t row = 0; row < src_block.rows(); ++row) {
            auto src_field = (*src.column)[row];
            auto dst_field = (*dst.column)[row];
            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row;
        }
    }

    // Next call should hit EOF
    Block dst_block2;
    read_rows = 0;
    eof = false;
    st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
    ASSERT_TRUE(st.ok()) << st;
    ASSERT_TRUE(eof);
    ASSERT_EQ(read_rows, 0);

    // Clean up temp file (skipped if an assertion above aborted the test body).
    bool exists = false;
    st = fs->exists(file_path, &exists);
    if (st.ok() && exists) {
        static_cast<void>(fs->delete_file(file_path));
    }
}
+
// Edge cases: empty file and single-row file.
// An empty write must yield a header-only file that reads straight to EOF;
// a single-row file must round-trip exactly and then report EOF.
TEST_F(NativeReaderWriterTest, round_trip_empty_and_single_row) {
    auto fs = io::global_local_filesystem();

    // 1. Empty native file (no data blocks)
    {
        UniqueId uid = UniqueId::gen_uid();
        std::string uuid = uid.to_string();
        std::string file_path = "./native_format_empty_" + uuid + ".native";

        io::FileWriterPtr file_writer;
        Status st = fs->create_file(file_path, &file_writer);
        ASSERT_TRUE(st.ok()) << st;

        RuntimeState state;
        VExprContextSPtrs exprs;
        VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
        st = transformer.open();
        ASSERT_TRUE(st.ok()) << st;

        // Write an empty block, should not produce any data block in file.
        Block empty_block;
        st = transformer.write(empty_block);
        ASSERT_TRUE(st.ok()) << st;
        st = transformer.close();
        ASSERT_TRUE(st.ok()) << st;

        // Read back: should directly hit EOF with 0 rows.
        TFileScanRangeParams scan_params;
        scan_params.__set_file_type(TFileType::FILE_LOCAL);
        TFileRangeDesc scan_range;
        scan_range.__set_path(file_path);
        scan_range.__set_file_type(TFileType::FILE_LOCAL);
        NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);

        Block dst_block;
        size_t read_rows = 0;
        bool eof = false;
        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
        ASSERT_TRUE(st.ok()) << st;
        ASSERT_EQ(0U, read_rows);
        ASSERT_TRUE(eof);

        // Clean up temp file.
        bool exists = false;
        st = fs->exists(file_path, &exists);
        if (st.ok() && exists) {
            static_cast<void>(fs->delete_file(file_path));
        }
    }

    // 2. Single-row native file
    {
        UniqueId uid = UniqueId::gen_uid();
        std::string uuid = uid.to_string();
        std::string file_path = "./native_format_single_row_" + uuid + ".native";

        io::FileWriterPtr file_writer;
        Status st = fs->create_file(file_path, &file_writer);
        ASSERT_TRUE(st.ok()) << st;

        RuntimeState state;
        VExprContextSPtrs exprs;
        VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
        st = transformer.open();
        ASSERT_TRUE(st.ok()) << st;

        Block src_block = create_test_block(1);
        st = transformer.write(src_block);
        ASSERT_TRUE(st.ok()) << st;
        st = transformer.close();
        ASSERT_TRUE(st.ok()) << st;

        TFileScanRangeParams scan_params;
        scan_params.__set_file_type(TFileType::FILE_LOCAL);
        TFileRangeDesc scan_range;
        scan_range.__set_path(file_path);
        scan_range.__set_file_type(TFileType::FILE_LOCAL);
        NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);

        Block dst_block;
        size_t read_rows = 0;
        bool eof = false;
        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
        ASSERT_TRUE(st.ok()) << st;
        ASSERT_EQ(src_block.rows(), read_rows);
        ASSERT_EQ(src_block.columns(), dst_block.columns());
        ASSERT_FALSE(eof);

        // Verify data equality for the single row.
        for (size_t col = 0; col < src_block.columns(); ++col) {
            const auto& src = src_block.get_by_position(col);
            const auto& dst = dst_block.get_by_position(col);
            ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
            ASSERT_EQ(src.column->size(), dst.column->size());
            auto src_field = (*src.column)[0];
            auto dst_field = (*dst.column)[0];
            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=0";
        }

        // Next call should hit EOF
        Block dst_block2;
        read_rows = 0;
        eof = false;
        st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
        ASSERT_TRUE(st.ok()) << st;
        ASSERT_TRUE(eof);
        ASSERT_EQ(read_rows, 0U);

        // Clean up temp file.
        bool exists = false;
        st = fs->exists(file_path, &exists);
        if (st.ok() && exists) {
            static_cast<void>(fs->delete_file(file_path));
        }
    }
}
+
// Large volume test: verify round-trip correctness with many rows.
// Writes 9999 rows in 128-row sub-blocks (exercising the multi-record path),
// reads everything back block-by-block, merges, and spot-checks every 10th row.
TEST_F(NativeReaderWriterTest, round_trip_native_file_large_rows) {
    UniqueId uid = UniqueId::gen_uid();
    std::string uuid = uid.to_string();
    std::string file_path = "./native_format_large_" + uuid + ".native";

    auto fs = io::global_local_filesystem();
    io::FileWriterPtr file_writer;
    Status st = fs->create_file(file_path, &file_writer);
    ASSERT_TRUE(st.ok()) << st;

    RuntimeState state;
    VExprContextSPtrs exprs;
    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
    st = transformer.open();
    ASSERT_TRUE(st.ok()) << st;

    // Use a relatively large number of rows to test stability and performance characteristics.
    const size_t kRows = 9999;
    Block src_block = create_test_block(kRows);

    // Split into multiple blocks and write them one by one to exercise multi-block path.
    const size_t kBatchRows = 128;
    for (size_t offset = 0; offset < kRows; offset += kBatchRows) {
        size_t len = std::min(kBatchRows, kRows - offset);
        // Create a sub block with the same schema and a row range [offset, offset+len)
        Block sub_block = src_block.clone_empty();
        for (size_t col = 0; col < src_block.columns(); ++col) {
            const auto& src_col = *src_block.get_by_position(col).column;
            const auto& dst_col_holder = *sub_block.get_by_position(col).column;
            auto dst_mutable = dst_col_holder.assume_mutable();
            dst_mutable->insert_range_from(src_col, offset, len);
        }
        st = transformer.write(sub_block);
        ASSERT_TRUE(st.ok()) << st;
    }
    st = transformer.close();
    ASSERT_TRUE(st.ok()) << st;

    TFileScanRangeParams scan_params;
    scan_params.__set_file_type(TFileType::FILE_LOCAL);
    TFileRangeDesc scan_range;
    scan_range.__set_path(file_path);
    scan_range.__set_file_type(TFileType::FILE_LOCAL);
    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);

    // Read back in multiple blocks and merge into a single result block.
    Block merged_block;
    bool first_block = true;
    size_t total_read_rows = 0;
    while (true) {
        Block dst_block;
        size_t read_rows = 0;
        bool eof = false;
        st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
        ASSERT_TRUE(st.ok()) << st;
        if (read_rows > 0) {
            if (first_block) {
                // First block seeds the merged result (shares column data).
                merged_block = dst_block;
                total_read_rows = read_rows;
                first_block = false;
            } else {
                MutableBlock merged_mutable(&merged_block);
                Status add_st = merged_mutable.add_rows(&dst_block, 0, read_rows);
                ASSERT_TRUE(add_st.ok()) << add_st;
                total_read_rows += read_rows;
            }
        }
        if (eof) {
            break;
        }
        // Defensive: a 0-row, non-EOF response would otherwise loop forever.
        if (read_rows == 0) {
            break;
        }
    }

    ASSERT_EQ(src_block.rows(), total_read_rows);
    ASSERT_EQ(src_block.columns(), merged_block.columns());

    // Compare column-wise values (sampled every 10th row to keep the test fast).
    for (size_t col = 0; col < src_block.columns(); ++col) {
        const auto& src = src_block.get_by_position(col);
        const auto& dst = merged_block.get_by_position(col);
        ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
        ASSERT_EQ(src.column->size(), dst.column->size());
        for (size_t row = 0; row < src_block.rows(); row += 10) {
            auto src_field = (*src.column)[row];
            auto dst_field = (*dst.column)[row];
            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row;
        }
    }
    // Clean up temp file.
    bool exists = false;
    st = fs->exists(file_path, &exists);
    if (st.ok() && exists) {
        static_cast<void>(fs->delete_file(file_path));
    }
}
+
// Verify that NativeReader forces all columns to nullable, even if the writer
// serialized non-nullable columns. Values must still round-trip unchanged.
TEST_F(NativeReaderWriterTest, non_nullable_columns_forced_nullable) {
    auto fs = io::global_local_filesystem();

    UniqueId uid = UniqueId::gen_uid();
    std::string uuid = uid.to_string();
    std::string file_path = "./native_format_non_nullable_" + uuid + ".native";

    io::FileWriterPtr file_writer;
    Status st = fs->create_file(file_path, &file_writer);
    ASSERT_TRUE(st.ok()) << st;

    RuntimeState state;
    VExprContextSPtrs exprs;
    // Use default compression type.
    VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
    st = transformer.open();
    ASSERT_TRUE(st.ok()) << st;

    // Build a block with non-nullable columns (note: NOT wrapped in make_nullable).
    Block src_block;
    DataTypePtr int_type =
            DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false);
    {
        MutableColumnPtr col = int_type->create_column();
        for (int i = 0; i < 8; ++i) {
            auto field = Field::create_field<PrimitiveType::TYPE_INT>(i * 3);
            col->insert(field);
        }
        src_block.insert(ColumnWithTypeAndName(std::move(col), int_type, "int_nn"));
    }

    DataTypePtr str_type =
            DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_VARCHAR, false);
    {
        MutableColumnPtr col = str_type->create_column();
        for (int i = 0; i < 8; ++i) {
            std::string v = "v" + std::to_string(i);
            auto field = Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
            col->insert(field);
        }
        src_block.insert(ColumnWithTypeAndName(std::move(col), str_type, "str_nn"));
    }

    st = transformer.write(src_block);
    ASSERT_TRUE(st.ok()) << st;
    st = transformer.close();
    ASSERT_TRUE(st.ok()) << st;

    // Read back via NativeReader.
    TFileScanRangeParams scan_params;
    scan_params.__set_file_type(TFileType::FILE_LOCAL);
    TFileRangeDesc scan_range;
    scan_range.__set_path(file_path);
    scan_range.__set_file_type(TFileType::FILE_LOCAL);
    NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr, &state);

    Block dst_block;
    size_t read_rows = 0;
    bool eof = false;
    st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
    ASSERT_TRUE(st.ok()) << st;
    ASSERT_EQ(src_block.rows(), read_rows);
    ASSERT_FALSE(eof);

    // All columns returned by NativeReader should be nullable.
    for (size_t col = 0; col < dst_block.columns(); ++col) {
        const auto& dst = dst_block.get_by_position(col);
        ASSERT_TRUE(dst.type->is_nullable()) << "column " << col << " should be nullable";
    }

    // Values should be preserved.
    for (size_t col = 0; col < src_block.columns(); ++col) {
        const auto& src = src_block.get_by_position(col);
        const auto& dst = dst_block.get_by_position(col);
        ASSERT_EQ(src.column->size(), dst.column->size());
        for (size_t row = 0; row < src_block.rows(); ++row) {
            auto src_field = (*src.column)[row];
            auto dst_field = (*dst.column)[row];
            ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << " row=" << row;
        }
    }

    // Clean up temp file.
    bool exists = false;
    st = fs->exists(file_path, &exists);
    if (st.ok() && exists) {
        static_cast<void>(fs->delete_file(file_path));
    }
}
+
+// Verify that VNativeTransformer writes the native file header and that
+// NativeReader can transparently read files with this header.
+TEST_F(NativeReaderWriterTest,
transformer_writes_header_and_reader_handles_it) {
+ auto fs = io::global_local_filesystem();
+
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_with_header_" + uuid + ".native";
+
+ // Write a small block via VNativeTransformer.
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ Block src_block = create_test_block(4);
+ st = transformer.write(src_block);
+ ASSERT_TRUE(st.ok()) << st;
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // Read back raw bytes and verify the header.
+ {
+ io::FileReaderSPtr file_reader;
+ st = fs->open_file(file_path, &file_reader);
+ ASSERT_TRUE(st.ok()) << st;
+ size_t file_size = file_reader->size();
+ ASSERT_GE(file_size, sizeof(uint64_t) + 12);
+
+ char header[12];
+ Slice header_slice(header, sizeof(header));
+ size_t bytes_read = 0;
+ st = file_reader->read_at(0, header_slice, &bytes_read);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(sizeof(header), bytes_read);
+
+ ASSERT_EQ(0, memcmp(header, DORIS_NATIVE_MAGIC,
sizeof(DORIS_NATIVE_MAGIC)));
+ uint32_t version = 0;
+ memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC),
sizeof(uint32_t));
+ ASSERT_EQ(DORIS_NATIVE_FORMAT_VERSION, version);
+ }
+
+ // Now read via NativeReader; it should detect the header and skip it.
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr,
&state);
+
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(src_block.rows(), read_rows);
+ ASSERT_FALSE(eof);
+
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+}
+
+// Verify get_columns and get_parsed_schema can probe schema without scanning
+// the whole file.
+TEST_F(NativeReaderWriterTest, get_columns_and_parsed_schema) {
+ auto fs = io::global_local_filesystem();
+
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_schema_" + uuid + ".native";
+
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ Block src_block = create_test_block(5);
+ st = transformer.write(src_block);
+ ASSERT_TRUE(st.ok()) << st;
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr,
&state);
+
+ std::unordered_map<std::string, DataTypePtr> name_to_type;
+ std::unordered_set<std::string> missing_cols;
+ st = reader_impl.get_columns(&name_to_type, &missing_cols);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_TRUE(missing_cols.empty());
+
+ // All columns from src_block should appear in name_to_type.
+ for (size_t i = 0; i < src_block.columns(); ++i) {
+ const auto& col = src_block.get_by_position(i);
+ auto it = name_to_type.find(col.name);
+ ASSERT_TRUE(it != name_to_type.end()) << "missing column " << col.name;
+ ASSERT_TRUE(it->second->is_nullable()) << "schema type should be
nullable for " << col.name;
+ }
+
+ std::vector<std::string> col_names;
+ std::vector<DataTypePtr> col_types;
+ st = reader_impl.get_parsed_schema(&col_names, &col_types);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(col_names.size(), col_types.size());
+ ASSERT_EQ(col_names.size(), src_block.columns());
+
+ for (size_t i = 0; i < col_names.size(); ++i) {
+ ASSERT_TRUE(col_types[i]->is_nullable());
+ }
+
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+}
+
+// Create a test block containing all known primitive and complex types with a
single row.
+// This function covers:
+// - Basic integers: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT
+// - Floating point: FLOAT, DOUBLE
+// - Date/Time: DATE, DATETIME, DATEV2, DATETIMEV2, TIMEV2
+// - String types: CHAR, VARCHAR, STRING
+// - Decimal types: DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128I, DECIMAL256
+// - IP types: IPV4, IPV6
+// - JSON: JSONB
+// - Complex types: ARRAY, MAP, STRUCT, VARIANT
+static Block create_all_types_test_block() {
+ Block block;
+
+ // 1. BOOLEAN
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN,
false));
+ MutableColumnPtr col = type->create_column();
+ col->insert(Field::create_field<PrimitiveType::TYPE_BOOLEAN>(UInt8
{1})); // true
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_boolean"));
+ }
+
+ // 2. TINYINT
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_TINYINT,
false));
+ MutableColumnPtr col = type->create_column();
+
col->insert(Field::create_field<PrimitiveType::TYPE_TINYINT>(static_cast<int8_t>(42)));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_tinyint"));
+ }
+
+ // 3. SMALLINT
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_SMALLINT,
false));
+ MutableColumnPtr col = type->create_column();
+ col->insert(
+
Field::create_field<PrimitiveType::TYPE_SMALLINT>(static_cast<int16_t>(1234)));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_smallint"));
+ }
+
+ // 4. INT
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+ MutableColumnPtr col = type->create_column();
+
col->insert(Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(123456)));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_int"));
+ }
+
+ // 5. BIGINT
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_BIGINT, false));
+ MutableColumnPtr col = type->create_column();
+ col->insert(Field::create_field<PrimitiveType::TYPE_BIGINT>(
+ static_cast<int64_t>(9876543210LL)));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_bigint"));
+ }
+
+ // 6. LARGEINT
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_LARGEINT,
false));
+ MutableColumnPtr col = type->create_column();
+ Int128 large_val = Int128(123456789012345LL) * 1000000;
+
col->insert(Field::create_field<PrimitiveType::TYPE_LARGEINT>(large_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_largeint"));
+ }
+
+ // 7. FLOAT
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_FLOAT, false));
+ MutableColumnPtr col = type->create_column();
+ col->insert(Field::create_field<PrimitiveType::TYPE_FLOAT>(3.14F));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_float"));
+ }
+
+ // 8. DOUBLE
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, false));
+ MutableColumnPtr col = type->create_column();
+
col->insert(Field::create_field<PrimitiveType::TYPE_DOUBLE>(1.234567890123456789));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_double"));
+ }
+
+ // 9. DATE (DateV1)
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATE, false));
+ MutableColumnPtr col = type->create_column();
+ VecDateTimeValue dt;
+ dt.from_date_int64(20231215); // 2023-12-15
+ col->insert(Field::create_field<PrimitiveType::TYPE_DATE>(dt));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_date"));
+ }
+
+ // 10. DATETIME (DateTimeV1)
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATETIME,
false));
+ MutableColumnPtr col = type->create_column();
+ VecDateTimeValue dt;
+ dt.from_date_int64(20231215103045LL); // 2023-12-15 10:30:45
+ col->insert(Field::create_field<PrimitiveType::TYPE_DATETIME>(dt));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_datetime"));
+ }
+
+ // 11. DATEV2
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_DATEV2, false));
+ MutableColumnPtr col = type->create_column();
+ DateV2Value<DateV2ValueType> dv2;
+ dv2.from_date_int64(20231215);
+ col->insert(Field::create_field<PrimitiveType::TYPE_DATEV2>(dv2));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_datev2"));
+ }
+
+ // 12. DATETIMEV2 (scale=0)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2,
false, 0, 0));
+ MutableColumnPtr col = type->create_column();
+ DateV2Value<DateTimeV2ValueType> dtv2;
+ dtv2.from_date_int64(20231215103045LL);
+ col->insert(Field::create_field<PrimitiveType::TYPE_DATETIMEV2>(dtv2));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_datetimev2"));
+ }
+
+ // 13. TIMEV2 (scale=0)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_TIMEV2,
false, 0, 0));
+ MutableColumnPtr col = type->create_column();
+ // TIMEV2 is stored as Float64 representing seconds
+ col->insert(Field::create_field<PrimitiveType::TYPE_TIMEV2>(
+ 37845.0)); // 10:30:45 in seconds
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_timev2"));
+ }
+
+ // 14. CHAR
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_CHAR, false,
0, 0, 20));
+ MutableColumnPtr col = type->create_column();
+
col->insert(Field::create_field<PrimitiveType::TYPE_CHAR>(std::string("fixed_char_val")));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_char"));
+ }
+
+ // 15. VARCHAR
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
false));
+ MutableColumnPtr col = type->create_column();
+ col->insert(
+
Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("variable_string")));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_varchar"));
+ }
+
+ // 16. STRING
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_STRING, false));
+ MutableColumnPtr col = type->create_column();
+ col->insert(Field::create_field<PrimitiveType::TYPE_STRING>(
+ std::string("long_text_content_here")));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_string"));
+ }
+
+ // 17. DECIMALV2 (precision=27, scale=9)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_DECIMALV2,
false, 27, 9));
+ MutableColumnPtr col = type->create_column();
+ DecimalV2Value dec_val(123456789, 123456789);
+
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMALV2>(dec_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_decimalv2"));
+ }
+
+ // 18. DECIMAL32 (precision=9, scale=2)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_DECIMAL32,
false, 9, 2));
+ MutableColumnPtr col = type->create_column();
+ Decimal32 dec_val(static_cast<Int32>(12345678));
+
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL32>(dec_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_decimal32"));
+ }
+
+ // 19. DECIMAL64 (precision=18, scale=4)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_DECIMAL64,
false, 18, 4));
+ MutableColumnPtr col = type->create_column();
+ Decimal64 dec_val(static_cast<Int64>(123456789012345678LL));
+
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL64>(dec_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_decimal64"));
+ }
+
+ // 20. DECIMAL128I (precision=38, scale=6)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I,
false, 38, 6));
+ MutableColumnPtr col = type->create_column();
+ Decimal128V3 dec_val(static_cast<Int128>(123456789012345678LL) * 100);
+
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL128I>(dec_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_decimal128"));
+ }
+
+ // 21. DECIMAL256 (precision=76, scale=10)
+ {
+ DataTypePtr type = make_nullable(
+ DataTypeFactory::instance().create_data_type(TYPE_DECIMAL256,
false, 76, 10));
+ MutableColumnPtr col = type->create_column();
+ wide::Int256 wide_val = wide::Int256(123456789012345678LL) *
10000000000LL;
+ Decimal256 dec_val(wide_val);
+
col->insert(Field::create_field<PrimitiveType::TYPE_DECIMAL256>(dec_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type,
"col_decimal256"));
+ }
+
+ // 22. IPV4
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV4, false));
+ MutableColumnPtr col = type->create_column();
+ IPv4 ip_val = (192U << 24) | (168U << 16) | (1U << 8) | 100U; //
192.168.1.100
+ col->insert(Field::create_field<PrimitiveType::TYPE_IPV4>(ip_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv4"));
+ }
+
+ // 23. IPV6
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_IPV6, false));
+ MutableColumnPtr col = type->create_column();
+ // ::ffff:192.168.1.100 in IPv6
+ IPv6 ip6_val = 0;
+ ip6_val = (static_cast<IPv6>(0xFFFF) << 32) |
+ ((192U << 24) | (168U << 16) | (1U << 8) | 100U);
+ col->insert(Field::create_field<PrimitiveType::TYPE_IPV6>(ip6_val));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_ipv6"));
+ }
+
+ // 24. JSONB
+ {
+ DataTypePtr type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_JSONB, false));
+ MutableColumnPtr col = type->create_column();
+ // Use JsonbWriter to create JSONB binary data
+ JsonbWriter writer;
+ writer.writeStartObject();
+ writer.writeKey("key", static_cast<uint8_t>(3));
+ writer.writeStartString();
+ writer.writeString("value");
+ writer.writeEndString();
+ writer.writeKey("num", static_cast<uint8_t>(3));
+ writer.writeInt32(123);
+ writer.writeEndObject();
+ const char* jsonb_data = writer.getOutput()->getBuffer();
+ size_t jsonb_size = writer.getOutput()->getSize();
+ JsonbField field(jsonb_data, jsonb_size);
+
col->insert(Field::create_field<PrimitiveType::TYPE_JSONB>(std::move(field)));
+ block.insert(ColumnWithTypeAndName(std::move(col), type, "col_jsonb"));
+ }
+
+ // 25. ARRAY<INT>
+ {
+ DataTypePtr arr_nested_type =
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+ DataTypePtr arr_type =
make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
+ MutableColumnPtr col = arr_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+ auto& null_map = nullable_col->get_null_map_data();
+ null_map.push_back(0); // non-null
+
+ auto& array_col = assert_cast<ColumnArray&>(nested);
+ auto& offsets = array_col.get_offsets();
+ auto& data = array_col.get_data();
+ auto mutable_data = data.assume_mutable();
+
+ mutable_data->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(10)));
+ mutable_data->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(20)));
+ mutable_data->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(30)));
+ offsets.push_back(3); // 3 elements
+ block.insert(ColumnWithTypeAndName(std::move(col), arr_type,
"col_array"));
+ }
+
+ // 26. MAP<STRING, INT>
+ {
+ DataTypePtr map_key_type =
+ DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
false);
+ DataTypePtr map_value_type =
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+ DataTypePtr map_type =
+ make_nullable(std::make_shared<DataTypeMap>(map_key_type,
map_value_type));
+ MutableColumnPtr col = map_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+ auto& null_map = nullable_col->get_null_map_data();
+ null_map.push_back(0); // non-null
+
+ auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
+ auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
+ auto& values = assert_cast<ColumnMap&>(nested).get_values();
+
+ auto mutable_keys = keys.assume_mutable();
+ auto mutable_values = values.assume_mutable();
+
+
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key1")));
+ mutable_values->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(100)));
+
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("key2")));
+ mutable_values->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(200)));
+
+ offsets.push_back(2); // 2 key-value pairs
+ block.insert(ColumnWithTypeAndName(std::move(col), map_type,
"col_map"));
+ }
+
+ // 27. STRUCT<si:INT, ss:VARCHAR>
+ {
+ DataTypes struct_fields;
+ struct_fields.emplace_back(
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
+ struct_fields.emplace_back(
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
false)));
+ Strings field_names = {"si", "ss"};
+ DataTypePtr struct_type =
+ make_nullable(std::make_shared<DataTypeStruct>(struct_fields,
field_names));
+ MutableColumnPtr col = struct_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+ auto& null_map = nullable_col->get_null_map_data();
+ null_map.push_back(0); // non-null
+
+ auto& struct_col = assert_cast<ColumnStruct&>(nested);
+ const auto& fields = struct_col.get_columns();
+ auto mutable_field0 = fields[0]->assume_mutable();
+ auto mutable_field1 = fields[1]->assume_mutable();
+
+ mutable_field0->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(999)));
+ mutable_field1->insert(
+
Field::create_field<PrimitiveType::TYPE_VARCHAR>(std::string("struct_val")));
+ block.insert(ColumnWithTypeAndName(std::move(col), struct_type,
"col_struct"));
+ }
+
+ // 28. VARIANT (JSON object)
+ {
+ DataTypePtr variant_type =
make_nullable(std::make_shared<DataTypeVariant>());
+ MutableColumnPtr col = variant_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+
+ std::string json_str = R"({"name":"test","value":12345})";
+ std::vector<Slice> slices = {Slice(json_str.data(), json_str.size())};
+
+ auto variant_type_inner = std::make_shared<DataTypeVariant>();
+ auto serde = variant_type_inner->get_serde();
+ auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
+
+ uint64_t num_deserialized = 0;
+ DataTypeSerDe::FormatOptions options;
+ Status st = variant_serde->deserialize_column_from_json_vector(nested,
slices,
+
&num_deserialized, options);
+ EXPECT_TRUE(st.ok()) << st;
+ EXPECT_EQ(1U, num_deserialized);
+
+ auto& null_map = nullable_col->get_null_map_data();
+ null_map.clear();
+ null_map.resize_fill(1, 0);
+
+ block.insert(ColumnWithTypeAndName(std::move(col), variant_type,
"col_variant"));
+ }
+
+ return block;
+}
+
// Pre-generated native file path for the all-types test.
// The file is stored in: be/test/data/vec/native/all_types_single_row.native
static std::string get_all_types_native_file_path() {
    // getenv may return nullptr when ROOT is unset; constructing std::string
    // from a null pointer is undefined behavior, so fall back to the current
    // directory instead of crashing.
    const char* root = getenv("ROOT");
    std::string root_dir = (root != nullptr) ? root : ".";
    return root_dir + "/be/test/data/vec/native/all_types_single_row.native";
}
+
+// Generator test: Generate native file with all types (DISABLED by default).
+// Run this test manually to regenerate the test data file:
+// ./run-be-ut.sh --run --filter=*DISABLED_generate_all_types_native_file*
+// Then copy the generated file to:
be/test/data/vec/native/all_types_single_row.native
+TEST_F(NativeReaderWriterTest, generate_all_types_native_file) {
+ // Output to current directory, user needs to copy it to test data dir
+ std::string file_path = "./all_types_single_row.native";
+
+ auto fs = io::global_local_filesystem();
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ Block src_block = create_all_types_test_block();
+ ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1
row";
+ ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28
columns for all types";
+
+ st = transformer.write(src_block);
+ ASSERT_TRUE(st.ok()) << st;
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ std::cout << "Generated native file: " << file_path << std::endl;
+ std::cout << "Please copy it to:
be/test/data/vec/native/all_types_single_row.native"
+ << std::endl;
+}
+
+// Test: read pre-generated native file with all types and verify data
+TEST_F(NativeReaderWriterTest, read_all_types_from_pregenerated_file) {
+ std::string file_path = get_all_types_native_file_path();
+
+ auto fs = io::global_local_filesystem();
+ bool exists = false;
+ Status st = fs->exists(file_path, &exists);
+ DCHECK(exists) << "Pre-generated native file not found: " << file_path
+ << ". Run generate_all_types_native_file to generate it.";
+
+ RuntimeState state;
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr,
&state);
+
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(1U, read_rows) << "Should read exactly 1 row";
+ ASSERT_EQ(28U, dst_block.columns()) << "Should have 28 columns for all
types";
+ ASSERT_FALSE(eof);
+
+ // Regenerate expected block and compare
+ Block expected_block = create_all_types_test_block();
+
+ // Verify data equality for the single row
+ for (size_t col = 0; col < expected_block.columns(); ++col) {
+ const auto& src = expected_block.get_by_position(col);
+ const auto& dst = dst_block.get_by_position(col);
+
+ // Type family should match
+ ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
+ << "Type mismatch at col=" << col << " (" << src.name << ")";
+ ASSERT_EQ(src.column->size(), dst.column->size())
+ << "Size mismatch at col=" << col << " (" << src.name << ")";
+
+ // Compare field values
+ auto src_field = (*src.column)[0];
+ auto dst_field = (*dst.column)[0];
+ ASSERT_EQ(src_field, dst_field)
+ << "Value mismatch at col=" << col << " (" << src.name << ")";
+ }
+
+ // Next call should hit EOF
+ Block dst_block2;
+ read_rows = 0;
+ eof = false;
+ st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_TRUE(eof);
+ ASSERT_EQ(read_rows, 0U);
+}
+
+// Test: round-trip all known types with a single row (generates temp file
each time)
+TEST_F(NativeReaderWriterTest, round_trip_all_types_single_row) {
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_all_types_" + uuid + ".native";
+
+ auto fs = io::global_local_filesystem();
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ Block src_block = create_all_types_test_block();
+ ASSERT_EQ(1U, src_block.rows()) << "Source block should have exactly 1
row";
+ ASSERT_EQ(28U, src_block.columns()) << "Source block should have 28
columns for all types";
+
+ st = transformer.write(src_block);
+ ASSERT_TRUE(st.ok()) << st;
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // Read back via NativeReader
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, nullptr,
&state);
+
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(src_block.rows(), read_rows);
+ ASSERT_EQ(src_block.columns(), dst_block.columns());
+ ASSERT_FALSE(eof);
+
+ // Verify data equality for the single row
+ for (size_t col = 0; col < src_block.columns(); ++col) {
+ const auto& src = src_block.get_by_position(col);
+ const auto& dst = dst_block.get_by_position(col);
+
+ // Type family should match
+ ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name())
+ << "Type mismatch at col=" << col << " (" << src.name << ")";
+ ASSERT_EQ(src.column->size(), dst.column->size())
+ << "Size mismatch at col=" << col << " (" << src.name << ")";
+
+ // Compare field values
+ auto src_field = (*src.column)[0];
+ auto dst_field = (*dst.column)[0];
+ ASSERT_EQ(src_field, dst_field)
+ << "Value mismatch at col=" << col << " (" << src.name << ")";
+ }
+
+ // Next call should hit EOF
+ Block dst_block2;
+ read_rows = 0;
+ eof = false;
+ st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_TRUE(eof);
+ ASSERT_EQ(read_rows, 0U);
+
+ // Clean up temp file
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+}
+
+} // namespace doris::vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index 62345cd811d..774ee4e6e83 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -30,6 +30,7 @@ public class FileFormatConstants {
public static final String FORMAT_AVRO = "avro";
public static final String FORMAT_WAL = "wal";
public static final String FORMAT_ARROW = "arrow";
+ public static final String FORMAT_NATIVE = "native";
public static final String PROP_FORMAT = "format";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index b95f35ffb60..ae75e4dd68f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -576,6 +576,9 @@ public class Util {
return TFileFormatType.FORMAT_WAL;
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) {
return TFileFormatType.FORMAT_ARROW;
+ } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_NATIVE)) {
+ // Doris Native binary columnar format
+ return TFileFormatType.FORMAT_NATIVE;
} else {
return TFileFormatType.FORMAT_UNKNOWN;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
index 9706982c1ba..ac2512d88b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -39,6 +39,7 @@ public abstract class FileFormatProperties {
public static final String FORMAT_AVRO = "avro";
public static final String FORMAT_WAL = "wal";
public static final String FORMAT_ARROW = "arrow";
+ public static final String FORMAT_NATIVE = "native";
public static final String PROP_COMPRESS_TYPE = "compress_type";
protected String formatName;
@@ -102,6 +103,8 @@ public abstract class FileFormatProperties {
return new WalFileFormatProperties();
case FORMAT_ARROW:
return new ArrowFileFormatProperties();
+ case FORMAT_NATIVE:
+ return new NativeFileFormatProperties();
default:
throw new AnalysisException("format:" + formatString + " is
not supported.");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
new file mode 100644
index 00000000000..fe4306906d4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+/**
+ * File format properties for Doris Native binary columnar format.
+ *
+ * This format is intended for high-performance internal data exchange
+ */
+
+public class NativeFileFormatProperties extends FileFormatProperties {
+
+ public NativeFileFormatProperties() {
+ super(TFileFormatType.FORMAT_NATIVE,
FileFormatProperties.FORMAT_NATIVE);
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String>
formatProperties,
+ boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ // Currently no extra user visible properties for native format.
+ // Just ignore all other properties gracefully.
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions)
{
+ // No extra sink options are required for native format.
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ // For now we don't need text params for Native format, but
TFileAttributes
+ // requires a text_params field to be non-null on BE side.
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+ fileAttributes.setTextParams(textParams);
+ return fileAttributes;
+ }
+}
+
+
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
index 52f5b3713a8..4ec74e001d3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
@@ -368,7 +368,10 @@ public class NereidsLoadScanProvider {
}
} else {
Column slotColumn;
- if (fileGroup.getFileFormatProperties().getFileFormatType() ==
TFileFormatType.FORMAT_ARROW) {
+ TFileFormatType fileFormatType =
fileGroup.getFileFormatProperties().getFileFormatType();
+ // Use real column type for arrow/native format, other formats
read as varchar first
+ if (fileFormatType == TFileFormatType.FORMAT_ARROW
+ || fileFormatType == TFileFormatType.FORMAT_NATIVE) {
slotColumn = new Column(realColName,
colToType.get(realColName), true);
} else {
if (fileGroupInfo.getUniqueKeyUpdateMode() ==
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e08eaa2c825..3dc8468e846 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
@@ -359,7 +360,8 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
* @return column type and the number of parsed PTypeNodes
*/
private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int
start) {
- PScalarType columnType = typeNodes.get(start).getScalarType();
+ PTypeNode typeNode = typeNodes.get(start);
+ PScalarType columnType = typeNode.getScalarType();
TPrimitiveType tPrimitiveType =
TPrimitiveType.findByValue(columnType.getType());
Type type;
int parsedNodes;
@@ -391,6 +393,16 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
parsedNodes += fieldType.value();
}
type = new StructType(fields);
+ } else if (tPrimitiveType == TPrimitiveType.VARIANT) {
+ // Preserve VARIANT-specific properties from PTypeNode, especially
variant_max_subcolumns_count.
+ int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount();
+ // Currently no predefined fields are carried in PTypeNode for
VARIANT, so use empty list and default
+ // values for other properties.
+ type = new VariantType(new ArrayList<>(), maxSubcolumns,
+ /*enableTypedPathsToSparse*/ false,
+ /*variantMaxSparseColumnStatisticsSize*/ 10000,
+ /*variantSparseHashShardCount*/ 0);
+ parsedNodes = 1;
} else {
type =
ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType),
columnType.getLen(), columnType.getPrecision(),
columnType.getScale());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0f9ccbf8b01..4435423f5c5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -108,7 +108,8 @@ enum TFileFormatType {
FORMAT_CSV_SNAPPYBLOCK = 14,
FORMAT_WAL = 15,
FORMAT_ARROW = 16,
- FORMAT_TEXT = 17
+ FORMAT_TEXT = 17,
+ FORMAT_NATIVE = 18
}
// In previous versions, the data compression format and file format were
stored together, as TFileFormatType,
diff --git
a/regression-test/data/export_p0/outfile/native/test_outfile_native.out
b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
new file mode 100644
index 00000000000..fc60340a7e1
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_default --
+1 2024-01-01 2024-01-01T00:00 s1 1 1 true
1.1
+2 2024-01-01 2024-01-01T00:00 s2 2 2 true
2.2
+3 2024-01-01 2024-01-01T00:00 s3 3 3 true
3.3
+4 2024-01-01 2024-01-01T00:00 s4 4 4 true
4.4
+5 2024-01-01 2024-01-01T00:00 s5 5 5 true
5.5
+6 2024-01-01 2024-01-01T00:00 s6 6 6 true
6.6
+7 2024-01-01 2024-01-01T00:00 s7 7 7 true
7.7
+8 2024-01-01 2024-01-01T00:00 s8 8 8 true
8.800000000000001
+9 2024-01-01 2024-01-01T00:00 s9 9 9 true
9.9
+10 2024-01-01 2024-01-01T00:00 s10 10 10 true
10.1
+
+-- !select_s3_native --
+1 2024-01-01 2024-01-01T00:00 s1 1 1 true
1.1
+2 2024-01-01 2024-01-01T00:00 s2 2 2 true
2.2
+3 2024-01-01 2024-01-01T00:00 s3 3 3 true
3.3
+4 2024-01-01 2024-01-01T00:00 s4 4 4 true
4.4
+5 2024-01-01 2024-01-01T00:00 s5 5 5 true
5.5
+6 2024-01-01 2024-01-01T00:00 s6 6 6 true
6.6
+7 2024-01-01 2024-01-01T00:00 s7 7 7 true
7.7
+8 2024-01-01 2024-01-01T00:00 s8 8 8 true
8.800000000000001
+9 2024-01-01 2024-01-01T00:00 s9 9 9 true
9.9
+10 2024-01-01 2024-01-01T00:00 s10 10 10 true
10.1
+
diff --git
a/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
new file mode 100644
index 00000000000..334fc809b88
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
// End-to-end test: export a table to S3 as Native format via INTO OUTFILE,
// then read it back through the S3 TVF and compare with the local baseline.
suite("test_outfile_native", "p0") {
    // open nereids
    sql """ set enable_nereids_planner=true """
    sql """ set enable_fallback_to_original_planner=false """

    String ak = getS3AK()
    String sk = getS3SK()
    String s3_endpoint = getS3Endpoint()
    String region = getS3Region()
    String bucket = context.config.otherConfigs.get("s3BucketName");

    def tableName = "outfile_native_test"
    def outFilePath = "${bucket}/outfile/native/exp_"

    // Export helper: write to S3 and return the URL output by FE
    def outfile_to_s3 = {
        def res = sql """
            SELECT * FROM ${tableName} t ORDER BY id
            INTO OUTFILE "s3://${outFilePath}"
            FORMAT AS native
            PROPERTIES (
                "s3.endpoint" = "${s3_endpoint}",
                "s3.region" = "${region}",
                "s3.secret_key"="${sk}",
                "s3.access_key" = "${ak}"
            );
        """
        // Column 3 of the OUTFILE result row is the output file URL.
        return res[0][3]
    }

    try {
        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `id` INT NOT NULL,
                `c_date` DATE NOT NULL,
                `c_dt` DATETIME NOT NULL,
                `c_str` VARCHAR(20),
                `c_int` INT,
                `c_tinyint` TINYINT,
                `c_bool` boolean,
                `c_double` double
            )
            DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
        """

        // Insert 10000 rows of test data: rows 1..9999 carry values, and the
        // final row (id = 10000) has NULL in every nullable column.
        StringBuilder sb = new StringBuilder()
        int i = 1
        for (; i < 10000; i ++) {
            sb.append("""
                (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i % 128}, true, ${i}.${i}),
            """)
        }
        sb.append("""
            (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL, NULL, NULL)
        """)
        sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """

        // baseline: local table query result (only the first 10 rows are
        // compared, matching the recorded .out expectations)
        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY id limit 10; """

        // 1) Export the table to S3 as Native-format files
        def outfileUrl = outfile_to_s3()

        // 2) Read the data back from S3 via the S3 TVF (format=native) and
        //    compare against the baseline.
        // outfileUrl looks like s3://bucket/outfile/native/exp_xxx_* — strip
        // the "s3://bucket" prefix and the trailing '*', then append the
        // concrete file suffix "0.native".
        qt_select_s3_native """ SELECT * FROM S3 (
            "uri" = "http://${bucket}.${s3_endpoint}${
                outfileUrl.substring(5 + bucket.length(), outfileUrl.length() - 1)
            }0.native",
            "ACCESS_KEY"= "${ak}",
            "SECRET_KEY" = "${sk}",
            "format" = "native",
            "region" = "${region}"
        ) order by id limit 10;
        """
    } finally {
        try_sql("DROP TABLE IF EXISTS ${tableName}")
    }
}
\ No newline at end of file
diff --git
a/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
new file mode 100644
index 00000000000..911d821263e
--- /dev/null
+++ b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.io.File
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS;
+
// End-to-end test for exporting/importing a sparse wide VARIANT table:
// stream-load JSON with many distinct keys, EXPORT to S3 (native format),
// validate via the S3 TVF, then LOAD the files back into a fresh table.
suite("test_export_variant_10k_columns", "p0") {
    // open nereids
    sql """ set enable_nereids_planner=true """
    sql """ set enable_fallback_to_original_planner=false """

    String ak = getS3AK()
    String sk = getS3SK()
    String s3_endpoint = getS3Endpoint()
    String region = getS3Region()
    // String bucket = context.config.otherConfigs.get("s3BucketName");
    String bucket = getS3BucketName()

    def table_export_name = "test_export_variant_10k"
    def table_load_name = "test_load_variant_10k"
    def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp"""

    // Poll `SHOW EXPORT` until the job reaches FINISHED and return its
    // outfile URL; throws if the job is CANCELLED.
    def waiting_export = { export_label ->
        while (true) {
            def res = sql """ show export where label = "${export_label}" """
            logger.info("export state: " + res[0][2])
            if (res[0][2] == "FINISHED") {
                def json = parseJson(res[0][11])
                assert json instanceof List
                // assertEquals("1", json.fileNumber[0][0])
                log.info("outfile_path: ${json.url[0][0]}")
                return json.url[0][0];
            } else if (res[0][2] == "CANCELLED") {
                throw new IllegalStateException("""export failed: ${res[0][10]}""")
            } else {
                sleep(5000)
            }
        }
    }

    // 1. Create table with variant column
    sql """ DROP TABLE IF EXISTS ${table_export_name} """
    sql """
        CREATE TABLE IF NOT EXISTS ${table_export_name} (
            `id` INT NOT NULL,
            `v` VARIANT<PROPERTIES ("variant_max_subcolumns_count" = "2048")> NULL
        )
        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
    """

    // 2. Generate sparse variant data: num_rows (= 1000) rows over a key
    //    space of 10,000 distinct keys, but each row carries only 50 of
    //    them. Keys are scattered so that all 10,000 keys appear across the
    //    rows, simulating a realistic sparse wide-table scenario.
    File dataFile = File.createTempFile("variant_10k_data", ".json")
    dataFile.deleteOnExit()
    int num_rows = 1000
    try {
        dataFile.withWriter { writer ->
            StringBuilder sb = new StringBuilder()
            for (int i = 1; i <= num_rows; i++) {
                sb.setLength(0)
                sb.append("{\"id\": ").append(i).append(", \"v\": {")
                // Select 50 keys out of 10000 for each row
                for (int k = 0; k < 50; k++) {
                    if (k > 0) sb.append(", ")
                    // Scatter the keys to ensure coverage of all 10000 columns across rows
                    int keyIdx = (i + k * 200) % 10000
                    sb.append('"k').append(keyIdx).append('":').append(i)
                }
                sb.append("}}\n")
                writer.write(sb.toString())
            }
        }

        // 3. Stream Load the generated JSON lines into the export table.
        streamLoad {
            table table_export_name
            set 'format', 'json'
            set 'read_json_by_line', 'true'
            file dataFile.getAbsolutePath()
            time 60000 // 60s
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                log.info("Stream load result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("Success", json.Status)
                assertEquals(num_rows, json.NumberTotalRows)
                assertEquals(num_rows, json.NumberLoadedRows)
            }
        }
    } finally {
        dataFile.delete()
    }

    // def format = "parquet"
    def format = "native"

    // 4. Export to S3 in the selected format (native; flip `format` above to
    //    exercise parquet instead)
    def uuid = UUID.randomUUID().toString()
    // def outFilePath = """/tmp/variant_10k_export"""
    def outFilePath = """${outfile_path_prefix}_${uuid}"""
    def label = "label_${uuid}"

    try {
        sql """
            EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/"
            PROPERTIES(
                "label" = "${label}",
                "format" = "${format}"
            )
            WITH S3(
                "s3.endpoint" = "${s3_endpoint}",
                "s3.region" = "${region}",
                "s3.secret_key"="${sk}",
                "s3.access_key" = "${ak}",
                "provider" = "${getS3Provider()}"
            );
        """

        long startExport = System.currentTimeMillis()
        def outfile_url = waiting_export.call(label)
        long endExport = System.currentTimeMillis()
        logger.info("Export ${num_rows} rows with variant took ${endExport - startExport} ms")

        // 5. Validate by S3 TVF. The export URL ends with '*'; strip the
        //    "s3://bucket" prefix and trailing '*' and address file 0 directly.
        def s3_tvf_sql = """ s3(
            "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
            "s3.access_key"= "${ak}",
            "s3.secret_key" = "${sk}",
            "format" = "${format}",
            "provider" = "${getS3Provider()}",
            "region" = "${region}"
        ) """

        // Verify row count
        def res = sql """ SELECT count(*) FROM ${s3_tvf_sql} """
        assertEquals(num_rows, res[0][0])

        // Randomly load back into either a VARIANT column or a plain text
        // column to cover both target-type paths.
        def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\" = \"2048\")>"
        if (new Random().nextInt(2) == 0) {
            value_type = "text"
        }
        // 6. Load back into Doris (to a new table) to verify import performance/capability
        sql """ DROP TABLE IF EXISTS ${table_load_name} """
        sql """
            CREATE TABLE ${table_load_name} (
                `id` INT,
                `v` ${value_type}
            )
            DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
        """

        // Use LOAD LABEL. The label contains '-' from UUID, so it must be quoted with back-quotes.
        def load_label = "load_${uuid}"

        sql """
            LOAD LABEL `${load_label}`
            (
                DATA INFILE("s3://${outFilePath}/*")
                INTO TABLE ${table_load_name}
                FORMAT AS "${format}"
                (id, v)
            )
            WITH S3 (
                "s3.endpoint" = "${s3_endpoint}",
                "s3.region" = "${region}",
                "s3.secret_key"="${sk}",
                "s3.access_key" = "${ak}",
                "provider" = "${getS3Provider()}"
            );
        """

        // Wait up to 240s for the broker load to finish, failing fast on
        // CANCELLED/FAILED states.
        Awaitility.await().atMost(240, SECONDS).pollInterval(5, SECONDS).until({
            def loadResult = sql """
                show load where label = '${load_label}'
            """
            if (loadResult.get(0).get(2) == 'CANCELLED' || loadResult.get(0).get(2) == 'FAILED') {
                println("load failed: " + loadResult.get(0))
                throw new RuntimeException("load failed"+ loadResult.get(0))
            }
            return loadResult.get(0).get(2) == 'FINISHED'
        })
        // Check if data loaded
        def load_count = sql """ SELECT count(*) FROM ${table_load_name} """
        assertEquals(num_rows, load_count[0][0])

        // Check variant data integrity (sample)
        // Row 1 has keys: (1 + k*200) % 10000. For k=0, key is k1. Value is 1.
        def check_v = sql """ SELECT cast(v['k1'] as int) FROM ${table_load_name} WHERE id = 1 """
        assertEquals(1, check_v[0][0])

    } finally {
        // NOTE(review): cleanup is commented out, so both tables persist after
        // the run — confirm this is intentional (e.g. kept for debugging).
        // try_sql("DROP TABLE IF EXISTS ${table_export_name}")
        // try_sql("DROP TABLE IF EXISTS ${table_load_name}")
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]