This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch cs_opt_version-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/cs_opt_version-3.1 by this
push:
new d30d429847b support native format (#58878)
d30d429847b is described below
commit d30d429847b64009d8f0104d6cb79856670f9974
Author: lihangyu <[email protected]>
AuthorDate: Tue Dec 9 22:16:38 2025 +0800
support native format (#58878)
---
be/src/service/internal_service.cpp | 9 +
be/src/vec/exec/format/native/native_reader.cpp | 351 +++++++++++++
be/src/vec/exec/format/native/native_reader.h | 121 +++++
be/src/vec/exec/scan/vfile_scanner.cpp | 18 +-
be/src/vec/functions/function_cast.h | 13 +-
be/src/vec/runtime/vnative_transformer.cpp | 121 +++++
be/src/vec/runtime/vnative_transformer.h | 65 +++
be/src/vec/sink/writer/vfile_result_writer.cpp | 9 +
.../format/native/native_reader_writer_test.cpp | 564 +++++++++++++++++++++
.../doris/common/util/FileFormatConstants.java | 1 +
.../java/org/apache/doris/common/util/Util.java | 3 +
.../property/fileformat/FileFormatProperties.java | 3 +
.../fileformat/NativeFileFormatProperties.java | 65 +++
.../apache/doris/load/LoadExprTransformUtils.java | 2 +-
.../ExternalFileTableValuedFunction.java | 13 +-
gensrc/thrift/PlanNodes.thrift | 3 +-
.../outfile/native/test_outfile_native.out | 25 +
.../outfile/native/test_outfile_native.groovy | 100 ++++
.../test_export_variant_10k_columns.groovy | 238 +++++++++
19 files changed, 1710 insertions(+), 14 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 15ddf30ce53..f962f47394e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -134,6 +134,7 @@
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/text/text_reader.h"
@@ -855,6 +856,14 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
reader = vectorized::OrcReader::create_unique(params, range, "",
&io_ctx);
break;
}
+ case TFileFormatType::FORMAT_NATIVE: {
+ // Doris Native binary format reader for schema probing in S3/FILE
TVF.
+ // NOTE(review): batch_size is currently ignored by NativeReader (its
+ // constructor takes size_t /*batch_size*/); pass a nominal value only.
+ size_t batch_size = 4096;
+ reader = vectorized::NativeReader::create_unique(profile.get(),
params, range,
+ batch_size,
&io_ctx, nullptr);
+ break;
+ }
case TFileFormatType::FORMAT_JSON: {
reader = vectorized::NewJsonReader::create_unique(profile.get(),
params, range,
file_slots,
&io_ctx);
diff --git a/be/src/vec/exec/format/native/native_reader.cpp
b/be/src/vec/exec/format/native/native_reader.cpp
new file mode 100644
index 00000000000..e01c78890eb
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.cpp
@@ -0,0 +1,351 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/native/native_reader.h"
+
+#include <gen_cpp/data.pb.h>
+
+#include "io/file_factory.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/tracing_file_reader.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+NativeReader::NativeReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t /*batch_size*/,
+ io::IOContext* io_ctx, RuntimeState* state)
+ : _profile(profile),
+ _scan_params(params),
+ _scan_range(range),
+ _io_ctx(io_ctx),
+ _state(state) {}
+
+NativeReader::~NativeReader() {
+ (void)close();
+}
+
+Status NativeReader::init_reader() {
+ if (_file_reader != nullptr) {
+ return Status::OK();
+ }
+
+ // Create underlying file reader. For now we always use random access mode.
+ io::FileSystemProperties system_properties;
+ io::FileDescription file_description;
+ if (_scan_range.__isset.file_size) {
+ file_description.file_size = _scan_range.file_size;
+ } else {
+ file_description.file_size = -1;
+ }
+ file_description.path = _scan_range.path;
+ if (_scan_range.__isset.fs_name) {
+ file_description.fs_name = _scan_range.fs_name;
+ }
+ if (_scan_range.__isset.modification_time) {
+ file_description.mtime = _scan_range.modification_time;
+ } else {
+ file_description.mtime = 0;
+ }
+
+ if (_scan_range.__isset.file_type) {
+ // For compatibility with older FE.
+ system_properties.system_type = _scan_range.file_type;
+ } else {
+ system_properties.system_type = _scan_params.file_type;
+ }
+ system_properties.properties = _scan_params.properties;
+ system_properties.hdfs_params = _scan_params.hdfs_params;
+ if (_scan_params.__isset.broker_addresses) {
+
system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
+
_scan_params.broker_addresses.end());
+ }
+
+ io::FileReaderOptions reader_options =
+ FileFactory::get_reader_options(_state, file_description);
+ auto reader_res = io::DelegateReader::create_file_reader(
+ _profile, system_properties, file_description, reader_options,
+ io::DelegateReader::AccessMode::RANDOM, _io_ctx);
+ if (!reader_res.has_value()) {
+ return reader_res.error();
+ }
+ _file_reader = reader_res.value();
+
+ if (_io_ctx) {
+ _file_reader =
+ std::make_shared<io::TracingFileReader>(_file_reader,
_io_ctx->file_reader_stats);
+ }
+
+ _file_size = _file_reader->size();
+ _current_offset = 0;
+ _eof = (_file_size == 0);
+
+ // Detect the optional 12-byte Doris Native file header:
+ //   [magic "DORISN1\0" (8 bytes)][uint32_t format_version (4 bytes)]
+ // followed by a sequence of [uint64_t block_size][PBlock bytes] records.
+ // NOTE(review): VNativeTransformer::write() in this change emits only the
+ // block records, not this header — confirm which writer produces it.
+ if (!_eof && _file_size >= 12) {
+ char header[12];
+ Slice header_slice(header, sizeof(header));
+ size_t bytes_read = 0;
+ RETURN_IF_ERROR(_file_reader->read_at(0, header_slice, &bytes_read));
+ if (bytes_read == sizeof(header)) {
+ static constexpr char NATIVE_MAGIC[8] = {'D', 'O', 'R', 'I', 'S',
'N', '1', '\0'};
+ if (memcmp(header, NATIVE_MAGIC, sizeof(NATIVE_MAGIC)) == 0) {
+ // We currently only have format version 1, but keep the value
for future use.
+ uint32_t version = 0;
+ memcpy(&version, header + sizeof(NATIVE_MAGIC),
sizeof(uint32_t));
+ (void)version;
+ _current_offset = sizeof(header);
+ }
+ }
+ }
+ return Status::OK();
+}
+
+Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool*
eof) {
+ if (_eof) {
+ *read_rows = 0;
+ *eof = true;
+ return Status::OK();
+ }
+
+ RETURN_IF_ERROR(init_reader());
+
+ std::string buff;
+ bool local_eof = false;
+
+ // If we have already loaded the first block for schema probing, use it
first.
+ if (_first_block_loaded && !_first_block_consumed) {
+ buff = _first_block_buf;
+ local_eof = false;
+ } else {
+ RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
+ }
+
+ // If we reach EOF and also read no data for this call, the whole file is
considered finished.
+ if (local_eof && buff.empty()) {
+ *read_rows = 0;
+ *eof = true;
+ _eof = true;
+ return Status::OK();
+ }
+ // If buffer is empty but we have not reached EOF yet, treat this as an
error.
+ if (buff.empty()) {
+ return Status::InternalError("read empty native block from file {}",
_scan_range.path);
+ }
+
+ PBlock pblock;
+ if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) {
+ return Status::InternalError("Failed to parse native PBlock from file
{}",
+ _scan_range.path);
+ }
+
+ // Initialize schema from first block if not done yet.
+ if (!_schema_inited) {
+ RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+ }
+
+ // deserialize PBlock into Block. Older Block API only exposes a
single-argument
+ // deserialize method without decompression statistics.
+ RETURN_IF_ERROR(block->deserialize(pblock));
+
+ // For external file scan / TVF scenarios, unify all columns as nullable
to match
+ // GenericReader/SlotDescriptor convention. This ensures schema
consistency when
+ // some writers emit non-nullable columns.
+ for (size_t i = 0; i < block->columns(); ++i) {
+ auto& col_with_type = block->get_by_position(i);
+ if (!col_with_type.type->is_nullable()) {
+ col_with_type.column = make_nullable(col_with_type.column);
+ col_with_type.type = make_nullable(col_with_type.type);
+ }
+ }
+
+ *read_rows = block->rows();
+ *eof = false;
+
+ if (_first_block_loaded && !_first_block_consumed) {
+ _first_block_consumed = true;
+ }
+
+ // If we reached the physical end of file, mark eof for subsequent calls.
+ if (_current_offset >= _file_size) {
+ _eof = true;
+ }
+
+ return Status::OK();
+}
+
+Status NativeReader::get_columns(std::unordered_map<std::string,
TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>*
missing_cols) {
+ missing_cols->clear();
+ RETURN_IF_ERROR(init_reader());
+
+ if (!_schema_inited) {
+ // Load first block lazily to initialize schema.
+ if (!_first_block_loaded) {
+ bool local_eof = false;
+ RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+ // Treat file as empty only if we reach EOF and there is no block
data at all.
+ if (local_eof && _first_block_buf.empty()) {
+ return Status::EndOfFile("empty native file {}",
_scan_range.path);
+ }
+ // Non-EOF but empty buffer means corrupted native file.
+ if (_first_block_buf.empty()) {
+ return Status::InternalError("first native block is empty {}",
_scan_range.path);
+ }
+ _first_block_loaded = true;
+ }
+
+ PBlock pblock;
+ if (!pblock.ParseFromArray(_first_block_buf.data(),
+ static_cast<int>(_first_block_buf.size())))
{
+ return Status::InternalError("Failed to parse native PBlock for
schema from file {}",
+ _scan_range.path);
+ }
+ RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+ }
+
+ for (size_t i = 0; i < _schema_col_names.size(); ++i) {
+ // convert internal DataTypePtr to TypeDescriptor for external schema
+ name_to_type->emplace(_schema_col_names[i],
+
_schema_col_types[i]->get_type_as_type_descriptor());
+ }
+ return Status::OK();
+}
+
+Status NativeReader::init_schema_reader() {
+ RETURN_IF_ERROR(init_reader());
+ return Status::OK();
+}
+
+Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types)
{
+ RETURN_IF_ERROR(init_reader());
+
+ if (!_schema_inited) {
+ if (!_first_block_loaded) {
+ bool local_eof = false;
+ RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
+ // Treat file as empty only if we reach EOF and there is no block
data at all.
+ if (local_eof && _first_block_buf.empty()) {
+ return Status::EndOfFile("empty native file {}",
_scan_range.path);
+ }
+ // Non-EOF but empty buffer means corrupted native file.
+ if (_first_block_buf.empty()) {
+ return Status::InternalError("first native block is empty {}",
_scan_range.path);
+ }
+ _first_block_loaded = true;
+ }
+
+ PBlock pblock;
+ if (!pblock.ParseFromArray(_first_block_buf.data(),
+ static_cast<int>(_first_block_buf.size())))
{
+ return Status::InternalError("Failed to parse native PBlock for
schema from file {}",
+ _scan_range.path);
+ }
+ RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
+ }
+
+ *col_names = _schema_col_names;
+ col_types->clear();
+ col_types->reserve(_schema_col_types.size());
+ for (const auto& dt : _schema_col_types) {
+ col_types->emplace_back(dt->get_type_as_type_descriptor());
+ }
+ return Status::OK();
+}
+
+Status NativeReader::close() {
+ _file_reader.reset();
+ return Status::OK();
+}
+
+Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
+ *eof = false;
+ buff->clear();
+
+ if (_file_reader == nullptr) {
+ RETURN_IF_ERROR(init_reader());
+ }
+
+ if (_current_offset >= _file_size) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ uint64_t len = 0;
+ Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+ size_t bytes_read = 0;
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice,
&bytes_read));
+ if (bytes_read == 0) {
+ *eof = true;
+ return Status::OK();
+ }
+ if (bytes_read != sizeof(len)) {
+ return Status::InternalError(
+ "Failed to read native block length from file {}, expect {}, "
+ "actual {}",
+ _scan_range.path, sizeof(len), bytes_read);
+ }
+
+ _current_offset += sizeof(len);
+ if (len == 0) {
+ // Empty block, nothing to read.
+ *eof = (_current_offset >= _file_size);
+ return Status::OK();
+ }
+
+ buff->assign(len, '\0');
+ Slice data_slice(buff->data(), len);
+ bytes_read = 0;
+ RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice,
&bytes_read));
+ if (bytes_read != len) {
+ return Status::InternalError(
+ "Failed to read native block body from file {}, expect {}, "
+ "actual {}",
+ _scan_range.path, len, bytes_read);
+ }
+
+ _current_offset += len;
+ *eof = (_current_offset >= _file_size);
+ return Status::OK();
+}
+
+Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
+ _schema_col_names.clear();
+ _schema_col_types.clear();
+
+ for (const auto& pcol_meta : pblock.column_metas()) {
+ DataTypePtr type =
make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta));
+ LOG(INFO) << "init_schema_from_pblock, name=" << pcol_meta.name()
+ << ", type=" << type->get_name();
+ _schema_col_names.emplace_back(pcol_meta.name());
+ _schema_col_types.emplace_back(type);
+ }
+ _schema_inited = true;
+ return Status::OK();
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/native/native_reader.h
b/be/src/vec/exec/format/native/native_reader.h
new file mode 100644
index 00000000000..6f62cff9f09
--- /dev/null
+++ b/be/src/vec/exec/format/native/native_reader.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include "common/status.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+struct TypeDescriptor;
+class PBlock;
+
+namespace io {
+struct IOContext;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format reader.
+// It reads a sequence of Blocks encoded in the Doris Native binary format:
+// an optional 12-byte header followed by length-prefixed serialized PBlocks.
+// The first block read for schema probing is cached and reused by the data
+// scan, so the file is not deserialized twice.
+class NativeReader : public GenericReader {
+public:
+ ENABLE_FACTORY_CREATOR(NativeReader);
+
+ NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t batch_size,
io::IOContext* io_ctx,
+ RuntimeState* state);
+
+ ~NativeReader() override;
+
+#ifdef BE_TEST
+ // for unit test: inject an existing FileReader and skip FileFactory path
+ void set_file_reader(io::FileReaderSPtr file_reader) {
+ _file_reader = std::move(file_reader);
+ _io_ctx = nullptr;
+ _file_size = _file_reader ? _file_reader->size() : 0;
+ _current_offset = 0;
+ _eof = (_file_size == 0);
+ }
+#endif
+
+ // Initialize underlying file reader and any format specific state.
+ Status init_reader();
+
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
+ Status init_schema_reader() override;
+
+ Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) override;
+
+ Status close() override;
+
+ bool count_read_rows() override { return true; }
+
+protected:
+ void _collect_profile_before_close() override {}
+
+private:
+ RuntimeProfile* _profile = nullptr;
+ const TFileScanRangeParams& _scan_params;
+ const TFileRangeDesc& _scan_range;
+
+ io::FileReaderSPtr _file_reader;
+ io::IOContext* _io_ctx = nullptr;
+ RuntimeState* _state = nullptr;
+
+ bool _eof = false;
+
+ // Current read offset in the underlying file.
+ int64_t _current_offset = 0;
+ int64_t _file_size = 0;
+
+ // Cached schema information from the first PBlock.
+ bool _schema_inited = false;
+ std::vector<std::string> _schema_col_names;
+ std::vector<DataTypePtr> _schema_col_types;
+
+ // Cached first block (serialized) to allow schema probing before data
scan.
+ std::string _first_block_buf;
+ bool _first_block_loaded = false;
+ bool _first_block_consumed = false;
+
+ Status _read_next_pblock(std::string* buff, bool* eof);
+ Status _init_schema_from_pblock(const PBlock& pblock);
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 54f9c807989..e46f4141f69 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -60,6 +60,7 @@
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/native/native_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/hive_reader.h"
@@ -597,10 +598,6 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
// skip columns which does not exist in file
continue;
}
- if (slot_desc->type().is_variant_type()) {
- // skip variant type
- continue;
- }
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
auto return_type = slot_desc->get_data_type_ptr();
// remove nullable here, let the get_function decide whether nullable
@@ -612,8 +609,10 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
"CAST", arguments, return_type,
{.enable_decimal256 = runtime_state()->enable_decimal256()});
idx = _src_block_name_to_idx[slot_desc->col_name()];
+ DCHECK(_state != nullptr);
+ auto ctx = FunctionContext::create_context(_state, {}, {});
RETURN_IF_ERROR(
- func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
+ func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
return Status::OK();
@@ -1272,6 +1271,15 @@ Status VFileScanner::_get_next_reader() {
init_status =
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
break;
}
+ case TFileFormatType::FORMAT_NATIVE: {
+ auto reader = NativeReader::create_unique(_profile, *_params,
range,
+
_state->query_options().batch_size,
+ _io_ctx.get(), _state);
+ init_status = reader->init_reader();
+ _cur_reader = std::move(reader);
+ need_to_get_parsed_schema = false;
+ break;
+ }
case TFileFormatType::FORMAT_ARROW: {
_cur_reader = ArrowStreamReader::create_unique(_state, _profile,
&_counter, *_params,
range,
_file_slot_descs, _io_ctx.get());
diff --git a/be/src/vec/functions/function_cast.h
b/be/src/vec/functions/function_cast.h
index 3644abae614..a6b25f79d23 100644
--- a/be/src/vec/functions/function_cast.h
+++ b/be/src/vec/functions/function_cast.h
@@ -2124,10 +2124,10 @@ private:
}
struct ConvertImplGenericFromVariant {
- static Status execute(const FunctionCast* fn, FunctionContext*
context, Block& block,
+ static Status execute(const FunctionCast* fn, const DataTypePtr&
data_type_to,
+ FunctionContext* context, Block& block,
const ColumnNumbers& arguments, const size_t
result,
size_t input_rows_count) {
- auto& data_type_to = block.get_by_position(result).type;
const auto& col_with_type_and_name =
block.get_by_position(arguments[0]);
auto& col_from = col_with_type_and_name.column;
auto& variant = assert_cast<const ColumnObject&>(*col_from);
@@ -2230,10 +2230,11 @@ private:
// create cresponding type convert from variant
WrapperType create_variant_wrapper(const DataTypeObject& from_type,
const DataTypePtr& to_type) const {
- return [this](FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
- const size_t result, size_t input_rows_count) -> Status {
- return ConvertImplGenericFromVariant::execute(this, context,
block, arguments, result,
- input_rows_count);
+ return [this, to_type](FunctionContext* context, Block& block,
+ const ColumnNumbers& arguments, const size_t
result,
+ size_t input_rows_count) -> Status {
+ return ConvertImplGenericFromVariant::execute(this, to_type,
context, block, arguments,
+ result,
input_rows_count);
};
}
diff --git a/be/src/vec/runtime/vnative_transformer.cpp
b/be/src/vec/runtime/vnative_transformer.cpp
new file mode 100644
index 00000000000..378f7c83196
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.cpp
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vnative_transformer.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/data.pb.h>
+#include <glog/logging.h>
+
+#include "agent/be_exec_version_manager.h"
+#include "io/fs/file_writer.h"
+#include "runtime/runtime_state.h"
+#include "util/slice.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+
+#include "common/compile_check_begin.h"
+
+namespace {
+
+// Map high-level TFileCompressType to low-level segment_v2::CompressionTypePB.
+segment_v2::CompressionTypePB
to_local_compression_type(TFileCompressType::type type) {
+ using CT = segment_v2::CompressionTypePB;
+ switch (type) {
+ case TFileCompressType::GZ:
+ case TFileCompressType::ZLIB:
+ case TFileCompressType::DEFLATE:
+ return CT::ZLIB;
+ case TFileCompressType::LZ4FRAME:
+ case TFileCompressType::LZ4BLOCK:
+ return CT::LZ4;
+ case TFileCompressType::SNAPPYBLOCK:
+ return CT::SNAPPY;
+ case TFileCompressType::ZSTD:
+ return CT::ZSTD;
+ default:
+ return CT::ZSTD;
+ }
+}
+
+} // namespace
+
+VNativeTransformer::VNativeTransformer(RuntimeState* state,
doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs&
output_vexpr_ctxs,
+ bool output_object_data,
+ TFileCompressType::type compress_type)
+ : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+ _file_writer(file_writer),
+ _compression_type(to_local_compression_type(compress_type)) {}
+
+Status VNativeTransformer::open() {
+ // No extra initialization is required for now. The underlying FileWriter
+ // is managed by VFileResultWriter.
+ return Status::OK();
+}
+
+Status VNativeTransformer::write(const Block& block) {
+ if (block.rows() == 0) {
+ return Status::OK();
+ }
+
+ // Serialize Block into PBlock using existing vec serialization logic.
+ PBlock pblock;
+ size_t uncompressed_bytes = 0;
+ size_t compressed_bytes = 0;
+
+
RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(),
&pblock,
+ &uncompressed_bytes, &compressed_bytes,
+ _compression_type, true));
+
+ std::string buff;
+ if (!pblock.SerializeToString(&buff)) {
+ auto err = Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
+ "serialize native block error. block rows: {}", block.rows());
+ return err;
+ }
+
+ // Layout of Doris Native file:
+ // [uint64_t block_size][PBlock bytes]...
+ uint64_t len = buff.size();
+ Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
+ RETURN_IF_ERROR(_file_writer->append(len_slice));
+ RETURN_IF_ERROR(_file_writer->append(buff));
+
+ _written_len += sizeof(len) + buff.size();
+ _cur_written_rows += block.rows();
+
+ return Status::OK();
+}
+
+Status VNativeTransformer::close() {
+ // Close underlying FileWriter to ensure data is flushed to disk.
+ if (_file_writer != nullptr && _file_writer->state() !=
doris::io::FileWriter::State::CLOSED) {
+ RETURN_IF_ERROR(_file_writer->close());
+ }
+
+ return Status::OK();
+}
+
+int64_t VNativeTransformer::written_len() {
+ return _written_len;
+}
+
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vnative_transformer.h
b/be/src/vec/runtime/vnative_transformer.h
new file mode 100644
index 00000000000..fdf7ff46ac9
--- /dev/null
+++ b/be/src/vec/runtime/vnative_transformer.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <cstdint>
+
+#include "common/status.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vfile_format_transformer.h"
+
+namespace doris::io {
+class FileWriter;
+} // namespace doris::io
+
+namespace doris::vectorized {
+class Block;
+
+#include "common/compile_check_begin.h"
+
+// Doris Native format writer.
+// It serializes vectorized Blocks into Doris Native binary format.
+class VNativeTransformer final : public VFileFormatTransformer {
+public:
+ // |compress_type| controls how the PBlock is compressed on disk (ZSTD,
LZ4, etc).
+ // Defaults to ZSTD to preserve the previous behavior.
+ VNativeTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs& output_vexpr_ctxs, bool
output_object_data,
+ TFileCompressType::type compress_type =
TFileCompressType::ZSTD);
+
+ ~VNativeTransformer() override = default;
+
+ Status open() override;
+
+ Status write(const Block& block) override;
+
+ Status close() override;
+
+ int64_t written_len() override;
+
+private:
+ doris::io::FileWriter* _file_writer; // not owned
+ int64_t _written_len = 0;
+ // Compression type used for Block::serialize (PBlock compression).
+ segment_v2::CompressionTypePB _compression_type
{segment_v2::CompressionTypePB::ZSTD};
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index 9275245acf6..74bbe355b46 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -57,6 +57,7 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vnative_transformer.h"
#include "vec/runtime/vorc_transformer.h"
#include "vec/runtime/vparquet_transformer.h"
@@ -143,6 +144,12 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
_state, _file_writer_impl.get(), _vec_output_expr_ctxs,
_file_opts->orc_schema, {},
_output_object_data, _file_opts->orc_compression_type));
break;
+ case TFileFormatType::FORMAT_NATIVE:
+ // Doris Native binary format writer with configurable compression.
+ _vfile_writer.reset(new VNativeTransformer(_state,
_file_writer_impl.get(),
+ _vec_output_expr_ctxs,
_output_object_data,
+
_file_opts->compression_type));
+ break;
default:
return Status::InternalError("unsupported file format: {}",
_file_opts->file_format);
}
@@ -200,6 +207,8 @@ std::string VFileResultWriter::_file_format_to_name() {
return "parquet";
case TFileFormatType::FORMAT_ORC:
return "orc";
+ case TFileFormatType::FORMAT_NATIVE:
+ return "native";
default:
return "unknown";
}
diff --git a/be/test/vec/exec/format/native/native_reader_writer_test.cpp
b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
new file mode 100644
index 00000000000..2a61c12eb0d
--- /dev/null
+++ b/be/test/vec/exec/format/native/native_reader_writer_test.cpp
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/path.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/columns/column_struct.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_variant.h"
+#include "vec/data_types/serde/data_type_serde.h"
+#include "vec/exec/format/native/native_reader.h"
+#include "vec/runtime/vnative_transformer.h"
+
+namespace doris::vectorized {
+
+class NativeReaderWriterTest : public ::testing::Test {};
+
+static void fill_primitive_columns(Block& block, size_t rows) {
+ DataTypePtr int_type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false));
+ DataTypePtr str_type =
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
false));
+
+ {
+ MutableColumnPtr col = int_type->create_column();
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 3 == 0) {
+ // null
+ col->insert_default();
+ } else {
+ // insert int value via Field to match column interface
+ auto field =
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i * 10));
+ col->insert(field);
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), int_type,
"int_col"));
+ }
+
+ {
+ MutableColumnPtr col = str_type->create_column();
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 4 == 0) {
+ col->insert_default();
+ } else {
+ std::string v = "s" + std::to_string(i);
+ // insert varchar value via Field
+ auto field =
Field::create_field<PrimitiveType::TYPE_VARCHAR>(v);
+ col->insert(field);
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), str_type,
"str_col"));
+ }
+}
+
+static void fill_array_column(Block& block, size_t rows) {
+ // array<int>
+ DataTypePtr arr_nested_type =
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+ DataTypePtr arr_type =
make_nullable(std::make_shared<DataTypeArray>(arr_nested_type));
+
+ {
+ MutableColumnPtr col = arr_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+ auto& null_map = nullable_col->get_null_map_data();
+ auto& array_col = assert_cast<ColumnArray&>(nested);
+ auto& offsets = array_col.get_offsets();
+ auto& data = array_col.get_data();
+ auto mutable_data = data.assume_mutable();
+
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 5 == 0) {
+ // null array
+ nullable_col->insert_default();
+ } else {
+ // non-null array with 3 elements: [i, i+1, i+2]
+ null_map.push_back(0);
+ mutable_data->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+ mutable_data->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+ mutable_data->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 2)));
+ offsets.push_back(offsets.empty() ? 3 : offsets.back() + 3);
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), arr_type,
"arr_col"));
+ }
+}
+
+static void fill_map_column(Block& block, size_t rows) {
+ // map<string, int>
+ DataTypePtr map_key_type =
DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, false);
+ DataTypePtr map_value_type =
DataTypeFactory::instance().create_data_type(TYPE_INT, false);
+ DataTypePtr map_type =
+ make_nullable(std::make_shared<DataTypeMap>(map_key_type,
map_value_type));
+
+ // map<string, int> column
+ {
+ MutableColumnPtr col = map_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+ auto& null_map = nullable_col->get_null_map_data();
+
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 7 == 0) {
+ // null map
+ nullable_col->insert_default();
+ } else {
+ null_map.push_back(0);
+ auto& offsets = assert_cast<ColumnMap&>(nested).get_offsets();
+ auto& keys = assert_cast<ColumnMap&>(nested).get_keys();
+ auto& values = assert_cast<ColumnMap&>(nested).get_values();
+
+ auto mutable_keys = keys.assume_mutable();
+ auto mutable_values = values.assume_mutable();
+
+ std::string k1 = "k" + std::to_string(i);
+ std::string k2 = "k" + std::to_string(i + 1);
+
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k1));
+ mutable_values->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i)));
+
mutable_keys->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(k2));
+ mutable_values->insert(
+
Field::create_field<PrimitiveType::TYPE_INT>(static_cast<int32_t>(i + 1)));
+
+ offsets.push_back(offsets.empty() ? 2 : offsets.back() + 2);
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), map_type,
"map_col"));
+ }
+}
+
+static void fill_struct_column(Block& block, size_t rows) {
+ // struct<si:int, ss:string>
+ DataTypes struct_fields;
+ struct_fields.emplace_back(
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_INT, false)));
+ struct_fields.emplace_back(
+
make_nullable(DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
false)));
+ DataTypePtr struct_type =
make_nullable(std::make_shared<DataTypeStruct>(struct_fields));
+
+ // struct<si:int, ss:string> column
+ {
+ MutableColumnPtr col = struct_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column();
+ auto& null_map = nullable_col->get_null_map_data();
+
+ auto& struct_col = assert_cast<ColumnStruct&>(nested);
+ const auto& fields = struct_col.get_columns();
+
+ for (size_t i = 0; i < rows; ++i) {
+ if (i % 6 == 0) {
+ nullable_col->insert_default();
+ } else {
+ null_map.push_back(0);
+ auto mutable_field0 = fields[0]->assume_mutable();
+ auto mutable_field1 = fields[1]->assume_mutable();
+ // int field
+
mutable_field0->insert(Field::create_field<PrimitiveType::TYPE_INT>(
+ static_cast<int32_t>(i * 100)));
+ // string field
+ std::string vs = "ss" + std::to_string(i);
+
mutable_field1->insert(Field::create_field<PrimitiveType::TYPE_VARCHAR>(vs));
+ }
+ }
+ block.insert(ColumnWithTypeAndName(std::move(col), struct_type,
"struct_col"));
+ }
+}
+
+static void fill_variant_column(Block& block, size_t rows) {
+ // variant
+ DataTypePtr variant_type =
make_nullable(std::make_shared<DataTypeVariant>());
+
+ // variant column: use JSON strings + deserialize_column_from_json_vector
to populate ColumnVariant
+ {
+ MutableColumnPtr col = variant_type->create_column();
+ auto* nullable_col = assert_cast<ColumnNullable*>(col.get());
+ auto& nested = nullable_col->get_nested_column(); // ColumnVariant
+
+        // Prepare JSON strings with a fixed number of keys (key_cnt) per row
+ std::vector<std::string> json_rows;
+ json_rows.reserve(rows);
+ std::vector<Slice> slices;
+ slices.reserve(rows);
+
+ size_t key_cnt = 100;
+ for (size_t i = 0; i < rows; ++i) {
+            // fixed key count (100 keys per row) to exercise wide variant objects
+ std::string json = "{";
+ for (size_t k = 0; k < key_cnt; ++k) {
+ if (k > 0) {
+ json += ",";
+ }
+ // keys: "k0", "k1", ...
+ json += "\"k" + std::to_string(k) + "\":";
+ // mix int and string values
+ if ((i + k) % 2 == 0) {
+ json += std::to_string(static_cast<int32_t>(i * 10 + k));
+ } else {
+ json += "\"v" + std::to_string(i * 10 + k) + "\"";
+ }
+ }
+ json += "}";
+ json_rows.emplace_back(std::move(json));
+ slices.emplace_back(json_rows.back().data(),
json_rows.back().size());
+ }
+
+ // Use Variant SerDe to parse JSON into ColumnVariant
+ auto variant_type_inner = std::make_shared<DataTypeVariant>();
+ auto serde = variant_type_inner->get_serde();
+ auto* variant_serde = assert_cast<DataTypeVariantSerDe*>(serde.get());
+
+ uint64_t num_deserialized = 0;
+ DataTypeSerDe::FormatOptions options;
+ Status st = variant_serde->deserialize_column_from_json_vector(nested,
slices,
+
&num_deserialized, options);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(rows, num_deserialized);
+
+ // All rows are treated as non-null here
+ auto& null_map = nullable_col->get_null_map_data();
+ null_map.clear();
+ null_map.resize_fill(rows, 0);
+
+ block.insert(ColumnWithTypeAndName(std::move(col), variant_type,
"variant_col"));
+ }
+}
+
+static void fill_complex_columns(Block& block, size_t rows) {
+ fill_array_column(block, rows);
+ fill_map_column(block, rows);
+ fill_struct_column(block, rows);
+ fill_variant_column(block, rows);
+}
+
+static Block create_test_block(size_t rows) {
+ Block block;
+ // simple schema with primitive + complex types:
+ // int_col, str_col, arr_col, map_col, struct_col, variant_col
+ fill_primitive_columns(block, rows);
+ fill_complex_columns(block, rows);
+
+ return block;
+}
+
+TEST_F(NativeReaderWriterTest, round_trip_native_file) {
+ // Prepare a temporary local file path
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_" + uuid + ".native";
+
+ // 1. Write block to Native file via VNativeTransformer
+ auto fs = io::global_local_filesystem();
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs; // empty, VNativeTransformer won't use it directly
+ VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ Block src_block = create_test_block(16);
+ st = transformer.write(src_block);
+ ASSERT_TRUE(st.ok()) << st;
+ // VNativeTransformer::close() will also close the underlying FileWriter,
+ // so we don't need (and must not) close file_writer again here.
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // 2. Read back via NativeReader using normal file scan path
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, 4064, nullptr,
&state);
+
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(src_block.rows(), read_rows);
+ ASSERT_EQ(src_block.columns(), dst_block.columns());
+
+ // Compare column-wise values
+ for (size_t col = 0; col < src_block.columns(); ++col) {
+ const auto& src = src_block.get_by_position(col);
+ const auto& dst = dst_block.get_by_position(col);
+ ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
+ ASSERT_EQ(src.column->size(), dst.column->size());
+ for (size_t row = 0; row < src_block.rows(); ++row) {
+ auto src_field = (*src.column)[row];
+ auto dst_field = (*dst.column)[row];
+ ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << "
row=" << row;
+ }
+ }
+
+ // Next call should hit EOF
+ Block dst_block2;
+ read_rows = 0;
+ eof = false;
+ st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_TRUE(eof);
+ ASSERT_EQ(read_rows, 0);
+
+ // Clean up temp file
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+}
+
+// Edge cases: empty file and single-row file
+TEST_F(NativeReaderWriterTest, round_trip_empty_and_single_row) {
+ auto fs = io::global_local_filesystem();
+
+ // 1. Empty native file (no data blocks)
+ {
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_empty_" + uuid + ".native";
+
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs,
false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // Write an empty block, should not produce any data block in file.
+ Block empty_block;
+ st = transformer.write(empty_block);
+ ASSERT_TRUE(st.ok()) << st;
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // Read back: should directly hit EOF with 0 rows.
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, 4096,
nullptr, &state);
+
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(0U, read_rows);
+ ASSERT_TRUE(eof);
+
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+ }
+
+ // 2. Single-row native file
+ {
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_single_row_" + uuid +
".native";
+
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs,
false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ Block src_block = create_test_block(1);
+ st = transformer.write(src_block);
+ ASSERT_TRUE(st.ok()) << st;
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, 4096,
nullptr, &state);
+
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(src_block.rows(), read_rows);
+ ASSERT_EQ(src_block.columns(), dst_block.columns());
+ ASSERT_FALSE(eof);
+
+ // Verify data equality for the single row.
+ for (size_t col = 0; col < src_block.columns(); ++col) {
+ const auto& src = src_block.get_by_position(col);
+ const auto& dst = dst_block.get_by_position(col);
+ ASSERT_EQ(src.type->get_family_name(),
dst.type->get_family_name());
+ ASSERT_EQ(src.column->size(), dst.column->size());
+ auto src_field = (*src.column)[0];
+ auto dst_field = (*dst.column)[0];
+ ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << "
row=0";
+ }
+
+ // Next call should hit EOF
+ Block dst_block2;
+ read_rows = 0;
+ eof = false;
+ st = reader_impl.get_next_block(&dst_block2, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_TRUE(eof);
+ ASSERT_EQ(read_rows, 0U);
+
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+ }
+}
+
+// Large volume test: verify round-trip correctness with many rows.
+TEST_F(NativeReaderWriterTest, round_trip_native_file_large_rows) {
+ UniqueId uid = UniqueId::gen_uid();
+ std::string uuid = uid.to_string();
+ std::string file_path = "./native_format_large_" + uuid + ".native";
+
+ auto fs = io::global_local_filesystem();
+ io::FileWriterPtr file_writer;
+ Status st = fs->create_file(file_path, &file_writer);
+ ASSERT_TRUE(st.ok()) << st;
+
+ RuntimeState state;
+ VExprContextSPtrs exprs;
+ VNativeTransformer transformer(&state, file_writer.get(), exprs, false);
+ st = transformer.open();
+ ASSERT_TRUE(st.ok()) << st;
+
+ // Use a relatively large number of rows to test stability and performance
characteristics.
+ const size_t kRows = 9999;
+ Block src_block = create_test_block(kRows);
+
+ // Split into multiple blocks and write them one by one to exercise
multi-block path.
+ const size_t kBatchRows = 128;
+ for (size_t offset = 0; offset < kRows; offset += kBatchRows) {
+ size_t len = std::min(kBatchRows, kRows - offset);
+ // Create a sub block with the same schema and a row range [offset,
offset+len)
+ Block sub_block = src_block.clone_empty();
+ for (size_t col = 0; col < src_block.columns(); ++col) {
+ const auto& src_col = *src_block.get_by_position(col).column;
+ const auto& dst_col_holder =
*sub_block.get_by_position(col).column;
+ auto dst_mutable = dst_col_holder.assume_mutable();
+ dst_mutable->insert_range_from(src_col, offset, len);
+ }
+ st = transformer.write(sub_block);
+ ASSERT_TRUE(st.ok()) << st;
+ }
+ st = transformer.close();
+ ASSERT_TRUE(st.ok()) << st;
+
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ TFileRangeDesc scan_range;
+ scan_range.__set_path(file_path);
+ scan_range.__set_file_type(TFileType::FILE_LOCAL);
+ NativeReader reader_impl(nullptr, scan_params, scan_range, 8192, nullptr,
&state);
+
+ // Read back in multiple blocks and merge into a single result block.
+ Block merged_block;
+ bool first_block = true;
+ size_t total_read_rows = 0;
+ while (true) {
+ Block dst_block;
+ size_t read_rows = 0;
+ bool eof = false;
+ st = reader_impl.get_next_block(&dst_block, &read_rows, &eof);
+ ASSERT_TRUE(st.ok()) << st;
+ if (read_rows > 0) {
+ if (first_block) {
+ merged_block = dst_block;
+ total_read_rows = read_rows;
+ first_block = false;
+ } else {
+ MutableBlock merged_mutable(&merged_block);
+ Status add_st = merged_mutable.add_rows(&dst_block, 0,
read_rows);
+ ASSERT_TRUE(add_st.ok()) << add_st;
+ total_read_rows += read_rows;
+ }
+ }
+ if (eof) {
+ break;
+ }
+ if (read_rows == 0) {
+ break;
+ }
+ }
+
+ ASSERT_EQ(src_block.rows(), total_read_rows);
+ ASSERT_EQ(src_block.columns(), merged_block.columns());
+
+ // Compare column-wise values
+ for (size_t col = 0; col < src_block.columns(); ++col) {
+ const auto& src = src_block.get_by_position(col);
+ const auto& dst = merged_block.get_by_position(col);
+ ASSERT_EQ(src.type->get_family_name(), dst.type->get_family_name());
+ ASSERT_EQ(src.column->size(), dst.column->size());
+ for (size_t row = 0; row < src_block.rows(); row += 10) {
+ auto src_field = (*src.column)[row];
+ auto dst_field = (*dst.column)[row];
+ ASSERT_EQ(src_field, dst_field) << "mismatch at col=" << col << "
row=" << row;
+ }
+ }
+ bool exists = false;
+ st = fs->exists(file_path, &exists);
+ if (st.ok() && exists) {
+ static_cast<void>(fs->delete_file(file_path));
+ }
+}
+
+} // namespace doris::vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index 2cd5852dea4..9b17d06708d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -30,6 +30,7 @@ public class FileFormatConstants {
public static final String FORMAT_AVRO = "avro";
public static final String FORMAT_WAL = "wal";
public static final String FORMAT_ARROW = "arrow";
+ public static final String FORMAT_NATIVE = "native";
public static final String PROP_FORMAT = "format";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 3b96e7b3b54..7ec4456eddf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -575,6 +575,9 @@ public class Util {
return TFileFormatType.FORMAT_WAL;
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ARROW)) {
return TFileFormatType.FORMAT_ARROW;
+ } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_NATIVE)) {
+ // Doris Native binary columnar format
+ return TFileFormatType.FORMAT_NATIVE;
} else {
return TFileFormatType.FORMAT_UNKNOWN;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
index 865da510d23..f3f1bb1a6d8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -39,6 +39,7 @@ public abstract class FileFormatProperties {
public static final String FORMAT_AVRO = "avro";
public static final String FORMAT_WAL = "wal";
public static final String FORMAT_ARROW = "arrow";
+ public static final String FORMAT_NATIVE = "native";
public static final String PROP_COMPRESS_TYPE = "compress_type";
protected String formatName;
@@ -98,6 +99,8 @@ public abstract class FileFormatProperties {
return new WalFileFormatProperties();
case FORMAT_ARROW:
return new ArrowFileFormatProperties();
+ case FORMAT_NATIVE:
+ return new NativeFileFormatProperties();
default:
throw new AnalysisException("format:" + formatString + " is
not supported.");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
new file mode 100644
index 00000000000..fe4306906d4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/NativeFileFormatProperties.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+/**
+ * File format properties for Doris Native binary columnar format.
+ *
+ * This format is intended for high-performance internal data exchange
+ */
+
+public class NativeFileFormatProperties extends FileFormatProperties {
+
+ public NativeFileFormatProperties() {
+ super(TFileFormatType.FORMAT_NATIVE,
FileFormatProperties.FORMAT_NATIVE);
+ }
+
+ @Override
+ public void analyzeFileFormatProperties(Map<String, String>
formatProperties,
+ boolean isRemoveOriginProperty)
+ throws AnalysisException {
+ // Currently no extra user visible properties for native format.
+ // Just ignore all other properties gracefully.
+ }
+
+ @Override
+ public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions)
{
+ // No extra sink options are required for native format.
+ }
+
+ @Override
+ public TFileAttributes toTFileAttributes() {
+ // For now we don't need text params for Native format, but
TFileAttributes
+ // requires a text_params field to be non-null on BE side.
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+ fileAttributes.setTextParams(textParams);
+ return fileAttributes;
+ }
+}
+
+
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
index e8498b0aff7..7e24576edf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadExprTransformUtils.java
@@ -327,7 +327,7 @@ public class LoadExprTransformUtils {
} else {
SlotDescriptor slotDesc =
analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
- if (formatType == TFileFormatType.FORMAT_ARROW) {
+ if (formatType == TFileFormatType.FORMAT_ARROW || formatType
== TFileFormatType.FORMAT_NATIVE) {
slotDesc.setColumn(new Column(realColName,
colToType.get(realColName)));
} else {
if (uniquekeyUpdateMode ==
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && hasSkipBitmapColumn) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 9f9f769c780..7d758d1acaa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.VariantType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.Pair;
@@ -324,7 +325,8 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
* @return column type and the number of parsed PTypeNodes
*/
private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int
start) {
- PScalarType columnType = typeNodes.get(start).getScalarType();
+ PTypeNode typeNode = typeNodes.get(start);
+ PScalarType columnType = typeNode.getScalarType();
TPrimitiveType tPrimitiveType =
TPrimitiveType.findByValue(columnType.getType());
Type type;
int parsedNodes;
@@ -356,6 +358,15 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
parsedNodes += fieldType.value();
}
type = new StructType(fields);
+ } else if (tPrimitiveType == TPrimitiveType.VARIANT) {
+ // Preserve VARIANT-specific properties from PTypeNode, especially
variant_max_subcolumns_count.
+ int maxSubcolumns = typeNode.getVariantMaxSubcolumnsCount();
+ // Currently no predefined fields are carried in PTypeNode for
VARIANT, so use empty list and default
+ // values for other properties.
+ type = new VariantType(new ArrayList<>(), maxSubcolumns,
+ /*enableTypedPathsToSparse*/ false,
+ /*variantMaxSparseColumnStatisticsSize*/ 10000);
+ parsedNodes = 1;
} else {
type =
ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType),
columnType.getLen(), columnType.getPrecision(),
columnType.getScale());
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 908c7d10530..7b70e5da4c8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -122,7 +122,8 @@ enum TFileFormatType {
FORMAT_CSV_SNAPPYBLOCK,
FORMAT_WAL,
FORMAT_ARROW,
- FORMAT_TEXT
+ FORMAT_TEXT,
+ FORMAT_NATIVE
}
// In previous versions, the data compression format and file format were
stored together, as TFileFormatType,
diff --git
a/regression-test/data/export_p0/outfile/native/test_outfile_native.out
b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
new file mode 100644
index 00000000000..fc60340a7e1
--- /dev/null
+++ b/regression-test/data/export_p0/outfile/native/test_outfile_native.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_default --
+1 2024-01-01 2024-01-01T00:00 s1 1 1 true
1.1
+2 2024-01-01 2024-01-01T00:00 s2 2 2 true
2.2
+3 2024-01-01 2024-01-01T00:00 s3 3 3 true
3.3
+4 2024-01-01 2024-01-01T00:00 s4 4 4 true
4.4
+5 2024-01-01 2024-01-01T00:00 s5 5 5 true
5.5
+6 2024-01-01 2024-01-01T00:00 s6 6 6 true
6.6
+7 2024-01-01 2024-01-01T00:00 s7 7 7 true
7.7
+8 2024-01-01 2024-01-01T00:00 s8 8 8 true
8.800000000000001
+9 2024-01-01 2024-01-01T00:00 s9 9 9 true
9.9
+10 2024-01-01 2024-01-01T00:00 s10 10 10 true
10.1
+
+-- !select_s3_native --
+1 2024-01-01 2024-01-01T00:00 s1 1 1 true
1.1
+2 2024-01-01 2024-01-01T00:00 s2 2 2 true
2.2
+3 2024-01-01 2024-01-01T00:00 s3 3 3 true
3.3
+4 2024-01-01 2024-01-01T00:00 s4 4 4 true
4.4
+5 2024-01-01 2024-01-01T00:00 s5 5 5 true
5.5
+6 2024-01-01 2024-01-01T00:00 s6 6 6 true
6.6
+7 2024-01-01 2024-01-01T00:00 s7 7 7 true
7.7
+8 2024-01-01 2024-01-01T00:00 s8 8 8 true
8.800000000000001
+9 2024-01-01 2024-01-01T00:00 s9 9 9 true
9.9
+10 2024-01-01 2024-01-01T00:00 s10 10 10 true
10.1
+
diff --git
a/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
new file mode 100644
index 00000000000..26c9e9ab704
--- /dev/null
+++ b/regression-test/suites/export_p0/outfile/native/test_outfile_native.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_outfile_native", "p0") {
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+
+ def tableName = "outfile_native_test"
+ def outFilePath = "${bucket}/outfile/native/exp_"
+
+    // Export helper: write to S3 and return the URL reported by FE
+ def outfile_to_s3 = {
+ def res = sql """
+ SELECT * FROM ${tableName} t ORDER BY id
+ INTO OUTFILE "s3://${outFilePath}"
+ FORMAT AS native
+ PROPERTIES (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}"
+ );
+ """
+ return res[0][3]
+ }
+
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `id` INT NOT NULL,
+ `c_date` DATE NOT NULL,
+ `c_dt` DATETIME NOT NULL,
+ `c_str` VARCHAR(20),
+ `c_int` INT,
+ `c_tinyint` TINYINT,
+ `c_bool` boolean,
+ `c_double` double
+ )
+ DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ """
+
+        // Insert 10000 rows of test data (the last row is all NULL)
+ StringBuilder sb = new StringBuilder()
+ int i = 1
+ for (; i < 10000; i ++) {
+ sb.append("""
+ (${i}, '2024-01-01', '2024-01-01 00:00:00', 's${i}', ${i}, ${i
% 128}, true, ${i}.${i}),
+ """)
+ }
+ sb.append("""
+ (${i}, '2024-01-01', '2024-01-01 00:00:00', NULL, NULL, NULL,
NULL, NULL)
+ """)
+ sql """ INSERT INTO ${tableName} VALUES ${sb.toString()} """
+
+        // baseline: query result from the local table
+ qt_select_default """ SELECT * FROM ${tableName} t ORDER BY id limit
10; """
+
+        // 1) Export as a Native file to S3
+ def outfileUrl = outfile_to_s3()
+
+        // 2) Read the data back from S3 via the S3 TVF (format=native) and compare with baseline
+        // outfileUrl looks like s3://bucket/outfile/native/exp_xxx_*; strip the
+        // "s3://bucket" prefix and the trailing '*'
+ qt_select_s3_native """ SELECT * FROM S3 (
+ "uri" = "http://${bucket}.${s3_endpoint}${
+ outfileUrl.substring(5 + bucket.length(),
outfileUrl.length() - 1)
+ }0.native",
+ "ACCESS_KEY"= "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "format" = "native",
+ "region" = "${region}"
+ ) order by id limit 10;
+ """
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${tableName}")
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
new file mode 100644
index 00000000000..cf448a064e2
--- /dev/null
+++ b/regression-test/suites/export_p0/test_export_variant_10k_columns.groovy
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.io.File
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+suite("test_export_variant_10k_columns", "p0") {
+ // open nereids
+ sql """ set enable_nereids_planner=true """
+ sql """ set enable_fallback_to_original_planner=false """
+
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ // String bucket = context.config.otherConfigs.get("s3BucketName");
+ String bucket = getS3BucketName()
+
+ def table_export_name = "test_export_variant_10k"
+ def table_load_name = "test_load_variant_10k"
+ def outfile_path_prefix = """${bucket}/export/p0/variant_10k/exp"""
+
+ // Poll `show export` until the job finishes; returns the outfile URL.
+ def waiting_export = { export_label ->
+ while (true) {
+ def res = sql """ show export where label = "${export_label}" """
+ logger.info("export state: " + res[0][2])
+ if (res[0][2] == "FINISHED") {
+ def json = parseJson(res[0][11])
+ assert json instanceof List
+ // assertEquals("1", json.fileNumber[0][0])
+ log.info("outfile_path: ${json.url[0][0]}")
+ return json.url[0][0];
+ } else if (res[0][2] == "CANCELLED") {
+ throw new IllegalStateException("""export failed:
${res[0][10]}""")
+ } else {
+ sleep(5000)
+ }
+ }
+ }
+
+ // 1. Create table with variant column
+ sql """ DROP TABLE IF EXISTS ${table_export_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_export_name} (
+ `id` INT NOT NULL,
+ `v` VARIANT<PROPERTIES ("variant_max_subcolumns_count" = "2048")> NULL
+ )
+ DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ """
+
+ // 2. Generate data with 10000 keys in variant
+ // Generate num_rows (1000) rows.
+ // Total 10,000 columns, but each row only has 50 columns (sparse).
+ // This simulates a realistic sparse wide table scenario.
+
+ File dataFile = File.createTempFile("variant_10k_data", ".json")
+ dataFile.deleteOnExit()
+ int num_rows = 1000
+ try {
+ dataFile.withWriter { writer ->
+ StringBuilder sb = new StringBuilder()
+ for (int i = 1; i <= num_rows; i++) {
+ sb.setLength(0)
+ sb.append("{\"id\": ").append(i).append(", \"v\": {")
+ // Select 50 keys out of 10000 for each row
+ for (int k = 0; k < 50; k++) {
+ if (k > 0) sb.append(", ")
+ // Scatter the keys to ensure coverage of all 10000
columns across rows
+ int keyIdx = (i + k * 200) % 10000
+ sb.append('"k').append(keyIdx).append('":').append(i)
+ }
+ sb.append("}}\n")
+ writer.write(sb.toString())
+ }
+ }
+
+ // 3. Stream Load
+ streamLoad {
+ table table_export_name
+ set 'format', 'json'
+ set 'read_json_by_line', 'true'
+ file dataFile.getAbsolutePath()
+ time 60000 // 60s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(num_rows, json.NumberTotalRows)
+ assertEquals(num_rows, json.NumberLoadedRows)
+ }
+ }
+ } finally {
+ dataFile.delete()
+ }
+
+ // def format = "parquet"
+ def format = "native"
+
+ // 4. Export to S3 (Native)
+ def uuid = UUID.randomUUID().toString()
+ // def outFilePath = """/tmp/variant_10k_export"""
+ def outFilePath = """${outfile_path_prefix}_${uuid}"""
+ def label = "label_${uuid}"
+
+ try {
+ // for (int i = 0; i < 10; i++) {
+ // // exec export
+ // sql """
+ // EXPORT TABLE ${table_export_name} TO
"file://${outFilePath}/"
+ // PROPERTIES(
+ // "label" = "${label}_${i}",
+ // "format" = "${format}"
+ // )
+ // """
+ // long startExport = System.currentTimeMillis()
+ // def outfile_url = waiting_export.call("${label}_${i}")
+ // long endExport = System.currentTimeMillis()
+ // logger.info("Export 100k rows with variant took ${endExport -
startExport} ms")
+ // }
+ // exec export
+ // sql """
+ // EXPORT TABLE ${table_export_name} TO "file://${outFilePath}/"
+ // PROPERTIES(
+ // "label" = "${label}",
+ // "format" = "${format}"
+ // )
+ // """
+
+ sql """
+ EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}/"
+ PROPERTIES(
+ "label" = "${label}",
+ "format" = "${format}"
+ )
+ WITH S3(
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}",
+ "provider" = "${getS3Provider()}"
+ );
+ """
+
+ long startExport = System.currentTimeMillis()
+ def outfile_url = waiting_export.call(label)
+ long endExport = System.currentTimeMillis()
+ logger.info("Export ${num_rows} rows with variant took ${endExport -
startExport} ms")
+
+ // 5. Validate by S3 TVF
+ def s3_tvf_sql = """ s3(
+ "uri" =
"http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(),
outfile_url.length() - 1)}0.${format}",
+ "s3.access_key"= "${ak}",
+ "s3.secret_key" = "${sk}",
+ "format" = "${format}",
+ "provider" = "${getS3Provider()}",
+ "region" = "${region}"
+ ) """
+
+ // Verify row count
+ def res = sql """ SELECT count(*) FROM ${s3_tvf_sql} """
+ assertEquals(num_rows, res[0][0])
+
+ def value_type = "VARIANT<PROPERTIES (\"variant_max_subcolumns_count\"
= \"2048\")>"
+ if (new Random().nextInt(2) == 0) {
+ value_type = "text"
+ }
+ // 6. Load back into Doris (to a new table) to verify import
performance/capability
+ sql """ DROP TABLE IF EXISTS ${table_load_name} """
+ sql """
+ CREATE TABLE ${table_load_name} (
+ `id` INT,
+ `v` ${value_type}
+ )
+ DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ """
+
+ // Use LOAD LABEL. The label contains '-' from UUID, so it must be
quoted with back-quotes.
+ def load_label = "load_${uuid}"
+
+ sql """
+ LOAD LABEL `${load_label}`
+ (
+ DATA INFILE("s3://${outFilePath}/*")
+ INTO TABLE ${table_load_name}
+ FORMAT AS "${format}"
+ (id, v)
+ )
+ WITH S3 (
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${region}",
+ "s3.secret_key"="${sk}",
+ "s3.access_key" = "${ak}",
+ "provider" = "${getS3Provider()}"
+ );
+ """
+
+ Awaitility.await().atMost(240, SECONDS).pollInterval(5,
SECONDS).until({
+ def loadResult = sql """
+ show load where label = '${load_label}'
+ """
+ if (loadResult.get(0).get(2) == 'CANCELLED' ||
loadResult.get(0).get(2) == 'FAILED') {
+ println("load failed: " + loadResult.get(0))
+ throw new RuntimeException("load failed"+ loadResult.get(0))
+ }
+ return loadResult.get(0).get(2) == 'FINISHED'
+ })
+ // Check if data loaded
+ def load_count = sql """ SELECT count(*) FROM ${table_load_name} """
+ assertEquals(num_rows, load_count[0][0])
+
+ // Check variant data integrity (sample)
+ // Row 1 has keys: (1 + k*200) % 10000. For k=0, key is k1. Value is 1.
+ def check_v = sql """ SELECT cast(v['k1'] as int) FROM
${table_load_name} WHERE id = 1 """
+ assertEquals(1, check_v[0][0])
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${table_export_name}")
+ try_sql("DROP TABLE IF EXISTS ${table_load_name}")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]